Skip to content

Commit ccf16f0

Browse files
authored
(feat): Add React useSyncStreams hook (#908)
1 parent 061868b commit ccf16f0

File tree

4 files changed

+149
-70
lines changed

4 files changed

+149
-70
lines changed

.changeset/flat-jars-do.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/react': minor
3+
---
4+
5+
Added `useSyncStreams` hook that allows you to manage a variable number of sync streams, compared to the existing `useSyncStream` hook intended for the one and only one use case.
6+
Updated `useSyncStream` and `useAllSyncStreamsHaveSynced` to use this hook internally to simplify their implementations.

packages/react/src/hooks/streams.ts

Lines changed: 46 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -36,33 +36,57 @@ export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions {
3636
* @returns The status for that stream, or `null` if the stream is currently being resolved.
3737
*/
3838
export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null {
39-
const { name, parameters } = options;
39+
return useSyncStreams([options])[0];
40+
}
41+
42+
/**
43+
* Creates multiple PowerSync stream subscriptions. Subscriptions are kept alive as long as the
44+
* React component calling this function. When it unmounts, or when the streams array contents
45+
* change, all previous subscriptions are unsubscribed before new ones are created.
46+
*/
47+
export function useSyncStreams(streamOptions: UseSyncStreamOptions[]): SyncStreamStatus[] {
4048
const db = usePowerSync();
4149
const status = useStatus();
42-
const stream = useMemo(() => db.syncStream(name, parameters), [name, parameters]);
50+
51+
const stringifiedOptions = useMemo(() => JSON.stringify(streamOptions), [streamOptions]);
52+
const syncStreams = useMemo(
53+
() =>
54+
streamOptions.map((options) => {
55+
return {
56+
stream: db.syncStream(options.name, options.parameters ?? undefined),
57+
options
58+
};
59+
}),
60+
[stringifiedOptions]
61+
);
4362

4463
useEffect(() => {
4564
let active = true;
46-
let subscription: SyncStreamSubscription | null = null;
65+
const resolvedSubs: SyncStreamSubscription[] = [];
4766

48-
stream.subscribe(options).then((sub) => {
49-
if (active) {
50-
subscription = sub;
51-
} else {
52-
// The cleanup function already ran, unsubscribe immediately.
53-
sub.unsubscribe();
54-
}
55-
});
67+
for (const entry of syncStreams) {
68+
entry.stream.subscribe(entry.options).then((sub) => {
69+
if (active) {
70+
resolvedSubs.push(sub);
71+
} else {
72+
// The cleanup function already ran, unsubscribe immediately.
73+
sub.unsubscribe();
74+
}
75+
});
76+
}
5677

5778
return () => {
5879
active = false;
59-
// If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set
60-
// active to false.
61-
subscription?.unsubscribe();
80+
for (const sub of resolvedSubs) {
81+
sub.unsubscribe();
82+
}
6283
};
63-
}, [stream]);
84+
}, [stringifiedOptions]);
6485

65-
return status.forStream(stream) ?? null;
86+
return useMemo(
87+
() => syncStreams.map((entry) => status.forStream(entry.stream) ?? null),
88+
[status, stringifiedOptions]
89+
);
6690
}
6791

