Wait for quiescence in MergeManyChangeSets stress tests #1100
Draft
dwcullop wants to merge 2 commits into
added 2 commits
May 26, 2026 22:57
MultiThreadedStressTest(10, 50) fails intermittently in CI with two prices present in market.PricesCache.Items but missing from the live aggregator. The two affected prices have the latest timestamps in the batch, which is the signature of a race during high-contention production.

Bogus.Randomizer wraps System.Random. When constructed with a seed, the randomizer stores the random in a protected localSeed field and bypasses its internal Locker on every generator call. The test shares one seeded Randomizer across many parallel producer threads:

- Directly via _randomizer.Number / .Bool / .TimeSpan / .Interval
- Indirectly via _marketFaker.WithSeed(_randomizer), since every Faker<T>.Generate call routes through the same randomizer

Concurrent calls into the underlying System.Random corrupt its internal state, producing values inconsistent with what a serialized run would produce. That is sufficient to explain the observed asymmetry between the post-hoc PricesCache snapshot and the live aggregator stream.

Introduce SynchronizedRandomizer, a Randomizer subclass that replaces the protected localSeed field with a LockedRandom (a Random subclass that serializes every virtual method on an internal lock). The seed and method contracts are unchanged; the wrapper only adds synchronization. Apply it to the failing fixture. Other Randomizer uses across the test project remain unchanged for now; they are either single-threaded or have not exhibited flake symptoms.

Verified: 20 consecutive runs of the fixture pass at MaxParallelThreads=16, zero failures.
The post-reactivemarbles#1079 cache delivery model decouples mutation from notification: AddOrUpdate enqueues a notification and returns; the actual delivery to subscribers runs later on whichever thread wins the drain. That removed the cross-cache deadlock the old Synchronize(lock) shape produced, but it opened a small window between mutation and observed delivery. Tests that compare a live aggregator's view against the cache's current Items at assert time can see disagreement during that window.

The source-compare fixture already adopted the right shape:

    var merged = source.MergeManyChangeSets(...).Publish();
    var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
    using var local = merged.AsAggregator();
    using var connect = merged.Connect();
    ...
    await cacheCompleted;
    CheckResultContents(..., local);

Port the same pattern to the cache and list MergeManyChangeSets stress fixtures. The local aggregator now sits on the Publish chain so it shares the completion task; the await before CheckResultContents pins the quiescence point.

Also delete the SynchronizedRandomizer change made earlier on this branch. Bogus.Randomizer takes a process-wide lock on Locker.Value for every generator call regardless of whether localSeed is set, so the wrapper was addressing a non-problem.
JakenVeina
approved these changes
May 28, 2026
Problem
MergeManyChangeSetsCacheSourceCompareFixture.MultiThreadedStressTest(10, 50) fails intermittently in CI with prices present in market.PricesCache.Items but missing from the live priceResults aggregator. The mechanism is the post-#1079 cache delivery model:

- Before #1079 (Synchronize(lock) shape): AddOrUpdate did not return until every subscriber had finished processing the notification. Cache state and observed notifications were always in lockstep.
- After #1079 (SharedDeliveryQueue shape): AddOrUpdate enqueues a notification and returns; the actual delivery runs later on whichever thread wins the drain. This is intentional - it removes the cross-cache deadlock the old shape produced - but it opens a small window between mutation and observed delivery.

A test that compares a live aggregator's view against cache.Items at assert time can see disagreement during that window. The stress fixture's add/remove streams are still in flight at the moment of _marketCache.Dispose; any per-market AddOrUpdate that lands after MergeManyChangeSets unsubscribes from that market's LatestPrices stream mutates PricesCache (so the post-hoc .Items snapshot sees it) without delivering a notification to priceResults (so the live aggregator never sees it).

The source-compare fixture had already adopted the right shape to defend against this:
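As quoted in the second commit message on this branch, with the elided arguments and loop body left as written there (a fragment, not a compilable unit):

```csharp
// One published chain shared by every consumer in the test.
var merged = source.MergeManyChangeSets(...).Publish();

// Completion task is created before Connect, so the terminal signal cannot be missed.
var cacheCompleted = merged.LastOrDefaultAsync().ToTask();

using var local = merged.AsAggregator();
using var connect = merged.Connect();

...

// Quiescence point: nothing more can be delivered once this completes.
await cacheCompleted;
CheckResultContents(..., local);
```

Because local subscribes to the published stream instead of a second independent chain, it shares the completion that cacheCompleted awaits.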
The two sibling stress fixtures (MergeManyChangeSetsCacheFixture and MergeManyChangeSetsListFixture) did not. They subscribed two independent chains (one for the aggregator, one for the sub/unsub loop) and asserted without waiting for either to complete.

Fix
Port the source-compare pattern to both sibling fixtures:

- Publish the merged stream so every consumer in the test shares one upstream chain.
- Capture the completion task with merged.LastOrDefaultAsync().ToTask() before connecting.
- await the completion task between the stress loop and CheckResultContents.

No production code is changed. No operator semantics change.
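The three steps can be sketched in isolation with plain System.Reactive. This is a minimal illustration under stated assumptions, not the fixture code: the Subject<int> stands in for the merged changeset stream, the List<int> stands in for the aggregator, and QuiescenceSketch and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class QuiescenceSketch
{
    public static async Task<int> RunAsync()
    {
        // Hypothetical stand-in for the merged changeset stream.
        using var source = new Subject<int>();

        // Step 1: publish so every consumer shares one upstream subscription.
        var merged = source.Publish();

        // Step 2: create the completion task before connecting.
        var completed = merged.LastOrDefaultAsync().ToTask();

        // Stand-in for the aggregator: a second consumer on the same published chain.
        var seen = new List<int>();
        using var local = merged.Subscribe(x => seen.Add(x));

        using var connect = merged.Connect();

        // Stand-in for the stress loop: a single producer on another thread.
        await Task.Run(() =>
        {
            for (var i = 0; i < 1000; i++)
            {
                source.OnNext(i);
            }

            source.OnCompleted();
        });

        // Step 3: wait for the shared chain to complete before inspecting results.
        await completed;
        return seen.Count;
    }
}
```

Creating the task before Connect means the subscription is already in place when the first notification flows, so the await observes the same terminal signal as every other consumer of merged.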
Verification
20 consecutive runs of MergeManyChangeSetsCacheSourceCompareFixture.MultiThreadedStressTest at xUnit.MaxParallelThreads=16, zero failures. The combined run of all three stress fixtures passes 163/163 tests per iteration.

Note on SynchronizedRandomizer

The previous commit on this branch added SynchronizedRandomizer to wrap Bogus.Randomizer with explicit synchronization. That was based on a misreading: Bogus.Randomizer takes a process-wide lock on Locker.Value for every generator call (verified via IL inspection of Number(int, int)), regardless of whether localSeed is set, so the wrapper was solving a non-problem. That commit is reverted in this push.