Skip to content

Commit 292dd75

Browse files
committed
Refactor to use ThrottleDebounce package, fix throttle being leading/trailing instead of just trailing
1 parent 54bdaad commit 292dd75

File tree

3 files changed

+75
-95
lines changed

3 files changed

+75
-95
lines changed

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 45 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
namespace PowerSync.Common.Client;
22

3-
using System.Diagnostics;
43
using System.Runtime.CompilerServices;
54
using System.Text.RegularExpressions;
5+
using System.Threading.Channels;
66
using System.Threading.Tasks;
77

88
using Microsoft.Extensions.Logging;
99
using Microsoft.Extensions.Logging.Abstractions;
1010

1111
using Newtonsoft.Json;
12-
1312
using Nito.AsyncEx;
13+
using ThrottleDebounce;
1414

1515
using PowerSync.Common.Client.Connection;
1616
using PowerSync.Common.Client.Sync.Bucket;
@@ -990,7 +990,7 @@ internal async Task<HashSet<string>> GetSourceTables(string sql, object?[]? para
990990
private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
991991
HashSet<string> watchedTables,
992992
IAsyncEnumerable<DBAdapterEvents.TablesUpdatedEvent> listener,
993-
[EnumeratorCancellation] CancellationToken token,
993+
[EnumeratorCancellation] CancellationToken signal,
994994
bool triggerImmediately = false,
995995
int throttleMs = DEFAULT_WATCH_THROTTLE_MS
996996
)
@@ -1014,102 +1014,66 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10141014
yield break;
10151015
}
10161016

1017-
// Leading + trailing edge throttle
1018-
var enumerator = listener.GetAsyncEnumerator(token);
1019-
try
1020-
{
1021-
var accumulatedTables = new HashSet<string>();
1022-
var changedTables = new HashSet<string>();
1023-
long lastYieldTime = 0;
1024-
1025-
Task<bool> moveNextTask = enumerator.MoveNextAsync().AsTask();
1026-
Task? throttleTask = null;
1027-
1028-
while (true)
1029-
{
1030-
if (throttleTask != null)
1031-
await Task.WhenAny(moveNextTask, throttleTask);
1032-
else
1033-
{
1034-
try { await moveNextTask; }
1035-
catch (OperationCanceledException) { break; }
1036-
}
1017+
// Throttled - publish via throttled call to an action that flushes accumulated changes into this channel
1018+
var channel = Channel.CreateUnbounded<WatchOnChangeEvent>();
1019+
var accumulatedTables = new HashSet<string>();
10371020

1038-
if (throttleTask != null && throttleTask.IsCompleted && !moveNextTask.IsCompleted)
1021+
_ = Task.Run(async () =>
1022+
{
1023+
using var throttledFlush = Throttler.Throttle(() =>
10391024
{
1040-
// Throttle timer expired without a new event
1041-
if (accumulatedTables.Count > 0)
1025+
// Safe to lock directly on accumulatedTables because it's a local variable
1026+
lock (accumulatedTables)
10421027
{
1043-
lastYieldTime = Stopwatch.GetTimestamp();
1044-
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] };
1028+
if (accumulatedTables.Count == 0) return;
1029+
channel.Writer.TryWrite(new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] });
10451030
accumulatedTables.Clear();
10461031
}
1047-
throttleTask = null;
1048-
continue;
1049-
}
1050-
1051-
// A new event arrived (possibly alongside throttle)
1052-
// Check if the event actually exists or if this is the end of the enumerator
1053-
bool hasNext;
1054-
try { hasNext = await moveNextTask; }
1055-
catch (OperationCanceledException) { break; }
1056-
if (!hasNext) break;
1057-
1058-
// Accumulate changed tables from the most recent OnTablesUpdated event
1059-
GetTablesFromNotification(enumerator.Current.TablesUpdated, changedTables);
1060-
1061-
// Filter only watched tables and add to accumulatedTables set
1062-
changedTables.IntersectWith(watchedTables);
1063-
accumulatedTables.UnionWith(changedTables);
1032+
},
1033+
TimeSpan.FromMilliseconds(throttleMs),
1034+
leading: false,
1035+
trailing: true
1036+
);
10641037

