Skip to content

Commit a43d126

Browse files
committed
Merge branch 'main' into chore/test-improvements
2 parents 29f3ae3 + 4d51235 commit a43d126

File tree

18 files changed

+496
-129
lines changed

18 files changed

+496
-129
lines changed

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,49 @@
22

33
## 0.0.11-alpha.1
44

5+
- `StatusUpdated` and `StatusChanged` now both emit `SyncStatus` objects instead of just `StatusChanged`.
6+
- Converted most instances of a class inheriting from `EventStream<T>` into a class with an `EventManager` property called `Events`. This allows for subscribing to individual events instead of subscribing to all events and then filtering events manually.
7+
8+
```csharp
9+
// Old
10+
var listener = db.Listen(cts.Token);
11+
foreach (PowerSyncDBEvent update in listener)
12+
{
13+
// Manually filter updates
14+
if (update.StatusChanged != null)
15+
{
16+
Console.WriteLine("status changed: " + update.StatusChanged!);
17+
}
18+
}
19+
20+
// Old (async)
21+
var listener = db.ListenAsync(cts.Token);
22+
await foreach (PowerSyncDBEvent update in listener)
23+
{
24+
// Manually filter updates
25+
if (update.StatusChanged != null)
26+
{
27+
Console.WriteLine("status changed: " + update.StatusChanged!);
28+
}
29+
}
30+
31+
// New
32+
var listener = db.Events.OnStatusChanged.Listen(cts.Token);
33+
await foreach (PowerSyncDBEvents.StatusChanged update in listener)
34+
{
35+
// Events are filtered inherently
36+
Console.WriteLine("status changed: " + update.Status);
37+
}
38+
39+
// New (async) - recommended for most use cases
40+
var listener = db.Events.OnStatusChanged.ListenAsync(cts.Token);
41+
await foreach (PowerSyncDBEvents.StatusChanged update in listener)
42+
{
43+
// Events are filtered inherently
44+
Console.WriteLine("status changed: " + update.Status);
45+
}
46+
```
47+
548
- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5.
649
- Updated to the latest version (0.4.11) of the core extension.
750
- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking.

PowerSync/PowerSync.Common/Client/ConnectionManager.cs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,25 @@ public class StoredConnectionOptions(
7979
public PowerSyncConnectionOptions Options { get; set; } = options;
8080
}
8181

82-
public class ConnectionManagerEvent
82+
public class ConnectionManagerEvents : EventManager
8383
{
84-
public StreamingSyncImplementation? SyncStreamCreated { get; set; }
84+
public interface IConnectionManagerEvent;
85+
86+
public class SyncStreamCreatedEvent(StreamingSyncImplementation ssi) : IConnectionManagerEvent
87+
{
88+
public StreamingSyncImplementation SyncStreamCreated { get; set; } = ssi;
89+
}
90+
91+
public EventStream<SyncStreamCreatedEvent> OnSyncStreamCreated { get; } = new();
92+
93+
public ConnectionManagerEvents()
94+
{
95+
Register(OnSyncStreamCreated);
96+
}
8597
}
8698

87-
public class ConnectionManager : EventStream<ConnectionManagerEvent>
99+
public class ConnectionManager : ICloseable
88100
{
89-
90101
/// <summary>
91102
/// Tracks active connection attempts
92103
/// </summary>
@@ -122,6 +133,7 @@ public class ConnectionManager : EventStream<ConnectionManagerEvent>
122133

123134
public IPowerSyncBackendConnector? Connector => PendingConnectionOptions?.Connector;
124135

136+
public ConnectionManagerEvents Events { get; protected set; } = new();
125137

126138
public PowerSyncConnectionOptions? ConnectionOptions => PendingConnectionOptions?.Options;
127139

@@ -148,9 +160,9 @@ public ConnectionManager(Func<IPowerSyncBackendConnector, CreateSyncImplementati
148160
SyncDisposer = null;
149161
}
150162

151-
public new void Close()
163+
public void Close()
152164
{
153-
base.Close();
165+
Events.Close();
154166
SyncStreamImplementation?.Close();
155167
SyncDisposer?.Invoke();
156168
}
@@ -274,7 +286,7 @@ async Task InitSyncStream()
274286
RetryDelayMs = options.RetryDelayMs,
275287
});
276288

