-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathWorkerWrappedAsyncDatabaseConnection.ts
More file actions
224 lines (196 loc) · 6.96 KB
/
WorkerWrappedAsyncDatabaseConnection.ts
File metadata and controls
224 lines (196 loc) · 6.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import * as Comlink from 'comlink';
import {
AsyncDatabaseConnection,
OnTableChangeCallback,
OpenAsyncDatabaseConnection,
ProxiedQueryResult
} from './AsyncDatabaseConnection';
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
};
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
baseConnection: AsyncDatabaseConnection;
identifier: string;
remoteCanCloseUnexpectedly: boolean;
/**
* Need a remote in order to keep a reference to the Proxied worker
*/
remote: Comlink.Remote<OpenAsyncDatabaseConnection<Config>>;
onClose?: () => void;
};
/**
* Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy
* functions for worker listeners.
*/
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
implements AsyncDatabaseConnection
{
protected lockAbortController = new AbortController();
protected notifyRemoteClosed: AbortController | undefined;
private finalized = false;
private closeListeners = new Set<() => void>();
constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
if (options.remoteCanCloseUnexpectedly) {
this.notifyRemoteClosed = new AbortController();
}
if (options.onClose) {
this.closeListeners.add(options.onClose);
}
}
protected get baseConnection() {
return this.options.baseConnection;
}
init(): Promise<void> {
return this.baseConnection.init();
}
/**
* Marks the remote as closed.
*
* This can sometimes happen outside of our control, e.g. when a shared worker requests a connection from a tab. When
* it happens, all methods on the {@link baseConnection} would never resolve. To avoid livelocks in this scenario, we
* throw on all outstanding promises and forbid new calls.
*/
markRemoteClosed() {
// Can non-null assert here because this function is only supposed to be called when remoteCanCloseUnexpectedly was
// set.
this.notifyRemoteClosed!.abort();
}
markHold(): Promise<string> {
return this.withRemote(() => this.baseConnection.markHold());
}
releaseHold(holdId: string): Promise<void> {
return this.withRemote(() => this.baseConnection.releaseHold(holdId));
}
isAutoCommit(): Promise<boolean> {
return this.withRemote(() => this.baseConnection.isAutoCommit());
}
private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
const controller = this.notifyRemoteClosed;
if (controller) {
return new Promise((resolve, reject) => {
if (controller.signal.aborted) {
reject(new Error('Called operation on closed remote'));
// Don't run the operation if we're going to reject
return;
}
function handleAbort() {
reject(new Error('Remote peer closed with request in flight'));
}
function completePromise(action: () => void) {
controller!.signal.removeEventListener('abort', handleAbort);
action();
}
controller.signal.addEventListener('abort', handleAbort);
workerPromise()
.then((data) => completePromise(() => resolve(data)))
.catch((e) => completePromise(() => reject(e)));
});
} else {
// Can't close, so just return the inner worker promise unguarded.
return workerPromise();
}
}
/**
* Get a MessagePort which can be used to share the internals of this connection.
*/
async shareConnection(): Promise<SharedConnectionWorker> {
const { identifier, remote } = this.options;
/**
* Hold a navigator lock in order to avoid features such as Chrome's frozen tabs,
* or Edge's sleeping tabs from pausing the thread for this connection.
* This promise resolves once a lock is obtained.
* This lock will be held as long as this connection is open.
* The `shareConnection` method should not be called on multiple tabs concurrently.
*/
await new Promise<void>((resolve, reject) =>
navigator.locks
.request(
`shared-connection-${this.options.identifier}-${Date.now()}-${Math.round(Math.random() * 10000)}`,
{
signal: this.lockAbortController.signal
},
async () => {
resolve();
// Free the lock when the connection is already closed.
if (this.lockAbortController.signal.aborted) {
return;
}
// Hold the lock while the shared connection is in use.
await new Promise<void>((releaseLock) => {
this.lockAbortController.signal.addEventListener('abort', () => {
releaseLock();
});
});
}
)
// We aren't concerned with abort errors here
.catch((ex) => {
if (ex.name == 'AbortError') {
resolve();
} else {
reject(ex);
}
})
);
const newPort = await remote[Comlink.createEndpoint]();
return { port: newPort, identifier };
}
/**
* Registers a table change notification callback with the base database.
* This can be extended by custom implementations in order to handle proxy events.
*/
async registerOnTableChange(callback: OnTableChangeCallback) {
return this.baseConnection.registerOnTableChange(Comlink.proxy(callback));
}
onClose(callback: () => void): () => void {
this.closeListeners.add(callback);
return () => this.closeListeners.delete(callback);
}
private finalizeClose(): void {
if (this.finalized) {
return;
}
this.finalized = true;
// Ensure cleanup is idempotent if close is triggered from multiple paths.
this.notifyRemoteClosed?.abort();
try {
this.options.remote[Comlink.releaseProxy]();
} catch {
// Proxy can already be released on teardown.
}
this.closeListeners.forEach((listener) => {
try {
listener();
} catch {
// Avoid throwing during cleanup.
}
});
}
forceClose(): void {
this.lockAbortController.abort();
this.finalizeClose();
}
async close(): Promise<void> {
// Abort any pending lock requests.
this.lockAbortController.abort();
try {
await this.withRemote(() => this.baseConnection.close());
} finally {
this.finalizeClose();
}
}
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.withRemote(() => this.baseConnection.execute(sql, params));
}
executeRaw(sql: string, params?: any[]): Promise<any[][]> {
return this.withRemote(() => this.baseConnection.executeRaw(sql, params));
}
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.withRemote(() => this.baseConnection.executeBatch(sql, params));
}
getConfig(): Promise<ResolvedWebSQLOpenOptions> {
return this.withRemote(() => this.baseConnection.getConfig());
}
}