@@ -2,14 +2,15 @@ namespace PowerSync.Common.Client;
22
33using System . Runtime . CompilerServices ;
44using System . Text . RegularExpressions ;
5+ using System . Threading . Channels ;
56using System . Threading . Tasks ;
67
78using Microsoft . Extensions . Logging ;
89using Microsoft . Extensions . Logging . Abstractions ;
910
1011using Newtonsoft . Json ;
11-
1212using Nito . AsyncEx ;
13+ using ThrottleDebounce ;
1314
1415using PowerSync . Common . Client . Connection ;
1516using PowerSync . Common . Client . Sync . Bucket ;
@@ -131,7 +132,7 @@ public class PowerSyncDatabase : IPowerSyncDatabase
131132 public IDBAdapter Database { get ; protected set ; }
132133 private CompiledSchema schema ;
133134
134- private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30 ;
135+ private const int DEFAULT_WATCH_THROTTLE_MS = 30 ;
135136 private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex ( @"(^ps_data__|^ps_data_local__)" , RegexOptions . Compiled ) ;
136137
137138 public bool Closed { get ; protected set ; }
@@ -785,19 +786,21 @@ public IAsyncEnumerable<WatchOnChangeEvent> OnChange(SQLWatchOptions? options =
785786
786787 // Return the actual IAsyncEnumerable here, using OnChange as a synchronous wrapper that blocks until the
787788 // connection is established
788- return OnChangeCore ( powersyncTables , listener , signal , options ? . TriggerImmediately == true ) ;
789+ var throttleMs = options ? . ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS ;
790+ return OnChangeCore ( powersyncTables , listener , signal , options ? . TriggerImmediately == true , throttleMs ) ;
789791 }
790792
791793 private async IAsyncEnumerable < WatchOnChangeEvent > OnChangeCore (
792794 HashSet < string > watchedTables ,
793795 IAsyncEnumerable < DBAdapterEvents . TablesUpdatedEvent > listener ,
794796 CancellationTokenSource signal ,
795- bool triggerImmediately
797+ bool triggerImmediately ,
798+ int throttleMs = DEFAULT_WATCH_THROTTLE_MS
796799 )
797800 {
798801 try
799802 {
800- await foreach ( var update in OnRawTableChange ( watchedTables , listener , signal . Token , triggerImmediately ) )
803+ await foreach ( var update in OnRawTableChange ( watchedTables , listener , signal . Token , triggerImmediately , throttleMs ) )
801804 {
802805 // Convert from 'ps_data__<name>' to '<name>'
803806 for ( int i = 0 ; i < update . ChangedTables . Length ; i ++ )
@@ -875,6 +878,7 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
875878 bool isRestart = false ;
876879 var currentRestartCts = initialRestartCts ;
877880 var currentListener = initialListener ;
881+ var throttleMs = options ? . ThrottleMs ?? DEFAULT_WATCH_THROTTLE_MS ;
878882
879883 try
880884 {
@@ -898,7 +902,8 @@ private async IAsyncEnumerable<T[]> WatchCore<T>(
898902 powersyncTables ,
899903 currentListener ,
900904 currentRestartCts . Token ,
901- isRestart || ( options ? . TriggerImmediately == true )
905+ isRestart || ( options ? . TriggerImmediately == true ) ,
906+ throttleMs
902907 ) . GetAsyncEnumerator ( ) ;
903908
904909 // Continually wait for either OnChange or SchemaChanged to fire
@@ -985,31 +990,93 @@ internal async Task<HashSet<string>> GetSourceTables(string sql, object?[]? para
985990 private async IAsyncEnumerable < WatchOnChangeEvent > OnRawTableChange (
986991 HashSet < string > watchedTables ,
987992 IAsyncEnumerable < DBAdapterEvents . TablesUpdatedEvent > listener ,
988- [ EnumeratorCancellation ] CancellationToken token ,
989- bool triggerImmediately = false
993+ [ EnumeratorCancellation ] CancellationToken signal ,
994+ bool triggerImmediately = false ,
995+ int throttleMs = DEFAULT_WATCH_THROTTLE_MS
990996 )
991997 {
992998 if ( triggerImmediately )
993999 {
9941000 yield return new WatchOnChangeEvent { ChangedTables = [ ] } ;
9951001 }
9961002
997- HashSet < string > changedTables = new ( ) ;
998- await foreach ( var e in listener )
1003+ if ( throttleMs <= 0 )
9991004 {
1000- // Extract the changed tables and intersect with the watched tables
1001- changedTables . Clear ( ) ;
1002- GetTablesFromNotification ( e . TablesUpdated , changedTables ) ;
1003- changedTables . IntersectWith ( watchedTables ) ;
1005+ // No throttling
1006+ HashSet < string > changedTables = new ( ) ;
1007+ await foreach ( var e in listener )
1008+ {
1009+ GetTablesFromNotification ( e . TablesUpdated , changedTables ) ;
1010+ changedTables . IntersectWith ( watchedTables ) ;
1011+ if ( changedTables . Count == 0 ) continue ;
1012+ yield return new WatchOnChangeEvent { ChangedTables = [ .. changedTables ] } ;
1013+ }
1014+ yield break ;
1015+ }
10041016
1005- if ( changedTables . Count == 0 ) continue ;
1017+ // Throttled - publish via throttled call to an action that flushes accumulated changes into this channel
1018+ var channel = Channel . CreateUnbounded < WatchOnChangeEvent > ( ) ;
1019+ var accumulatedTables = new HashSet < string > ( ) ;
1020+
1021+ _ = Task . Run ( async ( ) =>
1022+ {
1023+ using var throttledFlush = Throttler . Throttle ( ( ) =>
1024+ {
1025+ // Safe to lock directly on accumulatedTables because it's a local variable
1026+ lock ( accumulatedTables )
1027+ {
1028+ if ( accumulatedTables . Count == 0 ) return ;
1029+ channel . Writer . TryWrite ( new WatchOnChangeEvent { ChangedTables = [ .. accumulatedTables ] } ) ;
1030+ accumulatedTables . Clear ( ) ;
1031+ }
1032+ } ,
1033+ TimeSpan . FromMilliseconds ( throttleMs ) ,
1034+ leading : false ,
1035+ trailing : true
1036+ ) ;
1037+
1038+ try
1039+ {
1040+ var changedTables = new HashSet < string > ( ) ;
1041+ await foreach ( var e in listener )
1042+ {
1043+ GetTablesFromNotification ( e . TablesUpdated , changedTables ) ;
1044+ changedTables . IntersectWith ( watchedTables ) ;
1045+ if ( changedTables . Count == 0 ) continue ;
10061046
1007- yield return new WatchOnChangeEvent { ChangedTables = [ .. changedTables ] } ;
1047+ lock ( accumulatedTables ) { accumulatedTables . UnionWith ( changedTables ) ; }
1048+ throttledFlush . Invoke ( ) ;
1049+ }
1050+ }
1051+ catch ( OperationCanceledException ) { }
1052+ finally
1053+ {
1054+ // Flush any remaining events and close the channel
1055+ lock ( accumulatedTables )
1056+ {
1057+ if ( accumulatedTables . Count > 0 )
1058+ {
1059+ channel . Writer . TryWrite ( new WatchOnChangeEvent { ChangedTables = [ .. accumulatedTables ] } ) ;
1060+ accumulatedTables . Clear ( ) ;
1061+ }
1062+ }
1063+ channel . Writer . Complete ( ) ;
1064+ }
1065+ } ) ;
1066+
1067+ // Continuously pull values from channel and publish to the consumer
1068+ while ( await channel . Reader . WaitToReadAsync ( CancellationToken . None ) )
1069+ {
1070+ while ( channel . Reader . TryRead ( out var evt ) )
1071+ {
1072+ yield return evt ;
1073+ }
10081074 }
10091075 }
10101076
10111077 private static void GetTablesFromNotification ( INotification updateNotification , HashSet < string > changedTables )
10121078 {
1079+ changedTables . Clear ( ) ;
10131080 string [ ] tables = [ ] ;
10141081 if ( updateNotification is BatchedUpdateNotification batchedUpdate )
10151082 {
0 commit comments