277-
Emit(new ConnectionManagerEvent { SyncStreamCreated = result.Sync });
289+
Events.Emit(new ConnectionManagerEvents.SyncStreamCreatedEvent(result.Sync));
278290
SyncStreamImplementation = result.Sync;
279291
SyncDisposer = result.OnDispose;
280292
await SyncStreamImplementation.WaitForReady();

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,45 @@ public class PowerSyncDatabaseOptions() : BasePowerSyncDatabaseOptions()
5353
public Func<IPowerSyncBackendConnector, Remote>? RemoteFactory { get; set; }
5454
}
5555

56-
public class PowerSyncDBEvent : StreamingSyncImplementationEvent
56+
public class PowerSyncDBEvents : EventManager
5757
{
58-
public bool? Initialized { get; set; }
59-
public Schema? SchemaChanged { get; set; }
58+
public interface IPowerSyncDBEvent;
6059

61-
public bool? Closing { get; set; }
60+
public class InitializedEvent : IPowerSyncDBEvent;
61+
public class ClosingEvent : IPowerSyncDBEvent;
62+
public class ClosedEvent : IPowerSyncDBEvent;
63+
public class SchemaChangedEvent(Schema schema) : IPowerSyncDBEvent
64+
{
65+
public Schema Schema { get; set; } = schema;
66+
}
67+
public class StatusChangedEvent(SyncStatus status) : IPowerSyncDBEvent
68+
{
69+
public SyncStatus Status { get; set; } = status;
70+
}
71+
public class StatusUpdatedEvent(SyncStatusOptions status) : IPowerSyncDBEvent
72+
{
73+
public SyncStatusOptions Status { get; set; } = status;
74+
}
75+
76+
public EventStream<InitializedEvent> OnInitialized { get; } = new();
77+
public EventStream<ClosingEvent> OnClosing { get; } = new();
78+
public EventStream<ClosedEvent> OnClosed { get; } = new();
79+
public EventStream<SchemaChangedEvent> OnSchemaChanged { get; } = new();
80+
public EventStream<StatusChangedEvent> OnStatusChanged { get; } = new();
81+
public EventStream<StatusUpdatedEvent> OnStatusUpdated { get; } = new();
6282

63-
public bool? Closed { get; set; }
83+
public PowerSyncDBEvents()
84+
{
85+
Register(OnInitialized);
86+
Register(OnClosing);
87+
Register(OnClosed);
88+
Register(OnSchemaChanged);
89+
Register(OnStatusChanged);
90+
Register(OnStatusUpdated);
91+
}
6492
}
6593

66-
public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
94+
public interface IPowerSyncDatabase : ICloseableAsync
6795
{
6896
public Task Connect(IPowerSyncBackendConnector connector, PowerSyncConnectionOptions? options = null);
6997
public ISyncStream SyncStream(string name, Dictionary<string, object>? parameters = null);
@@ -96,20 +124,21 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
96124

97125
Task WriteTransaction(Func<ITransaction, Task> fn, DBLockOptions? options = null);
98126
Task<T> WriteTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null);
99-
100127
}
101128

102-
public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDatabase
129+
public class PowerSyncDatabase : IPowerSyncDatabase
103130
{
104131
public IDBAdapter Database { get; protected set; }
105132
private CompiledSchema schema;
106133

107134
private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30;
108135
private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled);
109136

110-
public new bool Closed { get; protected set; }
137+
public bool Closed { get; protected set; }
111138
public bool Ready { get; protected set; }
112139

140+
public PowerSyncDBEvents Events { get; protected set; } = new();
141+
113142
protected Task IsReadyTask;
114143
protected ConnectionManager ConnectionManager;
115144

@@ -138,7 +167,6 @@ public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDataba
138167
public bool Connected => CurrentStatus.Connected;
139168
public bool Connecting => CurrentStatus.Connecting;
140169

141-
142170
public PowerSyncConnectionOptions? ConnectionOptions => ConnectionManager.ConnectionOptions;
143171

144172
public PowerSyncDatabase(PowerSyncDatabaseOptions options)
@@ -205,19 +233,16 @@ public PowerSyncDatabase(PowerSyncDatabaseOptions options)
205233
});
206234

