@@ -5,7 +5,6 @@ namespace PowerSync.Common.Tests;
55
66public class EventStreamTests
77{
8-
98 [ Fact ]
109 public async Task EventStream_ShouldReceiveTwoMessages_Async ( )
1110 {
@@ -109,4 +108,136 @@ public async Task EventStream_ShouldReceiveTwoMessages_Sync()
109108 Assert . Contains ( status2 , receivedMessages ) ;
110109 Assert . Equal ( 0 , eventStream . SubscriberCount ( ) ) ;
111110 }
111+
112+ [ Fact ]
113+ public async Task EventManager_RegistersStreamsCorrectly ( )
114+ {
115+ var manager = new EventManager ( ) ;
116+ var stream1 = new EventStream < bool > ( ) ;
117+ var stream2 = new EventStream < string > ( ) ;
118+ var stream3 = new EventStream < int > ( ) ; // Control
119+
120+ manager . Register ( stream1 ) ;
121+ manager . Register ( stream2 ) ;
122+
123+ Assert . True ( manager . TryGetStream < bool > ( out var obtainedStream1 ) ) ;
124+ Assert . True ( manager . TryGetStream < string > ( out var obtainedStream2 ) ) ;
125+ Assert . False ( manager . TryGetStream < int > ( out var obtainedStream3 ) ) ;
126+
127+ Assert . Equal ( stream1 , obtainedStream1 ) ;
128+ Assert . Equal ( stream2 , obtainedStream2 ) ;
129+ Assert . Equal ( null , obtainedStream3 ) ;
130+
131+ manager . Close ( ) ;
132+ }
133+
134+ [ Fact ]
135+ public async Task EventManager_CloseRemovesAndClosesStreams ( )
136+ {
137+ var manager = new EventManager ( ) ;
138+ var stream1 = new EventStream < bool > ( ) ;
139+ var stream2 = new EventStream < string > ( ) ;
140+ var stream3 = new EventStream < int > ( ) ; // Control
141+
142+ manager . Register ( stream1 ) ;
143+ manager . Register ( stream2 ) ;
144+
145+ manager . Close ( ) ;
146+
147+ Assert . False ( manager . TryGetStream < bool > ( out var obtainedStream1 ) ) ;
148+ Assert . False ( manager . TryGetStream < string > ( out var obtainedStream2 ) ) ;
149+ Assert . False ( manager . TryGetStream < int > ( out var obtainedStream3 ) ) ;
150+
151+ Assert . Equal ( null , obtainedStream1 ) ;
152+ Assert . Equal ( null , obtainedStream2 ) ;
153+ Assert . Equal ( null , obtainedStream3 ) ;
154+
155+ Assert . True ( stream1 . Closed ) ;
156+ Assert . True ( stream2 . Closed ) ;
157+ Assert . False ( stream3 . Closed ) ;
158+ }
159+
160+ [ Fact ( Timeout = 2000 ) ]
161+ public async Task EventManager_ShouldReceiveEmittedEvents ( )
162+ {
163+ var manager = new EventManager ( ) ;
164+ var stream1 = new EventStream < bool > ( ) ;
165+ var stream2 = new EventStream < string > ( ) ;
166+
167+ manager . Register ( stream1 ) ;
168+ manager . Register ( stream2 ) ;
169+
170+ var cts1 = new CancellationTokenSource ( ) ;
171+ var listener1 = stream1 . ListenAsync ( cts1 . Token ) ;
172+ Assert . True ( manager . TryEmit ( false ) ) ;
173+ Assert . True ( manager . TryEmit ( false ) ) ;
174+ Assert . True ( manager . TryEmit ( true ) ) ;
175+ int eventCount = 0 ;
176+
177+ await foreach ( var evt in listener1 )
178+ {
179+ eventCount ++ ;
180+ if ( evt == true )
181+ {
182+ cts1 . Cancel ( ) ;
183+ }
184+ }
185+
186+ Assert . Equal ( 3 , eventCount ) ;
187+
188+ var cts2 = new CancellationTokenSource ( ) ;
189+ var listener2 = stream2 . ListenAsync ( cts2 . Token ) ;
190+ Assert . True ( manager . TryEmit ( "hi" ) ) ;
191+ Assert . True ( manager . TryEmit ( "hello" ) ) ;
192+ Assert . True ( manager . TryEmit ( "sup" ) ) ;
193+ Assert . True ( manager . TryEmit ( "STOP" ) ) ;
194+ eventCount = 0 ;
195+
196+ await foreach ( var evt in listener2 )
197+ {
198+ eventCount ++ ;
199+ if ( evt == "STOP" )
200+ {
201+ cts2 . Cancel ( ) ;
202+ }
203+ }
204+
205+ Assert . Equal ( 4 , eventCount ) ;
206+
207+ manager . Close ( ) ;
208+ }
209+
210+ [ Fact ]
211+ public async Task EventManager_ShouldNotReceiveEventsAfterDeregistering ( )
212+ {
213+ var manager = new EventManager ( ) ;
214+ var stream = new EventStream < string > ( ) ;
215+
216+ manager . Register ( stream ) ;
217+
218+ var cts = new CancellationTokenSource ( ) ;
219+ var listener = stream . ListenAsync ( cts . Token ) ;
220+ var sem = new SemaphoreSlim ( 0 ) ;
221+ int eventCount = 0 ;
222+
223+ _ = Task . Run ( async ( ) =>
224+ {
225+ sem . Release ( ) ;
226+ await foreach ( var evt in listener )
227+ {
228+ eventCount ++ ;
229+ sem . Release ( ) ;
230+ }
231+ } , cts . Token ) ;
232+ Assert . True ( await sem . WaitAsync ( 100 ) ) ;
233+
234+ Assert . True ( manager . Deregister < string > ( ) ) ;
235+
236+ Assert . False ( manager . TryEmit ( "invalid" ) ) ;
237+ Assert . False ( await sem . WaitAsync ( 100 ) ) ;
238+
239+ // Cleanup
240+ cts . Cancel ( ) ;
241+ manager . Close ( ) ;
242+ }
112243}
0 commit comments