@@ -1006,7 +1006,6 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10061006 HashSet < string > changedTables = new ( ) ;
10071007 await foreach ( var e in listener )
10081008 {
1009- changedTables . Clear ( ) ;
10101009 GetTablesFromNotification ( e . TablesUpdated , changedTables ) ;
10111010 changedTables . IntersectWith ( watchedTables ) ;
10121011 if ( changedTables . Count == 0 ) continue ;
@@ -1016,12 +1015,14 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10161015 }
10171016
10181017 // Leading + trailing edge throttle
1019- var listenerEnumerator = listener . GetAsyncEnumerator ( token ) ;
1018+ var enumerator = listener . GetAsyncEnumerator ( token ) ;
10201019 try
10211020 {
1022- var accumulated = new HashSet < string > ( ) ;
1021+ var accumulatedTables = new HashSet < string > ( ) ;
1022+ var changedTables = new HashSet < string > ( ) ;
10231023 long lastYieldTime = 0 ;
1024- Task < bool > moveNextTask = listenerEnumerator . MoveNextAsync ( ) . AsTask ( ) ;
1024+
1025+ Task < bool > moveNextTask = enumerator . MoveNextAsync ( ) . AsTask ( ) ;
10251026 Task ? throttleTask = null ;
10261027
10271028 while ( true )
@@ -1034,28 +1035,34 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10341035 catch ( OperationCanceledException ) { break ; }
10351036 }
10361037
1037- // Throttle timer fired without a new event
10381038 if ( throttleTask != null && throttleTask . IsCompleted && ! moveNextTask . IsCompleted )
10391039 {
1040- if ( accumulated . Count > 0 )
1040+ // Throttle timer expired without a new event
1041+ if ( accumulatedTables . Count > 0 )
10411042 {
10421043 lastYieldTime = Stopwatch . GetTimestamp ( ) ;
1043- yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulated ] } ;
1044- accumulated . Clear ( ) ;
1044+ yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulatedTables ] } ;
1045+ accumulatedTables . Clear ( ) ;
10451046 }
10461047 throttleTask = null ;
10471048 continue ;
10481049 }
10491050
10501051 // A new event arrived (possibly alongside throttle)
1052+ // Check if the event actually exists or if this is the end of the enumerator
10511053 bool hasNext ;
10521054 try { hasNext = await moveNextTask ; }
10531055 catch ( OperationCanceledException ) { break ; }
10541056 if ( ! hasNext ) break ;
10551057
1056- AccumulateMatchingTables ( listenerEnumerator . Current , watchedTables , accumulated ) ;
1058+ // Accumulate changed tables from the most recent OnTablesUpdated event
1059+ GetTablesFromNotification ( enumerator . Current . TablesUpdated , changedTables ) ;
1060+
1061+ // Filter only watched tables and add to accumulatedTables set
1062+ changedTables . IntersectWith ( watchedTables ) ;
1063+ accumulatedTables . UnionWith ( changedTables ) ;
10571064
1058- if ( accumulated . Count > 0 )
1065+ if ( accumulatedTables . Count > 0 )
10591066 {
10601067 var now = Stopwatch . GetTimestamp ( ) ;
10611068
@@ -1065,10 +1072,11 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10651072
10661073 if ( elapsedMs >= throttleMs )
10671074 {
1068- // First event in series - fire immediately and start collecting future events
1075+ // First event since throttle expiration
1076+ // Fire immediately (leading edge) and reset throttle timer
10691077 lastYieldTime = now ;
1070- yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulated ] } ;
1071- accumulated . Clear ( ) ;
1078+ yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulatedTables ] } ;
1079+ accumulatedTables . Clear ( ) ;
10721080 throttleTask = null ;
10731081 }
10741082 else
@@ -1077,15 +1085,16 @@ private async IAsyncEnumerable<WatchOnChangeEvent> OnRawTableChange(
10771085 }
10781086 }
10791087
1080- moveNextTask = listenerEnumerator . MoveNextAsync ( ) . AsTask ( ) ;
1088+ moveNextTask = enumerator . MoveNextAsync ( ) . AsTask ( ) ;
10811089 }
10821090
1083- if ( accumulated . Count > 0 )
1084- yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulated ] } ;
1091+ // Flush any remaining events
1092+ if ( accumulatedTables . Count > 0 )
1093+ yield return new WatchOnChangeEvent { ChangedTables = [ .. accumulatedTables ] } ;
10851094 }
10861095 finally
10871096 {
1088- await listenerEnumerator . DisposeAsync ( ) ;
1097+ await enumerator . DisposeAsync ( ) ;
10891098 }
10901099 }
10911100
@@ -1103,6 +1112,7 @@ HashSet<string> accumulated
11031112
11041113 private static void GetTablesFromNotification ( INotification updateNotification , HashSet < string > changedTables )
11051114 {
1115+ changedTables . Clear ( ) ;
11061116 string [ ] tables = [ ] ;
11071117 if ( updateNotification is BatchedUpdateNotification batchedUpdate )
11081118 {
0 commit comments