207235
var syncStreamStatusCts = CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token);
208-
var syncStreamStatusListener = syncStreamImplementation.ListenAsync(syncStreamStatusCts.Token);
236+
var syncStreamStatusListener = syncStreamImplementation.Events.OnStatusChanged.ListenAsync(syncStreamStatusCts.Token);
209237
var _ = Task.Run(async () =>
210238
{
211239
await foreach (var update in syncStreamStatusListener)
212240
{
213-
if (update.StatusChanged != null)
241+
CurrentStatus = new SyncStatus(new SyncStatusOptions(update.Status.Options)
214242
{
215-
CurrentStatus = new SyncStatus(new SyncStatusOptions(update.StatusChanged.Options)
216-
{
217-
HasSynced = CurrentStatus?.HasSynced == true || update.StatusChanged.LastSyncedAt != null,
218-
});
219-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
220-
}
243+
HasSynced = CurrentStatus?.HasSynced == true || update.Status.LastSyncedAt != null,
244+
});
245+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
221246
}
222247
});
223248

@@ -285,7 +310,7 @@ public async Task WaitForStatus(Func<SyncStatus, bool> predicate, CancellationTo
285310
? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token)
286311
: CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, cancellationToken.Value);
287312

288-
var statusListener = ListenAsync(cts.Token);
313+
var statusListener = Events.OnStatusChanged.ListenAsync(cts.Token);
289314