6892
/**
@@ -72,57 +96,12 @@ export function useAllSyncStreamsHaveSynced(
7296
db: AbstractPowerSyncDatabase,
7397
streams: QuerySyncStreamOptions[] | undefined
7498
): boolean {
75-
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
76-
// want to update underlying subscriptions each time, though.
77-
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
78-
const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true));
99+
const statuses = useSyncStreams(streams ?? []);
79100

80-
useEffect(() => {
81-
if (streams) {
82-
setHasSynced(false);
83-
84-
const promises: Promise<SyncStreamSubscription>[] = [];
85-
const abort = new AbortController();
86-
for (const stream of streams) {
87-
promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream));
88-
}
89-
90-
// First, wait for all subscribe() calls to make all subscriptions active.
91-
Promise.all(promises).then(async (resolvedStreams) => {
92-
function allHaveSynced(status: SyncStatus) {
93-
return resolvedStreams.every((s, i) => {
94-
const request = streams[i];
95-
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
96-
});
97-
}
98-
99-
// Wait for the effect to be cancelled or all streams having synced.
100-
await db.waitForStatus(allHaveSynced, abort.signal);
101-
if (abort.signal.aborted) {
102-
// Was cancelled
103-
} else {
104-
// Has synced, update public state.
105-
setHasSynced(true);
106-
107-
// Wait for cancellation before clearing subscriptions.
108-
await new Promise<void>((resolve) => {
109-
abort.signal.addEventListener('abort', () => resolve());
110-
});
111-
}
112-
113-
// Effect was definitely cancelled at this point, so drop the subscriptions.
114-
for (const stream of resolvedStreams) {
115-
stream.unsubscribe();
116-
}
117-
});
118-
119-
return () => abort.abort();
120-
} else {
121-
// There are no streams, so all of them have synced.
122-
setHasSynced(true);
123-
return undefined;
124-
}
125-
}, [hash]);
101+
if (!streams) return true;
126102

127-
return synced;
103+
return streams.every((stream, i) => {
104+
if (!stream.waitForStream) return true;
105+
return statuses[i]?.subscription?.hasSynced === true;
106+
});
128107
}

packages/react/tests/streams.test.tsx

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import React, { act, useSyncExternalStore } from 'react';
44
import { AbstractPowerSyncDatabase, ConnectionManager, SyncStatus } from '@powersync/common';
55
import { openPowerSync } from './utils';
66
import { PowerSyncContext } from '../src/hooks/PowerSyncContext';
7-
import { useSyncStream, UseSyncStreamOptions } from '../src/hooks/streams';
7+
import { useSyncStream, useSyncStreams, UseSyncStreamOptions } from '../src/hooks/streams';
88
import { useQuery } from '../src/hooks/watched/useQuery';
99
import { QuerySyncStreamOptions } from '../src/hooks/watched/watch-types';
1010

@@ -142,6 +142,102 @@ describe('stream hooks', () => {
142142
await waitFor(() => expect(currentStreams()).toHaveLength(0), { timeout: 1000, interval: 100 });
143143
});
144144

145+
it('useSyncStreams subscribes and unsubscribes', async () => {
146+
expect(currentStreams()).toStrictEqual([]);
147+
148+
const { result, unmount } = renderHook(() => useSyncStreams([{ name: 'a' }, { name: 'b' }]), {
149+
wrapper: testWrapper
150+
});
151+
expect(result.current).toStrictEqual([null, null]);
152+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
153+
await waitFor(() => expect(result.current.every((s) => s !== null)).toBe(true), {
154+
timeout: 1000,
155+
interval: 100
156+
});
157+
158+
unmount();
159+
expect(currentStreams()).toStrictEqual([]);
160+
});
161+
162+
it('useSyncStreams with cached instance', async () => {
163+
const existingSubscription = await db.syncStream('a').subscribe();
164+
await existingSubscription.unsubscribe();
165+
166+
const { result } = renderHook(() => useSyncStreams([{ name: 'a' }]), {
167+
wrapper: testWrapper
168+
});
169+
expect(result.current[0]).not.toBeNull();
170+
});
171+
172+
it('useSyncStreams handles array growing from 1 to 2 entries', async () => {
173+
let streamOptions: UseSyncStreamOptions[] = [{ name: 'a' }];
174+
let streamUpdateListeners: (() => void)[] = [];
175+
176+
const { result } = renderHook(
177+
() => {
178+
const options = useSyncExternalStore(
179+
(cb) => {
180+
streamUpdateListeners.push(cb);
181+
return () => {
182+
const index = streamUpdateListeners.indexOf(cb);
183+
if (index != -1) {
184+
streamUpdateListeners.splice(index, 1);
185+
}
186+
};
187+
},
188+
() => streamOptions
189+
);
190+
return useSyncStreams(options);
191+
},
192+
{ wrapper: testWrapper }
193+
);
194+
195+
await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 });
196+
197+
// Grow to 2 entries
198+
streamOptions = [{ name: 'a' }, { name: 'b' }];
199+
act(() => streamUpdateListeners.forEach((cb) => cb()));
200+
201+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
202+
expect(result.current).toHaveLength(2);
203+
});
204+
205+
it('useSyncStreams handles array shrinking from 2 to 1 entries', async () => {
206+
let streamOptions: UseSyncStreamOptions[] = [{ name: 'a' }, { name: 'b' }];
207+
let streamUpdateListeners: (() => void)[] = [];
208+
209+
const { result, unmount } = renderHook(
210+
() => {
211+
const options = useSyncExternalStore(
212+
(cb) => {
213+
streamUpdateListeners.push(cb);
214+
return () => {
215+
const index = streamUpdateListeners.indexOf(cb);
216+
if (index != -1) {
217+
streamUpdateListeners.splice(index, 1);
218+
}
219+
};
220+
},
221+
() => streamOptions
222+
);
223+
return useSyncStreams(options);
224+
},
225+
{ wrapper: testWrapper }
226+
);
227+
228+
await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 });
229+
230+
// Shrink to 1 entry
231+
streamOptions = [{ name: 'a' }];
232+
act(() => streamUpdateListeners.forEach((cb) => cb()));
233+
234+
await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 });
235+
expect(result.current).toHaveLength(1);
236+
237+
unmount();
238+
expect(currentStreams()).toStrictEqual([]);
239+
});
240+
145241
it('handles stream parameter changes', async () => {
146242
// Start without streams
147243
let streams: QuerySyncStreamOptions[] = [];

packages/tanstack-react-query/tests/streams.test.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ describe('stream hooks', () => {
140140
// Adopt streams - this should reset back to loading
141141
streams = [{ name: 'a', waitForStream: true }];
142142
act(() => streamUpdateListeners.forEach((cb) => cb()));
143-
144-
expect(result.current).toMatchObject({ isFetching: true });
145143
expect(result.current).toMatchObject({ isPending: true });
146144

147145
// ... and subscribe

0 commit comments

Comments
 (0)