1065-
if (accumulatedTables.Count > 0)
1038+
try
1039+
{
1040+
var changedTables = new HashSet<string>();
1041+
await foreach (var e in listener)
10661042
{
1067-
var now = Stopwatch.GetTimestamp();
1043+
GetTablesFromNotification(e.TablesUpdated, changedTables);
1044+
changedTables.IntersectWith(watchedTables);
1045+
if (changedTables.Count == 0) continue;
10681046

1069-
// There's a nice built-in method for this (Stopwatch.GetElapsedTime), but
1070-
// it's not supported in .NET 6.0. :(
1071-
var elapsedMs = (now - lastYieldTime) * 1000.0 / Stopwatch.Frequency;
1072-
1073-
if (elapsedMs >= throttleMs)
1047+
lock (accumulatedTables) { accumulatedTables.UnionWith(changedTables); }
1048+
throttledFlush.Invoke();
1049+
}
1050+
}
1051+
catch (OperationCanceledException) { }
1052+
finally
1053+
{
1054+
// Flush any remaining events and close the channel
1055+
lock (accumulatedTables)
1056+
{
1057+
if (accumulatedTables.Count > 0)
10741058
{
1075-
// First event since throttle expiration
1076-
// Fire immediately (leading edge) and reset throttle timer
1077-
lastYieldTime = now;
1078-
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] };
1059+
channel.Writer.TryWrite(new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] });
10791060
accumulatedTables.Clear();
1080-
throttleTask = null;
1081-
}
1082-
else
1083-
{
1084-
throttleTask ??= Task.Delay((int)(throttleMs - elapsedMs), token);
10851061
}
10861062
}
1087-
1088-
moveNextTask = enumerator.MoveNextAsync().AsTask();
1063+
channel.Writer.Complete();
10891064
}
1065+
});
10901066

1091-
// Flush any remaining events
1092-
if (accumulatedTables.Count > 0)
1093-
yield return new WatchOnChangeEvent { ChangedTables = [.. accumulatedTables] };
1094-
}
1095-
finally
1067+
// Continuously pull values from channel and publish to the consumer
1068+
while (await channel.Reader.WaitToReadAsync(CancellationToken.None))
10961069
{
1097-
await enumerator.DisposeAsync();
1070+
while (channel.Reader.TryRead(out var evt))
1071+
{
1072+
yield return evt;
1073+
}
10981074
}
10991075
}
11001076

1101-
private static void AccumulateMatchingTables(
1102-
DBAdapterEvents.TablesUpdatedEvent e,
1103-
HashSet<string> watchedTables,
1104-
HashSet<string> accumulated
1105-
)
1106-
{
1107-
var tables = new HashSet<string>();
1108-
GetTablesFromNotification(e.TablesUpdated, tables);
1109-
tables.IntersectWith(watchedTables);
1110-
accumulated.UnionWith(tables);
1111-
}
1112-
11131077
private static void GetTablesFromNotification(INotification updateNotification, HashSet<string> changedTables)
11141078
{
11151079
changedTables.Clear();

PowerSync/PowerSync.Common/PowerSync.Common.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
3636
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
3737
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
38+
<PackageReference Include="ThrottleDebounce" Version="2.0.1" />
3839
</ItemGroup>
3940

4041
<ItemGroup>

Tests/PowerSync/PowerSync.Common.Tests/Client/PowerSyncDatabaseTests.cs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -867,13 +867,13 @@ public async Task Watch_CancelsOnTokenCancellation()
867867
public async Task OnChange_ThrottlesBatchesRapidChanges()
868868
{
869869
int eventCount = 0;
870+
using var sem = new SemaphoreSlim(0);
870871
var tcs = new TaskCompletionSource<bool>();
871872

872873
var listener = db.OnChange(new SQLWatchOptions
873874
{
874875
Tables = ["assets"],
875876
Signal = testCts.Token,
876-
ThrottleMs = 200,
877877
});
878878

879879
_ = Task.Run(async () =>
@@ -883,6 +883,7 @@ public async Task OnChange_ThrottlesBatchesRapidChanges()
883883
await foreach (var _ in listener)
884884
{
885885
Interlocked.Increment(ref eventCount);
886+
sem.Release();
886887
}
887888
tcs.TrySetResult(true);
888889
}
@@ -897,23 +898,25 @@ public async Task OnChange_ThrottlesBatchesRapidChanges()
897898
await TestUtils.InsertRandomAsset(db);
898899
}
899900

901+
// Wait for the throttled event to arrive
902+
Assert.True(await sem.WaitAsync(200));
900903
testCts.Cancel();
901904
Assert.True(await tcs.Task);
902-
903-
Assert.True(eventCount < 5, $"Expected fewer than 5 events but got {eventCount}");
905+
Assert.Equal(1, eventCount);
904906
}
905907

906908
[Fact(Timeout = 5000)]
907909
public async Task Watch_ThrottlesBatchesRapidChanges()
908910
{
909911
int eventCount = 0;
910912
long lastCount = 0;
913+
using var sem = new SemaphoreSlim(0);
911914
var tcs = new TaskCompletionSource<bool>();
912915

913916
var listener = db.Watch<CountResult>(
914917
"SELECT COUNT(*) AS count FROM assets",
915918
null,
916-
new() { Signal = testCts.Token, ThrottleMs = 200 });
919+
new() { Signal = testCts.Token });
917920

918921
_ = Task.Run(async () =>
919922
{
@@ -923,6 +926,7 @@ public async Task Watch_ThrottlesBatchesRapidChanges()
923926
{
924927
lastCount = rows.First().count;
925928
Interlocked.Increment(ref eventCount);
929+
sem.Release();
926930
}
927931
tcs.TrySetResult(true);
928932
}
@@ -937,6 +941,7 @@ public async Task Watch_ThrottlesBatchesRapidChanges()
937941
await TestUtils.InsertRandomAsset(db);
938942
}
939943

