Skip to content

Commit 4b957f0

Browse files
authored
Add Semaphore utility to help implement connection pools (#906)
1 parent 995fa19 commit 4b957f0

File tree

9 files changed

+461
-190
lines changed

9 files changed

+461
-190
lines changed

.changeset/fair-toes-dance.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/node': patch
3+
'@powersync/op-sqlite': patch
4+
---
5+
6+
Internal: Refactor connection pool to use `Semaphore` from `@powersync/common`.

.changeset/short-moons-hug.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Internal: Add `Semaphore` utility to manage leases in a connection pool.

packages/common/src/utils/mutex.ts

Lines changed: 111 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
1+
import { Queue } from './queue.js';
2+
13
export type UnlockFn = () => void;
24

35
/**
4-
* An asynchronous mutex implementation.
6+
* An asynchronous semaphore implementation with associated items per lease.
57
*
68
* @internal This class is meant to be used in PowerSync SDKs only, and is not part of the public API.
79
*/
8-
export class Mutex {
9-
private inCriticalSection = false;
10+
export class Semaphore<T> {
11+
// Available items that are not currently assigned to a waiter.
12+
private readonly available: Queue<T>;
1013

14+
readonly size: number;
1115
// Linked list of waiters. We don't expect the wait list to become particularly large, and this allows removing
1216
// aborted waiters from the middle of the list efficiently.
13-
private firstWaiter?: MutexWaitNode;
14-
private lastWaiter?: MutexWaitNode;
17+
private firstWaiter?: SemaphoreWaitNode<T>;
18+
private lastWaiter?: SemaphoreWaitNode<T>;
1519

16-
private addWaiter(onAcquire: () => void): MutexWaitNode {
17-
const node: MutexWaitNode = {
20+
constructor(elements: Iterable<T>) {
21+
this.available = new Queue(elements);
22+
this.size = this.available.length;
23+
}
24+
25+
private addWaiter(requestedItems: number, onAcquire: () => void): SemaphoreWaitNode<T> {
26+
const node: SemaphoreWaitNode<T> = {
1827
isActive: true,
28+
acquiredItems: [],
29+
remainingItems: requestedItems,
1930
onAcquire,
2031
prev: this.lastWaiter
2132
};
@@ -30,7 +41,7 @@ export class Mutex {
3041
return node;
3142
}
3243

33-
private deactivateWaiter(waiter: MutexWaitNode) {
44+
private deactivateWaiter(waiter: SemaphoreWaitNode<T>) {
3445
const { prev, next } = waiter;
3546
waiter.isActive = false;
3647

@@ -40,77 +51,129 @@ export class Mutex {
4051
if (waiter == this.lastWaiter) this.lastWaiter = prev;
4152
}
4253

43-
acquire(abort?: AbortSignal): Promise<UnlockFn> {
54+
private requestPermits(amount: number, abort?: AbortSignal): Promise<{ items: T[]; release: UnlockFn }> {
55+
if (amount <= 0 || amount > this.size) {
56+
throw new Error(`Invalid amount of items requested (${amount}), must be between 1 and ${this.size}`);
57+
}
58+
4459
return new Promise((resolve, reject) => {
4560
function rejectAborted() {
46-
reject(abort?.reason ?? new Error('Mutex acquire aborted'));
61+
reject(abort?.reason ?? new Error('Semaphore acquire aborted'));
4762
}
4863
if (abort?.aborted) {
4964
return rejectAborted();
5065
}
5166

52-
let holdsMutex = false;
67+
let waiter: SemaphoreWaitNode<T>;
5368

5469
const markCompleted = () => {
55-
if (!holdsMutex) return;
56-
holdsMutex = false;
70+
const items = waiter.acquiredItems;
71+
waiter.acquiredItems = []; // Avoid releasing items twice.
72+
73+
for (const element of items) {
74+
// Give to next waiter, if possible.
75+
const nextWaiter = this.firstWaiter;
76+
if (nextWaiter) {
77+
nextWaiter.acquiredItems.push(element);
78+
nextWaiter.remainingItems--;
79+
if (nextWaiter.remainingItems == 0) {
80+
nextWaiter.onAcquire();
81+
}
82+
} else {
83+
// No pending waiter, return lease into pool.
84+
this.available.addLast(element);
85+
}
86+
}
87+
};
88+
89+
const onAbort = () => {
90+
abort?.removeEventListener('abort', onAbort);
5791

58-
const waiter = this.firstWaiter;
59-
if (waiter) {
92+
if (waiter.isActive) {
6093
this.deactivateWaiter(waiter);
61-
// Still in critical section, but owned by next waiter now.
62-
waiter.onAcquire();
63-
} else {
64-
this.inCriticalSection = false;
94+
rejectAborted();
6595
}
6696
};
6797

68-
if (!this.inCriticalSection) {
69-
this.inCriticalSection = true;
70-
holdsMutex = true;
71-
return resolve(markCompleted);
72-
} else {
73-
let node: MutexWaitNode;
98+
const resolvePromise = () => {
99+
this.deactivateWaiter(waiter);
100+
abort?.removeEventListener('abort', onAbort);
74101

75-
const onAbort = () => {
76-
abort?.removeEventListener('abort', onAbort);
102+
const items = waiter.acquiredItems;
103+
resolve({ items, release: markCompleted });
104+
};
77105

78-
if (node.isActive) {
79-
this.deactivateWaiter(node);
80-
rejectAborted();
81-
}
82-
};
106+
waiter = this.addWaiter(amount, resolvePromise);
83107

84-
node = this.addWaiter(() => {
85-
abort?.removeEventListener('abort', onAbort);
86-
holdsMutex = true;
87-
resolve(markCompleted);
88-
});
108+
// If there are items in the pool that haven't been assigned, we can pull them into this waiter. Note that this is
109+
// only the case if we're the first waiter (otherwise, items would have been assigned to an earlier waiter).
110+
while (!this.available.isEmpty && waiter.remainingItems > 0) {
111+
waiter.acquiredItems.push(this.available.removeFirst());
112+
waiter.remainingItems--;
113+
}
89114

90-
abort?.addEventListener('abort', onAbort);
115+
if (waiter.remainingItems == 0) {
116+
return resolvePromise();
91117
}
118+
119+
abort?.addEventListener('abort', onAbort);
92120
});
93121
}
94122

95-
async runExclusive<T>(fn: () => PromiseLike<T> | T, abort?: AbortSignal): Promise<T> {
96-
const returnMutex = await this.acquire(abort);
123+
/**
124+
* Requests a single item from the pool.
125+
*
126+
* The returned `release` callback must be invoked to return the item into the pool.
127+
*/
128+
async requestOne(abort?: AbortSignal): Promise<{ item: T; release: UnlockFn }> {
129+
const { items, release } = await this.requestPermits(1, abort);
130+
return { release, item: items[0] };
131+
}
97132

98-
try {
99-
return await fn();
100-
} finally {
101-
returnMutex();
102-
}
133+
/**
134+
* Requests access to all items from the pool.
135+
*
136+
* The returned `release` callback must be invoked to return items into the pool.
137+
*/
138+
requestAll(abort?: AbortSignal): Promise<{ items: T[]; release: UnlockFn }> {
139+
return this.requestPermits(this.size, abort);
103140
}
104141
}
105142

106-
interface MutexWaitNode {
143+
interface SemaphoreWaitNode<T> {
107144
/**
108145
* Whether the waiter is currently active (not aborted and not fullfilled).
109146
*/
110147
isActive: boolean;
148+
acquiredItems: T[];
149+
remainingItems: number;
111150
onAcquire: () => void;
112-
prev?: MutexWaitNode;
113-
next?: MutexWaitNode;
151+
prev?: SemaphoreWaitNode<T>;
152+
next?: SemaphoreWaitNode<T>;
153+
}
154+
155+
/**
156+
* An asynchronous mutex implementation.
157+
*
158+
* @internal This class is meant to be used in PowerSync SDKs only, and is not part of the public API.
159+
*/
160+
export class Mutex {
161+
private inner = new Semaphore([null]);
162+
163+
async acquire(abort?: AbortSignal): Promise<UnlockFn> {
164+
const { release } = await this.inner.requestOne(abort);
165+
return release;
166+
}
167+
168+
async runExclusive<T>(fn: () => PromiseLike<T> | T, abort?: AbortSignal): Promise<T> {
169+
const returnMutex = await this.acquire(abort);
170+
171+
try {
172+
return await fn();
173+
} finally {
174+
returnMutex();
175+
}
176+
}
114177
}
115178

116179
/**

packages/common/src/utils/queue.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* A simple fixed-capacity queue implementation.
3+
*
4+
* Unlike a naive queue implemented by `array.push()` and `array.shift()`, this avoids moving array elements around
5+
* and is `O(1)` for {@link addLast} and {@link removeFirst}.
6+
*/
7+
export class Queue<T> {
8+
private table: (T | undefined)[];
9+
// Index of the first element in the table.
10+
private head: number;
11+
// Amount of items currently in the queue.
12+
private _length: number;
13+
14+
constructor(initialItems: Iterable<T>) {
15+
this.table = [...initialItems];
16+
this.head = 0;
17+
this._length = this.table.length;
18+
}
19+
20+
get isEmpty(): boolean {
21+
return this.length == 0;
22+
}
23+
24+
get length(): number {
25+
return this._length;
26+
}
27+
28+
removeFirst(): T {
29+
if (this.isEmpty) {
30+
throw new Error('Queue is empty');
31+
}
32+
33+
const result = this.table[this.head] as T;
34+
this._length--;
35+
this.table[this.head] = undefined;
36+
this.head = (this.head + 1) % this.table.length;
37+
return result;
38+
}
39+
40+
addLast(element: T) {
41+
if (this.length == this.table.length) {
42+
throw new Error('Queue is full');
43+
}
44+
45+
this.table[(this.head + this._length) % this.table.length] = element;
46+
this._length++;
47+
}
48+
}

0 commit comments

Comments
 (0)