Skip to content

Commit 4ef63a6

Browse files
authored
Use async primitives instead of static timeouts (#134)
1 parent caf96c5 commit 4ef63a6

19 files changed

+376
-109
lines changed

.github/workflows/test.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,8 @@ jobs:
8585
enableCrossOsArchive: true
8686

8787
- name: "Get dependencies"
88-
# Passing --offline here asserts that dependencies have been cached properly.
8988
run: |
90-
dart pub get --offline
89+
dart pub get
9190
dart pub global activate pana
9291
9392
- name: Setup

packages/sqlite_async/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
- Instead of `package:sqlite_async/sqlite3_wasm.dart`, import `package:sqlite3/wasm.dart`.
1616
- Instead of `package:sqlite_async/sqlite3_web.dart`, import `package:sqlite3_web/sqlite3_web.dart`.
1717
- __Breaking__: Remove `SqliteDatabaseMixin` and `SqliteQueries`. Extend `SqliteConnection` instead.
18-
18+
- __Breaking__: Calls locking the database that fail with a timeout now throw an `AbortException` instead of a
19+
`TimeoutException`.
20+
- The `throttle` parameter on `watch` and `onChange` can now be set to `null`. This also introduces `watchUnthrottled`
21+
and `onChangeUnthrottled`, which only buffer on paused subscriptions instead of applying a static timeout.
22+
- Add `abortableReadLock` and `abortableWriteLock`. The methods can be used to acquire a read/write context with a flexible
23+
abort signal instead of a static timeout.
1924

2025
## 0.13.1
2126

packages/sqlite_async/lib/sqlite_async.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export 'src/common/abstract_open_factory.dart' hide InternalOpenFactory;
77
export 'src/common/connection/sync_sqlite_connection.dart';
88
export 'src/common/mutex.dart';
99
export 'src/common/sqlite_database.dart' hide SqliteDatabaseImpl;
10+
export 'src/common/timeouts.dart' show AbortException;
1011
export 'src/sqlite_connection.dart';
1112
export 'src/sqlite_migrations.dart';
1213
export 'src/sqlite_options.dart';

packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ final class SyncSqliteConnection extends SqliteConnection {
3636
}
3737

3838
@override
39-
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
40-
{Duration? lockTimeout, String? debugContext}) {
39+
Future<T> abortableReadLock<T>(
40+
Future<T> Function(SqliteReadContext tx) callback,
41+
{Future<void>? abortTrigger,
42+
String? debugContext}) {
4143
final task = profileQueries ? TimelineTask() : null;
4244
task?.start('${profilerPrefix}mutex_lock');
4345

@@ -49,13 +51,15 @@ final class SyncSqliteConnection extends SqliteConnection {
4951
callback,
5052
);
5153
},
52-
timeout: lockTimeout,
54+
abortTrigger: abortTrigger,
5355
);
5456
}
5557

5658
@override
57-
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
58-
{Duration? lockTimeout, String? debugContext}) {
59+
Future<T> abortableWriteLock<T>(
60+
Future<T> Function(SqliteWriteContext tx) callback,
61+
{Future<void>? abortTrigger,
62+
String? debugContext}) {
5963
final task = profileQueries ? TimelineTask() : null;
6064
task?.start('${profilerPrefix}mutex_lock');
6165

@@ -67,7 +71,7 @@ final class SyncSqliteConnection extends SqliteConnection {
6771
callback,
6872
);
6973
},
70-
timeout: lockTimeout,
74+
abortTrigger: abortTrigger,
7175
);
7276
}
7377

packages/sqlite_async/lib/src/common/mutex.dart

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import 'dart:async';
22

3+
import 'timeouts.dart';
4+
35
/// An asynchronous mutex.
46
abstract interface class Mutex {
57
/// Creates a simple mutex instance that can't be shared between tabs or
68
/// isolates.
79
factory Mutex.simple() = _SimpleMutex;
810

9-
/// timeout is a timeout for acquiring the lock, not for the callback
10-
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout});
11+
/// Runs [callback] in a critical section.
12+
///
13+
/// If [abortTrigger] completes before the critical section was entered, an
14+
/// [AbortException] is thrown and [callback] will not be invoked.
15+
Future<T> lock<T>(Future<T> Function() callback,
16+
{Future<void>? abortTrigger});
1117
}
1218