290315
if (predicate(CurrentStatus))
291316
{
@@ -301,7 +326,7 @@ public async Task WaitForStatus(Func<SyncStatus, bool> predicate, CancellationTo
301326
{
302327
await foreach (var update in statusListener)
303328
{
304-
if ((update.StatusChanged != null) && predicate(update.StatusChanged))
329+
if (predicate(update.Status))
305330
{
306331
tcs.TrySetResult(true);
307332
cts.Cancel();
@@ -322,7 +347,7 @@ protected async Task Initialize(PowerSyncDatabaseOptions options)
322347
await ResolveOfflineSyncStatus();
323348
await Database.Execute("PRAGMA RECURSIVE_TRIGGERS=TRUE");
324349
Ready = true;
325-
Emit(new PowerSyncDBEvent { Initialized = true });
350+
Events.Emit(new PowerSyncDBEvents.InitializedEvent());
326351
}
327352

328353
private record VersionResult(string version);
@@ -366,7 +391,7 @@ protected async Task ResolveOfflineSyncStatus()
366391
if (!updatedStatus.IsEqual(CurrentStatus))
367392
{
368393
CurrentStatus = updatedStatus;
369-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
394+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
370395
}
371396
}
372397

@@ -394,7 +419,7 @@ public async Task UpdateSchema(Schema schema)
394419
this.schema = compiledSchema;
395420
await Database.Execute("SELECT powersync_replace_schema(?)", [compiledSchema.ToJSON()]);
396421
await Database.RefreshSchema();
397-
Emit(new PowerSyncDBEvent { SchemaChanged = schema });
422+
Events.Emit(new PowerSyncDBEvents.SchemaChangedEvent(schema));
398423
}
399424

400425
/// <summary>
@@ -458,7 +483,7 @@ await Database.WriteTransaction(async tx =>
458483

459484
// The data has been deleted - reset the sync status
460485
CurrentStatus = new SyncStatus(new SyncStatusOptions());
461-
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
486+
Events.Emit(new PowerSyncDBEvents.StatusChangedEvent(CurrentStatus));
462487
}
463488

464489
/// <summary>
@@ -479,18 +504,16 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
479504
/// Once close is called, this connection cannot be used again - a new one
480505
/// must be constructed.
481506
/// </summary>
482-
public new async Task Close()
507+
public async Task Close()
483508
{
484509
await WaitForReady();
485510

486511
if (Closed) return;
487512

488-
Emit(new PowerSyncDBEvent { Closing = true });
513+
Events.Emit(new PowerSyncDBEvents.ClosingEvent());
489514

490515
await Disconnect();
491516

492-
base.Close();
493-
494517
masterCts.Cancel();
495518
masterCts.Dispose();
496519

@@ -499,7 +522,10 @@ public ISyncStream SyncStream(string name, Dictionary<string, object>? parameter
499522

500523
await Database.Close();
501524
Closed = true;
502-
Emit(new PowerSyncDBEvent { Closed = true });
525+
526+
Events.Emit(new PowerSyncDBEvents.ClosedEvent());
527+
528+
Events.Close();
503529
}
504530

505531
private record UploadQueueStatsSizeCountResult(long size, long count);
@@ -755,7 +781,7 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
755781
? CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token, options.Signal.Value)
756782
: CancellationTokenSource.CreateLinkedTokenSource(masterCts.Token);
757783

758-
var listener = Database.ListenAsync(signal.Token);
784+
var listener = Database.Events.OnTablesUpdated.ListenAsync(signal.Token);
759785

760786
// Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
761787
// connection is established
@@ -764,7 +790,7 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
764790

765791
private async IAsyncEnumerable<WatchOnChangeEvent> OnChangeCore(
766792
HashSet<string> watchedTables,
767-
IAsyncEnumerable<DBAdapterEvent> listener,
793+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
768794
CancellationTokenSource signal,
769795
bool triggerImmediately
770796
)
@@ -818,7 +844,7 @@ public IAsyncEnumerable<T[]> Watch<T>(
818844
// so that table changes between Watch() being called and iteration starting are not missed.
819845
// This mirrors the pattern used in OnChange().
820846
var initialRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
821-
var initialListener = Database.ListenAsync(initialRestartCts.Token);
847+
var initialListener = Database.Events.OnTablesUpdated.ListenAsync(initialRestartCts.Token);
822848

823849
return WatchCore<T>(sql, parameters, options, signal, initialRestartCts, initialListener);
824850
}
@@ -829,22 +855,19 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
829855
SQLWatchOptions options,
830856
CancellationTokenSource signal,
831857
CancellationTokenSource initialRestartCts,
832-
IAsyncEnumerable<DBAdapterEvent> initialListener
858+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> initialListener
833859
)
834860
{
835861
var schemaChanged = new TaskCompletionSource<bool>();
836862

837863
// Listen for schema changes in the background
838864
var schemaListenerTask = Task.Run(async () =>
839865
{
840-
await foreach (var update in ListenAsync(signal.Token))
866+
await foreach (var update in Events.OnSchemaChanged.ListenAsync(signal.Token))
841867
{
842-
if (update.SchemaChanged != null)
843-
{
844-
// Swap schemaChanged with an unresolved TCS
845-
var oldTcs = Interlocked.Exchange(ref schemaChanged, new());
846-
oldTcs.TrySetResult(true);
847-
}
868+
// Swap schemaChanged with an unresolved TCS
869+
var oldTcs = Interlocked.Exchange(ref schemaChanged, new());
870+
oldTcs.TrySetResult(true);
848871
}
849872
}, signal.Token);
850873

@@ -897,7 +920,7 @@ IAsyncEnumerable<DBAdapterEvent> initialListener
897920
// Establish a new listener BEFORE resolving source tables in the next iteration,
898921
// so that changes during the async GetSourceTables call are not missed.
899922
currentRestartCts = CancellationTokenSource.CreateLinkedTokenSource(signal.Token);
900-
currentListener = Database.ListenAsync(currentRestartCts.Token);
923+
currentListener = Database.Events.OnTablesUpdated.ListenAsync(currentRestartCts.Token);
901924
oldRestartCts.Dispose();
902925

903926
break;
@@ -961,7 +984,7 @@ internal async Task<HashSet<string>> GetSourceTables(string sql, object?[]? para
961984

962985
private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
963986
HashSet<string> watchedTables,
964-
IAsyncEnumerable<DBAdapterEvent> listener,
987+
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
965988
[EnumeratorCancellation] CancellationToken token,
966989
bool triggerImmediately = false
967990
)
@@ -974,8 +997,7 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
974997
HashSet<string> changedTables = new();
975998
await foreach (var e in listener)
976999
{
977-
if (e.TablesUpdated == null) continue;
978-
1000+
// Extract the changed tables and intersect with the watched tables
9791001
changedTables.Clear();
9801002
GetTablesFromNotification(e.TablesUpdated, changedTables);
9811003
changedTables.IntersectWith(watchedTables);

0 commit comments

Comments
 (0)