@@ -9,6 +9,7 @@ import 'package:sqlite_async/src/update_notification.dart';
99
1010import 'common/connection/sync_sqlite_connection.dart' ;
1111import 'common/mutex.dart' ;
12+ import 'utils/shared_utils.dart' ;
1213
1314/// Abstract class representing calls available in a read-only or read-write context.
1415abstract interface class SqliteReadContext {
@@ -102,7 +103,7 @@ abstract interface class SqliteWriteContext extends SqliteReadContext {
102103///
103104/// This package typically pools multiple [SqliteConnection] instances into a
104105/// managed [SqliteDatabase] automatically.
105- abstract interface class SqliteConnection extends SqliteWriteContext {
106+ abstract class SqliteConnection implements SqliteWriteContext {
106107 /// Default constructor for subclasses.
107108 SqliteConnection ();
108109
@@ -124,7 +125,7 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
124125 }
125126
126127 /// Reports table change update notifications
127- Stream <UpdateNotification >? get updates;
128+ Stream <UpdateNotification > get updates;
128129
129130 /// Open a read-only transaction.
130131 ///
@@ -133,7 +134,11 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
133134 /// instance will error.
134135 Future <T > readTransaction <T >(
135136 Future <T > Function (SqliteReadContext tx) callback,
136- {Duration ? lockTimeout});
137+ {Duration ? lockTimeout}) {
138+ return readLock ((ctx) async {
139+ return await internalReadTransaction (ctx, callback);
140+ }, lockTimeout: lockTimeout, debugContext: 'readTransaction()' );
141+ }
137142
138143 /// Open a read-write transaction.
139144 ///
@@ -147,16 +152,65 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
147152 @override
148153 Future <T > writeTransaction <T >(
149154 Future <T > Function (SqliteWriteContext tx) callback,
150- {Duration ? lockTimeout});
155+ {Duration ? lockTimeout}) {
156+ return writeLock ((ctx) async {
157+ return ctx.writeTransaction (callback);
158+ }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()' );
159+ }
160+
161+ /// Create a Stream of changes to any of the specified tables.
162+ ///
163+ /// This is preferred over [watch] when multiple queries need to be performed
164+ /// together when data is changed, e.g. like this:
165+ ///
166+ /// ```dart
167+ /// var subscription = db.onChange({'users', 'groups'}).asyncMap((event) async {
168+ /// await db.readTransaction((tx) async {
169+ /// var data = await tx.getAll('SELECT * ROM users');
170+ /// var moreData = await tx.getAll('SELECT * ROM groups');
171+ ///
172+ /// // Handle data here...
173+ /// });
174+ /// });
175+ /// ```
176+ Stream <UpdateNotification > onChange (Iterable <String >? tables,
177+ {Duration throttle = const Duration (milliseconds: 30 ),
178+ bool triggerImmediately = true }) {
179+ final filteredStream = tables != null
180+ ? updates.transform (UpdateNotification .filterTablesTransformer (tables))
181+ : updates;
182+ final throttledStream = UpdateNotification .throttleStream (
183+ filteredStream, throttle,
184+ addOne: triggerImmediately ? UpdateNotification .empty () : null );
185+ return throttledStream;
186+ }
151187
152188 /// Execute a read query every time the source tables are modified.
153189 ///
154190 /// Use [throttle] to specify the minimum interval between queries.
155191 ///
156192 /// Source tables are automatically detected using `EXPLAIN QUERY PLAN` .
157- Stream <sqlite.ResultSet > watch (String sql,
158- {List <Object ?> parameters = const [],
159- Duration throttle = const Duration (milliseconds: 30 )});
193+ Stream <sqlite.ResultSet > watch (
194+ String sql, {
195+ List <Object ?> parameters = const [],
196+ Duration throttle = const Duration (milliseconds: 30 ),
197+ Iterable <String >? triggerOnTables,
198+ }) {
199+ Stream <sqlite.ResultSet > watchInner (Iterable <String > trigger) {
200+ return onChange (
201+ trigger,
202+ throttle: throttle,
203+ triggerImmediately: true ,
204+ ).asyncMap ((_) => getAll (sql, parameters));
205+ }
206+
207+ if (triggerOnTables case final knownTrigger? ) {
208+ return watchInner (knownTrigger);
209+ } else {
210+ return Stream .fromFuture (getSourceTables (this , sql, parameters))
211+ .asyncExpand (watchInner);
212+ }
213+ }
160214
161215 /// Takes a read lock, without starting a transaction.
162216 ///
@@ -181,9 +235,74 @@ abstract interface class SqliteConnection extends SqliteWriteContext {
181235
182236 /// Ensures that all connections are aware of the latest schema changes applied (if any).
183237 /// Queries and watch calls can potentially use outdated schema information after a schema update.
184- Future <void > refreshSchema ();
238+ Future <void > refreshSchema () {
239+ return getAll ("PRAGMA table_info('sqlite_master')" );
240+ }
185241
186242 /// Returns true if the connection is closed
187243 @override
188244 bool get closed;
245+
246+ @override
247+ Future <sqlite.ResultSet > execute (String sql,
248+ [List <Object ?> parameters = const []]) async {
249+ return writeLock ((ctx) async {
250+ return ctx.execute (sql, parameters);
251+ }, debugContext: 'execute()' );
252+ }
253+
254+ @override
255+ Future <sqlite.ResultSet > getAll (String sql,
256+ [List <Object ?> parameters = const []]) {
257+ return readLock ((ctx) async {
258+ return ctx.getAll (sql, parameters);
259+ }, debugContext: 'getAll()' );
260+ }
261+
262+ @override
263+ Future <sqlite.Row > get (String sql, [List <Object ?> parameters = const []]) {
264+ return readLock ((ctx) async {
265+ return ctx.get (sql, parameters);
266+ }, debugContext: 'get()' );
267+ }
268+
269+ @override
270+ Future <sqlite.Row ?> getOptional (String sql,
271+ [List <Object ?> parameters = const []]) {
272+ return readLock ((ctx) async {
273+ return ctx.getOptional (sql, parameters);
274+ }, debugContext: 'getOptional()' );
275+ }
276+
277+ /// See [SqliteReadContext.computeWithDatabase] .
278+ ///
279+ /// When called here directly on the connection, the call is wrapped in a
280+ /// write transaction.
281+ @override
282+ Future <T > computeWithDatabase <T >(
283+ Future <T > Function (sqlite.CommonDatabase db) compute) {
284+ return writeTransaction ((tx) async {
285+ return tx.computeWithDatabase (compute);
286+ });
287+ }
288+
289+ /// Execute a write query (INSERT, UPDATE, DELETE) multiple times with each
290+ /// parameter set. This is more faster than executing separately with each
291+ /// parameter set.
292+ ///
293+ /// When called here directly on the connection, the batch is wrapped in a
294+ /// write transaction.
295+ @override
296+ Future <void > executeBatch (String sql, List <List <Object ?>> parameterSets) {
297+ return writeTransaction ((tx) async {
298+ return tx.executeBatch (sql, parameterSets);
299+ });
300+ }
301+
302+ @override
303+ Future <void > executeMultiple (String sql) {
304+ return writeTransaction ((tx) async {
305+ return tx.executeMultiple (sql);
306+ });
307+ }
189308}
0 commit comments