944+
Assert.True(await sem.WaitAsync(200));
940945
testCts.Cancel();
941946
Assert.True(await tcs.Task);
942947

@@ -976,7 +981,7 @@ public async Task OnChange_NoThrottleWhenZero()
976981
}
977982

978983
[Fact(Timeout = 5000)]
979-
public async Task OnChange_FirstChangeIsNotDelayed()
984+
public async Task OnChange_FirstChangeIsDelayedByThrottle()
980985
{
981986
using var sem = new SemaphoreSlim(0);
982987
var sw = Stopwatch.StartNew();
@@ -1000,26 +1005,30 @@ public async Task OnChange_FirstChangeIsNotDelayed()
10001005
await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", [Guid.NewGuid().ToString(), "test"]);
10011006

10021007
Assert.True(await sem.WaitAsync(2000));
1003-
Assert.True(sw.ElapsedMilliseconds < 200, $"First event took {sw.ElapsedMilliseconds}ms, expected <200ms");
1008+
Assert.True(sw.ElapsedMilliseconds >= 400, $"First event took {sw.ElapsedMilliseconds}ms, expected >=400ms (trailing-only throttle)");
10041009
}
10051010

10061011
[Fact(Timeout = 5000)]
10071012
public async Task OnChange_ThrottleCancelledCleanly()
10081013
{
1014+
int eventCount = 0;
10091015
var tcs = new TaskCompletionSource<bool>();
10101016

10111017
var listener = db.OnChange(new SQLWatchOptions
10121018
{
10131019
Tables = ["assets"],
10141020
Signal = testCts.Token,
1015-
ThrottleMs = 500,
1021+
ThrottleMs = 200,
10161022
});
10171023

10181024
_ = Task.Run(async () =>
10191025
{
10201026
try
10211027
{
1022-
await foreach (var _ in listener) { }
1028+
await foreach (var _ in listener)
1029+
{
1030+
Interlocked.Increment(ref eventCount);
1031+
}
10231032
tcs.TrySetResult(true);
10241033
}
10251034
catch (Exception ex)
@@ -1028,19 +1037,22 @@ public async Task OnChange_ThrottleCancelledCleanly()
10281037
}
10291038
});
10301039

1031-
// Insert to trigger the throttle delay, then cancel before the window expires
1040+
// Insert to trigger the throttle, then cancel before the window expires
10321041
await db.Execute("INSERT INTO assets(id, description) VALUES(?, ?)", [Guid.NewGuid().ToString(), "test"]);
1033-
await Task.Delay(50);
1042+
await Task.Delay(100);
10341043
testCts.Cancel();
10351044

10361045
Assert.True(await tcs.Task);
1046+
// The flush-on-cancel should still deliver the accumulated event
1047+
Assert.Equal(1, eventCount);
10371048
}
10381049

10391050
[Fact]
10401051
public async Task Watch_SingleEventForBatchedQuery()
10411052
{
10421053
int eventCount = 0;
10431054
long lastCount = 0;
1055+
using var sem = new SemaphoreSlim(0);
10441056
var tcs = new TaskCompletionSource<bool>();
10451057

10461058
var listener = db.Watch<CountResult>(
@@ -1054,14 +1066,15 @@ public async Task Watch_SingleEventForBatchedQuery()
10541066
{
10551067
lastCount = rows[0].count;
10561068
Interlocked.Increment(ref eventCount);
1069+
sem.Release();
10571070
}
10581071
tcs.TrySetResult(true);
10591072
});
10601073

1061-
// Long batched query
1062-
const int QUERY_COUNT = 10000;
1074+
const int QUERY_COUNT = 1000;
10631075
await TestUtils.InsertRandomAssets(db, QUERY_COUNT);
10641076

1077+
Assert.True(await sem.WaitAsync(200));
10651078
testCts.Cancel();
10661079
Assert.True(await tcs.Task);
10671080
Assert.Equal(1, eventCount);
@@ -1072,6 +1085,7 @@ public async Task Watch_SingleEventForBatchedQuery()
10721085
public async Task OnChange_SingleEventsForBatchedQuery()
10731086
{
10741087
int eventCount = 0;
1088+
using var sem = new SemaphoreSlim(0);
10751089
var tcs = new TaskCompletionSource<bool>();
10761090

10771091
var listener = db.OnChange(new()
@@ -1085,14 +1099,15 @@ public async Task OnChange_SingleEventsForBatchedQuery()
10851099
await foreach (var _ in listener)
10861100
{
10871101
Interlocked.Increment(ref eventCount);
1102+
sem.Release();
10881103
}
10891104
tcs.TrySetResult(true);
10901105
});
10911106

1092-
// Long batched query
1093-
const int QUERY_COUNT = 10000;
1107+
const int QUERY_COUNT = 1000;
10941108
await TestUtils.InsertRandomAssets(db, QUERY_COUNT);
10951109

1110+
Assert.True(await sem.WaitAsync(200));
10961111
testCts.Cancel();
10971112
Assert.True(await tcs.Task);
10981113

0 commit comments

Comments
 (0)