1319
class LockError extends Error {
@@ -24,40 +30,45 @@ class LockError extends Error {
2430
/// Mutex maintains a queue of Future-returning functions that are executed
2531
/// sequentially.
2632
final class _SimpleMutex implements Mutex {
27-
Future<dynamic>? last;
33+
Future<void>? last;
2834

2935
// Hack to make sure the Mutex is not copied to another isolate.
3036
// ignore: unused_field
3137
final Finalizer _f = Finalizer((_) {});
3238

3339
@override
34-
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
40+
Future<T> lock<T>(Future<T> Function() callback,
41+
{Future<void>? abortTrigger}) async {
3542
if (Zone.current[this] != null) {
3643
throw LockError('Recursive lock is not allowed');
3744
}
3845
var zone = Zone.current.fork(zoneValues: {this: true});
3946

4047
return zone.run(() async {
4148
final prev = last;
49+
var previousDidComplete = false;
50+
4251
final completer = Completer<void>.sync();
4352
last = completer.future;
4453
try {
4554
// If there is a previous running block, wait for it
4655
if (prev != null) {
47-
if (timeout != null) {
48-
// This could throw a timeout error
49-
try {
50-
await prev.timeout(timeout);
51-
} catch (error) {
52-
if (error is TimeoutException) {
53-
throw TimeoutException('Failed to acquire lock', timeout);
54-
} else {
55-
rethrow;
56+
final prevOrAbort = Completer<void>.sync();
57+
58+
prev.then((_) {
59+
previousDidComplete = true;
60+
if (!prevOrAbort.isCompleted) prevOrAbort.complete();
61+
});
62+
if (abortTrigger != null) {
63+
abortTrigger.whenComplete(() {
64+
if (!prevOrAbort.isCompleted) {
65+
prevOrAbort.completeError(
66+
AbortException('lock'), StackTrace.current);
5667
}
57-
}
58-
} else {
59-
await prev;
68+
});
6069
}
70+
71+
await prevOrAbort.future;
6172
}
6273

6374
// Run the function and return the result
@@ -66,7 +77,7 @@ final class _SimpleMutex implements Mutex {
6677
// Cleanup
6778
// waiting for the previous task to be done in case of timeout
6879
void complete() {
69-
// Only mark it unlocked when the last one complete
80+
// Only mark it unlocked when the last one completes
7081
if (identical(last, completer.future)) {
7182
last = null;
7283
}
@@ -75,8 +86,8 @@ final class _SimpleMutex implements Mutex {
7586

7687
// In case of timeout, wait for the previous one to complete too
7788
// before marking this task as complete
78-
if (prev != null && timeout != null) {
79-
// But we still returns immediately
89+
if (prev != null && !previousDidComplete) {
90+
// But we still return immediately
8091
prev.then((_) {
8192
complete();
8293
}).ignore();
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,24 @@
1+
/// @docImport '../sqlite_connection.dart';
2+
library;
3+
4+
import 'package:meta/meta.dart';
5+
6+
@internal
17
extension TimeoutDurationToFuture on Duration {
28
/// Returns a future that completes with `void` after this duration.
39
Future<void> get asTimeout => Future.delayed(this);
410
}
11+
12+
/// An exception thrown when calls to [SqliteConnection.readLock],
13+
/// [SqliteConnection.writeLock] and similar methods are aborted or time out
14+
/// before a connection could be obtained from the pool.
15+
final class AbortException implements Exception {
16+
final String _methodName;
17+
18+
AbortException(this._methodName);
19+
20+
@override
21+
String toString() {
22+
return 'A call to $_methodName has been aborted';
23+
}
24+
}

packages/sqlite_async/lib/src/impl/single_connection_database.dart

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,24 @@ final class SingleConnectionDatabase extends SqliteDatabaseImpl {
3232
SqliteOpenFactory get openFactory => throw UnimplementedError();
3333

3434
@override
35-
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
36-
{Duration? lockTimeout, String? debugContext}) {
37-
return connection.readLock(callback,
38-
lockTimeout: lockTimeout, debugContext: debugContext);
35+
Future<T> abortableReadLock<T>(
36+
Future<T> Function(SqliteReadContext tx) callback,
37+
{Future<void>? abortTrigger,
38+
String? debugContext}) {
39+
return connection.abortableReadLock(callback,
40+
abortTrigger: abortTrigger, debugContext: debugContext);
3941
}
4042

4143
@override
4244
Stream<UpdateNotification> get updates => connection.updates;
4345

4446
@override
45-
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
46-
{Duration? lockTimeout, String? debugContext}) {
47-
return connection.writeLock(callback,
48-
lockTimeout: lockTimeout, debugContext: debugContext);
47+
Future<T> abortableWriteLock<T>(
48+
Future<T> Function(SqliteWriteContext tx) callback,
49+
{Future<void>? abortTrigger,
50+
String? debugContext}) {
51+
return connection.abortableWriteLock(callback,
52+
abortTrigger: abortTrigger, debugContext: debugContext);
4953
}
5054

5155
@override

packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
9999
{Duration? lockTimeout}) async {
100100
return _useConnection(
101101
writer: false,
102-
lockTimeout: lockTimeout,
102+
abortTrigger: lockTimeout?.asTimeout,
103103
debugContext: 'readTransaction',
104104
(context) {
105105
return _transactionInLease(context, callback);
@@ -120,7 +120,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
120120
{Duration? lockTimeout}) {
121121
return _useConnection(
122122
writer: true,
123-
lockTimeout: lockTimeout,
123+
abortTrigger: lockTimeout?.asTimeout,
124124
debugContext: 'writeTransaction',
125125
(context) {
126126
return _transactionInLease(context, callback);
@@ -137,23 +137,27 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
137137
}
138138

139139
@override
140-
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
141-
{Duration? lockTimeout, String? debugContext}) async {
140+
Future<T> abortableReadLock<T>(
141+
Future<T> Function(SqliteReadContext tx) callback,
142+
{Future<void>? abortTrigger,
143+
String? debugContext}) async {
142144
return _useConnection(
143145
writer: false,
144146
debugContext: debugContext ?? 'readLock',
145-
lockTimeout: lockTimeout,
147+
abortTrigger: abortTrigger,
146148
(context) => ScopedReadContext.assumeReadLock(context, callback),
147149
);
148150
}
149151

150152
@override
151-
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
152-
{Duration? lockTimeout, String? debugContext}) async {
153+
Future<T> abortableWriteLock<T>(
154+
Future<T> Function(SqliteWriteContext tx) callback,
155+
{Future<void>? abortTrigger,
156+
String? debugContext}) async {
153157
return _useConnection(
154158
writer: true,
155159
debugContext: debugContext ?? 'writeLock',
156-
lockTimeout: lockTimeout,
160+
abortTrigger: abortTrigger,
157161
(context) => ScopedWriteContext.assumeWriteLock(context, callback),
158162
);
159163
}
@@ -162,14 +166,14 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
162166
Future<T> Function(_LeasedContext context) callback, {
163167
required bool writer,
164168
required String debugContext,
165-
Duration? lockTimeout,
169+
Future<void>? abortTrigger,
166170
}) {
167-
final timeout = lockTimeout?.asTimeout;
168171
return _runInLockContext(debugContext, () async {
169172
final pool = await _pool;
170173
final connection = await (writer
171-
? pool.writer(abortSignal: timeout)
172-
: pool.reader(abortSignal: timeout));
174+
? pool.writer(abortSignal: abortTrigger)
175+
: pool.reader(abortSignal: abortTrigger))
176+
.translateAbortExceptions(debugContext);
173177

174178
try {
175179
final context = _LeasedContext(
@@ -447,3 +451,10 @@ final class _LeasedContext extends UnscopedContext {
447451
return (db) => db.execute(sql);
448452
}
449453
}
454+
455+
extension<T> on Future<T> {
456+
Future<T> translateAbortExceptions(String debugContext) {
457+
return onError<PoolAbortException>(
458+
(e, s) => Error.throwWithStackTrace(AbortException(debugContext), s));
459+
}
460+
}

0 commit comments

Comments
 (0)