From 2ae80bac1987b8cdf79ca9e4ccac3f85d8ab559d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 09:50:01 +0200 Subject: [PATCH 01/20] Add rawChangeStream implementation. --- .../src/replication/ChangeStream.ts | 25 +--- .../src/replication/RawChangeStream.ts | 124 ++++++++++++++++++ 2 files changed, 126 insertions(+), 23 deletions(-) create mode 100644 modules/module-mongodb/src/replication/RawChangeStream.ts diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 6ae824ddc..c7936a1a2 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,4 +1,4 @@ -import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { mongo } from '@powersync/lib-service-mongodb'; import { container, DatabaseConnectionError, @@ -39,6 +39,7 @@ import { STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; +import { mapChangeStreamError } from './RawChangeStream.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { @@ -1175,28 +1176,6 @@ export class ChangeStream { } } -function mapChangeStreamError(e: any) { - if (isMongoNetworkTimeoutError(e)) { - // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". - // We wrap the error to make it more useful. - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { - // maxTimeMS was reached. 
Example message: - // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if ( - isMongoServerError(e) && - e.codeName == 'NoMatchingDocument' && - e.errmsg?.includes('post-image was not found') - ) { - throw new ChangeStreamInvalidatedError(e.errmsg, e); - } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new ChangeStreamInvalidatedError(e.message, e); - } else { - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); - } -} - /** * Transaction key for a change stream event, used to detect transaction boundaries. Returns null if the event is not part of a transaction. */ diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts new file mode 100644 index 000000000..bffd70617 --- /dev/null +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -0,0 +1,124 @@ +import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { DatabaseConnectionError, ErrorCode } from '@powersync/lib-services-framework'; +import { ChangeStreamInvalidatedError } from './ChangeStream.js'; + +export interface RawChangeStreamOptions { + signal?: AbortSignal; + maxAwaitTimeMs: number; + batchSize: number; +} + +export interface ChangeStreamBatch { + resumeToken: mongo.ResumeToken; + events: mongo.Document[]; +} + +export async function* rawChangeStream( + db: mongo.Db, + pipeline: mongo.Document[], + options: RawChangeStreamOptions +): AsyncGenerator { + // See specs: + // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + + /** + * Typically '$cmd.aggregate', but we need to use the ns from the cursor. 
+ */ + let cursorId: bigint | null = null; + let ns: string | null = null; + + const maxTimeMS = options.maxAwaitTimeMs; + const batchSize = options.batchSize; + let abortPromise: Promise | null = null; + + options.signal?.addEventListener('abort', () => { + if (cursorId != null && cursorId !== 0n && ns != null) { + // This would result in a CursorKilled error. + abortPromise = db.command({ + killCursors: ns, + cursors: [cursorId] + }); + } + }); + + const session = db.client.startSession(); + try { + // Step 1: Send the aggregate command to start the change stream + const aggregateResult = await db + .command( + { + aggregate: 1, + pipeline, + cursor: { batchSize } + }, + { session, raw: false } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + + cursorId = BigInt(aggregateResult.cursor.id); + ns = aggregateResult.cursor.ns as string; + let batch = aggregateResult.cursor.firstBatch; + + yield { events: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken }; + + // Step 2: Poll using getMore until the cursor is closed + while (cursorId && cursorId !== 0n) { + if (options.signal?.aborted) { + break; + } + const getMoreResult: mongo.Document = await db + .command( + { + getMore: cursorId, + collection: ns, + batchSize, + maxTimeMS + }, + { session, raw: false } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + + cursorId = BigInt(getMoreResult.cursor.id); + const nextBatch = getMoreResult.cursor.nextBatch; + + yield { events: nextBatch, resumeToken: getMoreResult.cursor.postBatchResumeToken }; + } + } finally { + if (abortPromise != null) { + await abortPromise; + } + if (cursorId != null && cursorId !== 0n && abortPromise != null) { + await db.command({ + killCursors: ns, + cursors: [cursorId] + }); + } + await session.endSession(); + } +} + +export function mapChangeStreamError(e: unknown) { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". 
+ // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { + // maxTimeMS was reached. Example message: + // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if ( + isMongoServerError(e) && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg, e); + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } +} From 875a06f48edd46be0442732ecdbe3570b9df585a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 10:44:11 +0200 Subject: [PATCH 02/20] Use the new rawChangeStream(). 
--- .../src/replication/ChangeStream.ts | 592 ++++++++++-------- .../src/replication/RawChangeStream.ts | 44 +- 2 files changed, 363 insertions(+), 273 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index c7936a1a2..f9c5ff765 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -29,7 +29,6 @@ import { ReplicationMetric } from '@powersync/service-types'; import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; import { escapeRegExp } from '../utils.js'; -import { trackChangeStreamBsonBytes } from './internal-mongodb-utils.js'; import { MongoManager } from './MongoManager.js'; import { constructAfterRecord, @@ -39,7 +38,7 @@ import { STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; -import { mapChangeStreamError } from './RawChangeStream.js'; +import { rawChangeStream } from './RawChangeStream.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { @@ -260,48 +259,52 @@ export class ChangeStream { // Create a checkpoint, and open a change stream using startAtOperationTime with the checkpoint's operationTime. 
const firstCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - await using streamManager = this.openChangeStream({ lsn: firstCheckpointLsn, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; const startTime = performance.now(); let lastCheckpointCreated = performance.now(); let eventsSeen = 0; + let batchesSeen = 0; - while (performance.now() - startTime < LSN_TIMEOUT_SECONDS * 1000) { + const filters = this.getSourceNamespaceFilters(); + const iter = this.rawChangeStreamBatches({ + lsn: firstCheckpointLsn, + maxAwaitTimeMS: 0, + signal: this.abort_signal, + filters + }); + for await (let { events } of iter) { + if (performance.now() - startTime >= LSN_TIMEOUT_SECONDS * 1000) { + break; + } if (performance.now() - lastCheckpointCreated >= LSN_CREATE_INTERVAL_SECONDS * 1000) { await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); lastCheckpointCreated = performance.now(); } - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - const changeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - if (changeDocument == null) { - continue; - } - - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + for (let changeDocument of events) { + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? 
changeDocument.ns : undefined; - if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - if (!this.checkpointStreamId.equals(checkpointId)) { - continue; + if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + if (!this.checkpointStreamId.equals(checkpointId)) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + return lsn; } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - return lsn; - } - eventsSeen += 1; + eventsSeen += 1; + batchesSeen += 1; + } } // Could happen if there is a very large replication lag? throw new ServiceError( ErrorCode.PSYNC_S1301, - `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}` + `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}, batches = ${batchesSeen}` ); } @@ -309,15 +312,17 @@ export class ChangeStream { * Given a snapshot LSN, validate that we can read from it, by opening a change stream. */ private async validateSnapshotLsn(lsn: string) { - await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; - try { - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - await stream.tryNext(); - } catch (e) { - // Note: A timeout here is not handled as a ChangeStreamInvalidatedError, even though - // we possibly cannot recover from it. 
- throw mapChangeStreamError(e); + const filters = this.getSourceNamespaceFilters(); + const stream = this.rawChangeStreamBatches({ + lsn: lsn, + // maxAwaitTimeMS should never actually be used here + maxAwaitTimeMS: 0, + filters + }); + for await (let _batch of stream) { + // We got a response from the aggregate command, so consider the LSN valid. + // Close the stream immediately. + break; } } @@ -819,6 +824,71 @@ export class ChangeStream { }; } + private rawChangeStreamBatches(options: { + lsn: string | null; + maxAwaitTimeMS?: number; + batchSize?: number; + filters: { $match: any; multipleDatabases: boolean }; + signal?: AbortSignal; + }): AsyncIterableIterator<{ events: mongo.ChangeStreamDocument[]; resumeToken: unknown }> { + const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; + const startAfter = lastLsn?.timestamp; + const resumeAfter = lastLsn?.resumeToken; + + const filters = options.filters; + + let fullDocument: 'required' | 'updateLookup'; + + if (this.usePostImages) { + // 'read_only' or 'auto_configure' + // Configuration happens during snapshot, or when we see new + // collections. + fullDocument = 'required'; + } else { + fullDocument = 'updateLookup'; + } + const streamOptions: mongo.ChangeStreamOptions = { + showExpandedEvents: true, + fullDocument: fullDocument + }; + const pipeline: mongo.Document[] = [ + { + $changeStream: streamOptions + }, + { + $match: filters.$match + }, + { $changeStreamSplitLargeEvent: {} } + ]; + + /** + * Only one of these options can be supplied at a time. + */ + if (resumeAfter) { + streamOptions.resumeAfter = resumeAfter; + } else { + // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the + // case if we have an old one. + // This is also relevant for getSnapshotLSN(). 
+ streamOptions.startAtOperationTime = startAfter; + } + + let watchDb: mongo.Db; + if (filters.multipleDatabases) { + watchDb = this.client.db('admin'); + } else { + watchDb = this.defaultDb; + } + + return rawChangeStream(watchDb, pipeline, { + batchSize: options.batchSize ?? this.snapshotChunkLength, + maxAwaitTimeMS: options.maxAwaitTimeMS ?? this.maxAwaitTimeMS, + maxTimeMS: this.changeStreamTimeout, + + signal: options.signal + }); + } + private getBufferedChangeCount(stream: mongo.ChangeStream): number { // The driver keeps fetched change stream documents on the underlying cursor, but does // not expose that through the public ChangeStream API. We use this to detect backlog @@ -865,17 +935,18 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); - const { stream, filters } = streamManager; - if (this.abort_signal.aborted) { - await stream.close(); - return; - } - trackChangeStreamBsonBytes(stream, (bytes) => { - bytesReplicatedMetric.add(bytes); - // Each of these represent a single response message from MongoDB. - chunksReplicatedMetric.add(1); + const filters = this.getSourceNamespaceFilters(); + // This is closed when the for loop below returns/breaks/throws + const batchStream = this.rawChangeStreamBatches({ + lsn: resumeFromLsn, + filters, + signal: this.abort_signal }); + // trackChangeStreamBsonBytes(stream, (bytes) => { + // bytesReplicatedMetric.add(bytes); + // // Each of these represent a single response message from MongoDB. + // chunksReplicatedMetric.add(1); + // }); // Always start with a checkpoint. 
// This helps us to clear errors when restarting, even if there is @@ -894,36 +965,18 @@ export class ChangeStream { let lastEmptyResume = performance.now(); let lastTxnKey: string | null = null; - while (true) { - if (this.abort_signal.aborted) { - break; - } - - const originalChangeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - // The stream was closed, we will only ever receive `null` from it - if (!originalChangeDocument && stream.closed) { - break; - } - + for await (let { events, resumeToken } of batchStream) { if (this.abort_signal.aborted) { break; } - - if (originalChangeDocument == null) { - // We get a new null document after `maxAwaitTimeMS` if there were no other events. - // In this case, stream.resumeToken is the resume token associated with the last response. - // stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next() - // does update it. - // From observed behavior, the actual resumeToken changes around once every 10 seconds. + this.touch(); + if (events.length == 0) { + // No changes in this batch, but we still want to keep the connection alive. + // We do this by persisting a keepalive checkpoint. // If we don't update it on empty events, we do keep consistency, but resuming the stream // with old tokens may cause connection timeouts. - // We throttle this further by only persisting a keepalive once a minute. - // We add an additional check for waitForCheckpointLsn == null, to make sure we're not - // doing a keepalive in the middle of a transaction. 
if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { - const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); + const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(resumeToken); await batch.keepalive(lsn); this.touch(); lastEmptyResume = performance.now(); @@ -934,226 +987,241 @@ export class ChangeStream { ); this.replicationLag.markStarted(); } - continue; - } - - this.touch(); - if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { - continue; + // If we have no changes, we can just persist the keepalive. + // This is throttled to once per minute. + if (performance.now() - lastEmptyResume < 60_000) { + continue; + } } - let changeDocument = originalChangeDocument; - if (originalChangeDocument?.splitEvent != null) { - // Handle split events from $changeStreamSplitLargeEvent. - // This is only relevant for very large update operations. - const splitEvent = originalChangeDocument?.splitEvent; + this.touch(); - if (splitDocument == null) { - splitDocument = originalChangeDocument; - } else { - splitDocument = Object.assign(splitDocument, originalChangeDocument); + const batchStart = Date.now(); + for (let eventIndex = 0; eventIndex < events.length; eventIndex++) { + const originalChangeDocument = events[eventIndex]; + if (this.abort_signal.aborted) { + break; } - if (splitEvent.fragment == splitEvent.of) { - // Got all fragments - changeDocument = splitDocument; - splitDocument = null; - } else { - // Wait for more fragments + if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { continue; } - } else if (splitDocument != null) { - // We were waiting for fragments, but got a different event - throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); - } - if ( - !filters.multipleDatabases && - 'ns' in changeDocument && - changeDocument.ns.db != this.defaultDb.databaseName && - 
changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) - ) { - // When all of the following conditions are met: - // 1. We're replicating from an Atlas Flex instance. - // 2. There were changestream events recorded while the PowerSync service is paused. - // 3. We're only replicating from a single database. - // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, - // instead of the expected {db: 'ps'}. - // We correct this. - changeDocument.ns.db = this.defaultDb.databaseName; - - if (!flexDbNameWorkaroundLogged) { - flexDbNameWorkaroundLogged = true; - this.logger.warn( - `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` - ); - } - } + let changeDocument = originalChangeDocument; + if (originalChangeDocument?.splitEvent != null) { + // Handle split events from $changeStreamSplitLargeEvent. + // This is only relevant for very large update operations. + const splitEvent = originalChangeDocument?.splitEvent; - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; - - if (ns?.coll == CHECKPOINTS_COLLECTION) { - /** - * Dropping the database does not provide an `invalidate` event. - * We typically would receive `drop` events for the collection which we - * would process below. - * - * However we don't commit the LSN after collections are dropped. - * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. - * The stream also closes after the drop events. - * This causes an infinite loop of processing the collection drop events. - * - * This check here invalidates the change stream if our `_checkpoints` collection - * is dropped. This allows for detecting when the DB is dropped. 
- */ - if (changeDocument.operationType == 'drop') { - throw new ChangeStreamInvalidatedError( - 'Internal collections have been dropped', - new Error('_checkpoints collection was dropped') - ); + if (splitDocument == null) { + splitDocument = originalChangeDocument; + } else { + splitDocument = Object.assign(splitDocument, originalChangeDocument); + } + + if (splitEvent.fragment == splitEvent.of) { + // Got all fragments + changeDocument = splitDocument; + splitDocument = null; + } else { + // Wait for more fragments + continue; + } + } else if (splitDocument != null) { + // We were waiting for fragments, but got a different event + throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } if ( - !( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' - ) + !filters.multipleDatabases && + 'ns' in changeDocument && + changeDocument.ns.db != this.defaultDb.databaseName && + changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) ) { - continue; + // When all of the following conditions are met: + // 1. We're replicating from an Atlas Flex instance. + // 2. There were changestream events recorded while the PowerSync service is paused. + // 3. We're only replicating from a single database. + // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, + // instead of the expected {db: 'ps'}. + // We correct this. + changeDocument.ns.db = this.defaultDb.databaseName; + + if (!flexDbNameWorkaroundLogged) { + flexDbNameWorkaroundLogged = true; + this.logger.warn( + `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` + ); + } } - // We handle two types of checkpoint events: - // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these - // immediately, regardless of where they were created. - // 2. 
"Batch" checkpoints for the current stream. This is used as a form of dynamic rate - // limiting of commits, so we specifically want to exclude checkpoints from other streams. - // - // It may be useful to also throttle commits due to standalone checkpoints in the future. - // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. - - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - - if (checkpointId == STANDALONE_CHECKPOINT_ID) { - // Standalone / write checkpoint received. - // When we are caught up, commit immediately to keep write checkpoint latency low. - // Once there is already a batch checkpoint pending, or the driver has buffered more - // change stream events, collapse standalone checkpoints into the normal batch - // checkpoint flow to avoid commit churn under sustained load. - if (waitForCheckpointLsn != null || this.getBufferedChangeCount(stream) > 0) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - } + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + + if (ns?.coll == CHECKPOINTS_COLLECTION) { + /** + * Dropping the database does not provide an `invalidate` event. + * We typically would receive `drop` events for the collection which we + * would process below. + * + * However we don't commit the LSN after collections are dropped. + * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. + * The stream also closes after the drop events. + * This causes an infinite loop of processing the collection drop events. + * + * This check here invalidates the change stream if our `_checkpoints` collection + * is dropped. This allows for detecting when the DB is dropped. 
+ */ + if (changeDocument.operationType == 'drop') { + throw new ChangeStreamInvalidatedError( + 'Internal collections have been dropped', + new Error('_checkpoints collection was dropped') + ); + } + + if ( + !( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' + ) + ) { continue; } - } else if (!this.checkpointStreamId.equals(checkpointId)) { - continue; - } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { - // Checkpoint out of order - should never happen with MongoDB. - // If it does happen, we throw an error to stop the replication - restarting should recover. - // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. - // Originally a workaround for https://jira.mongodb.org/browse/NODE-7042. - // This has been fixed in the driver in the meantime, but we still keep this as a safety-check. - throw new ReplicationAssertionError( - `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. Restarting replication.` - ); - } - if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { - waitForCheckpointLsn = null; - } - const { checkpointBlocked } = await batch.commit(lsn, { - oldestUncommittedChange: this.replicationLag.oldestUncommittedChange - }); + // We handle two types of checkpoint events: + // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these + // immediately, regardless of where they were created. + // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate + // limiting of commits, so we specifically want to exclude checkpoints from other streams. 
+ // + // It may be useful to also throttle commits due to standalone checkpoints in the future. + // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. + + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + + if (checkpointId == STANDALONE_CHECKPOINT_ID) { + // Standalone / write checkpoint received. + // When we are caught up, commit immediately to keep write checkpoint latency low. + // Once there is already a batch checkpoint pending, or the driver has buffered more + // change stream events, collapse standalone checkpoints into the normal batch + // checkpoint flow to avoid commit churn under sustained load. + const hasBufferedChanges = eventIndex < events.length - 1; + if (waitForCheckpointLsn != null || hasBufferedChanges) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + } + continue; + } + } else if (!this.checkpointStreamId.equals(checkpointId)) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { + // Checkpoint out of order - should never happen with MongoDB. + // If it does happen, we throw an error to stop the replication - restarting should recover. + // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. + // Originally a workaround for https://jira.mongodb.org/browse/NODE-7042. + // This has been fixed in the driver in the meantime, but we still keep this as a safety-check. + throw new ReplicationAssertionError( + `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. 
Restarting replication.` + ); + } - if (!checkpointBlocked) { - this.replicationLag.markCommitted(); - changesSinceLastCheckpoint = 0; - } - } else if ( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' || - changeDocument.operationType == 'delete' - ) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - } + if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { + waitForCheckpointLsn = null; + } + const { checkpointBlocked } = await batch.commit(lsn, { + oldestUncommittedChange: this.replicationLag.oldestUncommittedChange + }); - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // In most cases, we should not need to snapshot this. But if this is the first time we see the collection - // for whatever reason, then we do need to snapshot it. - // This may result in some duplicate operations when a collection is created for the first time after - // sync rules was deployed. - snapshot: true - }); - if (table.syncAny) { - this.replicationLag.trackUncommittedChange( - changeDocument.clusterTime == null ? null : timestampToDate(changeDocument.clusterTime) - ); + if (!checkpointBlocked) { + this.replicationLag.markCommitted(); + changesSinceLastCheckpoint = 0; + } + } else if ( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' || + changeDocument.operationType == 'delete' + ) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + } - const transactionKeyValue = transactionKey(changeDocument); + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // In most cases, we should not need to snapshot this. 
But if this is the first time we see the collection + // for whatever reason, then we do need to snapshot it. + // This may result in some duplicate operations when a collection is created for the first time after + // sync rules were deployed. + snapshot: true + }); + if (table.syncAny) { + this.replicationLag.trackUncommittedChange( + changeDocument.clusterTime == null ? null : timestampToDate(changeDocument.clusterTime) + ); + + const transactionKeyValue = transactionKey(changeDocument); + + if (transactionKeyValue == null || lastTxnKey != transactionKeyValue) { + // Very crude metric for counting transactions replicated. + // We ignore operations other than basic CRUD, and ignore changes to _powersync_checkpoints. + // Individual writes may not have a txnNumber, in which case we count them as separate transactions. + lastTxnKey = transactionKeyValue; + transactionsReplicatedMetric.add(1); + } - if (transactionKeyValue == null || lastTxnKey != transactionKeyValue) { - // Very crude metric for counting transactions replicated. - // We ignore operations other than basic CRUD, and ignore changes to _powersync_checkpoints. - // Individual writes may not have a txnNumber, in which case we count them as separate transactions. - lastTxnKey = transactionKeyValue; - transactionsReplicatedMetric.add(1); + const flushResult = await this.writeChange(batch, table, changeDocument); + changesSinceLastCheckpoint += 1; + if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { + // When we are catching up replication after an initial snapshot, there may be a very long delay + // before we do a commit(). In that case, we need to periodically persist the resume LSN, so + // we don't restart from scratch if we restart replication. + // The same could apply if we need to catch up on replication after some downtime. 
+ const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); + await batch.setResumeLsn(lsn); + changesSinceLastCheckpoint = 0; + } } - - const flushResult = await this.writeChange(batch, table, changeDocument); - changesSinceLastCheckpoint += 1; - if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { - // When we are catching up replication after an initial snapshot, there may be a very long delay - // before we do a commit(). In that case, we need to periodically persist the resume LSN, so - // we don't restart from scratch if we restart replication. - // The same could apply if we need to catch up on replication after some downtime. - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); - await batch.setResumeLsn(lsn); - changesSinceLastCheckpoint = 0; + } else if (changeDocument.operationType == 'drop') { + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // We're "dropping" this collection, so never snapshot it. + snapshot: false + }); + if (table.syncAny) { + await batch.drop([table]); + this.relationCache.delete(table); } + } else if (changeDocument.operationType == 'rename') { + const relFrom = getMongoRelation(changeDocument.ns); + const relTo = getMongoRelation(changeDocument.to); + const tableFrom = await this.getRelation(batch, relFrom, { + // We're "dropping" this collection, so never snapshot it. 
+ snapshot: false + }); + if (tableFrom.syncAny) { + await batch.drop([tableFrom]); + this.relationCache.delete(relFrom); + } + // Here we do need to snapshot the new table + const collection = await this.getCollectionInfo(relTo.schema, relTo.name); + await this.handleRelation(batch, relTo, { + // This is a new (renamed) collection, so always snapshot it. + snapshot: true, + collectionInfo: collection + }); } - } else if (changeDocument.operationType == 'drop') { - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (table.syncAny) { - await batch.drop([table]); - this.relationCache.delete(table); - } - } else if (changeDocument.operationType == 'rename') { - const relFrom = getMongoRelation(changeDocument.ns); - const relTo = getMongoRelation(changeDocument.to); - const tableFrom = await this.getRelation(batch, relFrom, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (tableFrom.syncAny) { - await batch.drop([tableFrom]); - this.relationCache.delete(relFrom); - } - // Here we do need to snapshot the new table - const collection = await this.getCollectionInfo(relTo.schema, relTo.name); - await this.handleRelation(batch, relTo, { - // This is a new (renamed) collection, so always snapshot it. 
- snapshot: true, - collectionInfo: collection - }); } + this.logger.info(`Processed batch of ${events.length} changes in ${Date.now() - batchStart}ms`); } } ); diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index bffd70617..d0c766eb6 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -4,13 +4,20 @@ import { ChangeStreamInvalidatedError } from './ChangeStream.js'; export interface RawChangeStreamOptions { signal?: AbortSignal; - maxAwaitTimeMs: number; + /** + * How long to wait for new data per batch (max time for long-polling). + */ + maxAwaitTimeMS: number; + /** + * Timeout for the initial aggregate command. + */ + maxTimeMS: number; batchSize: number; } export interface ChangeStreamBatch { resumeToken: mongo.ResumeToken; - events: mongo.Document[]; + events: mongo.ChangeStreamDocument[]; } export async function* rawChangeStream( @@ -21,21 +28,22 @@ export async function* rawChangeStream( // See specs: // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + let cursorId: bigint | null = null; + /** * Typically '$cmd.aggregate', but we need to use the ns from the cursor. */ - let cursorId: bigint | null = null; - let ns: string | null = null; + let nsCollection: string | null = null; - const maxTimeMS = options.maxAwaitTimeMs; + const maxTimeMS = options.maxAwaitTimeMS; const batchSize = options.batchSize; let abortPromise: Promise | null = null; options.signal?.addEventListener('abort', () => { - if (cursorId != null && cursorId !== 0n && ns != null) { + if (cursorId != null && cursorId !== 0n && nsCollection != null) { // This would result in a CursorKilled error. 
abortPromise = db.command({ - killCursors: ns, + killCursors: nsCollection, cursors: [cursorId] }); } @@ -49,7 +57,8 @@ export async function* rawChangeStream( { aggregate: 1, pipeline, - cursor: { batchSize } + cursor: { batchSize }, + maxTimeMS: options.maxTimeMS }, { session, raw: false } ) @@ -58,7 +67,8 @@ export async function* rawChangeStream( }); cursorId = BigInt(aggregateResult.cursor.id); - ns = aggregateResult.cursor.ns as string; + nsCollection = namespaceCollection(aggregateResult.cursor.ns); + let batch = aggregateResult.cursor.firstBatch; yield { events: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken }; @@ -72,7 +82,7 @@ export async function* rawChangeStream( .command( { getMore: cursorId, - collection: ns, + collection: nsCollection, batchSize, maxTimeMS }, @@ -93,7 +103,7 @@ export async function* rawChangeStream( } if (cursorId != null && cursorId !== 0n && abortPromise != null) { await db.command({ - killCursors: ns, + killCursors: nsCollection, cursors: [cursorId] }); } @@ -122,3 +132,15 @@ export function mapChangeStreamError(e: unknown) { throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); } } + +/** + * Get the "collection" from a ns. + * + * This drops everything before the first . character. + * + * "my_db_name.$cmd.aggregate" -> "$cmd.aggregate" + */ +export function namespaceCollection(ns: string): string { + const dot = ns.indexOf('.'); + return ns.substring(dot + 1); +} From 122b0370f8b95ffd249c4ab4281dbec9fe63b305 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 10:45:40 +0200 Subject: [PATCH 03/20] Remove old openChangeStream. 
--- .../src/replication/ChangeStream.ts | 65 ------------------- 1 file changed, 65 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index f9c5ff765..9bf8f152c 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -759,71 +759,6 @@ export class ChangeStream { } } - private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) { - const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; - const startAfter = lastLsn?.timestamp; - const resumeAfter = lastLsn?.resumeToken; - - const filters = this.getSourceNamespaceFilters(); - - const pipeline: mongo.Document[] = [ - { - $match: filters.$match - }, - { $changeStreamSplitLargeEvent: {} } - ]; - - let fullDocument: 'required' | 'updateLookup'; - - if (this.usePostImages) { - // 'read_only' or 'auto_configure' - // Configuration happens during snapshot, or when we see new - // collections. - fullDocument = 'required'; - } else { - fullDocument = 'updateLookup'; - } - const streamOptions: mongo.ChangeStreamOptions = { - showExpandedEvents: true, - maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, - fullDocument: fullDocument, - maxTimeMS: this.changeStreamTimeout - }; - - /** - * Only one of these options can be supplied at a time. - */ - if (resumeAfter) { - streamOptions.resumeAfter = resumeAfter; - } else { - // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the - // case if we have an old one. - // This is also relevant for getSnapshotLSN(). 
- streamOptions.startAtOperationTime = startAfter; - } - - let stream: mongo.ChangeStream; - if (filters.multipleDatabases) { - // Requires readAnyDatabase@admin on Atlas - stream = this.client.watch(pipeline, streamOptions); - } else { - // Same general result, but requires less permissions than the above - stream = this.defaultDb.watch(pipeline, streamOptions); - } - - this.abort_signal.addEventListener('abort', () => { - stream.close(); - }); - - return { - stream, - filters, - [Symbol.asyncDispose]: async () => { - return stream.close(); - } - }; - } - private rawChangeStreamBatches(options: { lsn: string | null; maxAwaitTimeMS?: number; From 84d1b74f89f371e682d15842447443a3c1b552d5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 11:03:26 +0200 Subject: [PATCH 04/20] Use raw responses. --- .../src/replication/RawChangeStream.ts | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index d0c766eb6..fc52150d4 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -42,10 +42,12 @@ export async function* rawChangeStream( options.signal?.addEventListener('abort', () => { if (cursorId != null && cursorId !== 0n && nsCollection != null) { // This would result in a CursorKilled error. 
- abortPromise = db.command({ - killCursors: nsCollection, - cursors: [cursorId] - }); + abortPromise = db + .command({ + killCursors: nsCollection, + cursors: [cursorId] + }) + .catch(() => {}); } }); @@ -60,18 +62,22 @@ export async function* rawChangeStream( cursor: { batchSize }, maxTimeMS: options.maxTimeMS }, - { session, raw: false } + { session, raw: true } ) .catch((e) => { throw mapChangeStreamError(e); }); - cursorId = BigInt(aggregateResult.cursor.id); - nsCollection = namespaceCollection(aggregateResult.cursor.ns); + { + const cursor = mongo.BSON.deserialize(aggregateResult.cursor, { useBigInt64: true }); + + cursorId = BigInt(cursor.id); + nsCollection = namespaceCollection(cursor.ns); - let batch = aggregateResult.cursor.firstBatch; + let batch = cursor.firstBatch; - yield { events: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken }; + yield { events: batch, resumeToken: cursor.postBatchResumeToken }; + } // Step 2: Poll using getMore until the cursor is closed while (cursorId && cursorId !== 0n) { @@ -86,26 +92,29 @@ export async function* rawChangeStream( batchSize, maxTimeMS }, - { session, raw: false } + { session, raw: true } ) .catch((e) => { throw mapChangeStreamError(e); }); - cursorId = BigInt(getMoreResult.cursor.id); - const nextBatch = getMoreResult.cursor.nextBatch; + const cursor = mongo.BSON.deserialize(getMoreResult.cursor, { useBigInt64: true }); + cursorId = BigInt(cursor.id); + const nextBatch = cursor.nextBatch; - yield { events: nextBatch, resumeToken: getMoreResult.cursor.postBatchResumeToken }; + yield { events: nextBatch, resumeToken: cursor.postBatchResumeToken }; } } finally { if (abortPromise != null) { await abortPromise; } if (cursorId != null && cursorId !== 0n && abortPromise != null) { - await db.command({ - killCursors: nsCollection, - cursors: [cursorId] - }); + await db + .command({ + killCursors: nsCollection, + cursors: [cursorId] + }) + .catch(() => {}); } await session.endSession(); } From 
377d8c19c1edb9031d88e5150f1c8e27a9f377a8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 11:07:43 +0200 Subject: [PATCH 05/20] Defer parsing change stream documents. --- .../module-mongodb/src/replication/ChangeStream.ts | 10 +++++++--- .../src/replication/RawChangeStream.ts | 12 +++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 9bf8f152c..ed4767eb0 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -281,7 +281,8 @@ export class ChangeStream { lastCheckpointCreated = performance.now(); } - for (let changeDocument of events) { + for (let rawChangeDocument of events) { + const changeDocument = mongo.BSON.deserialize(rawChangeDocument, { useBigInt64: true }); const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { @@ -765,7 +766,7 @@ export class ChangeStream { batchSize?: number; filters: { $match: any; multipleDatabases: boolean }; signal?: AbortSignal; - }): AsyncIterableIterator<{ events: mongo.ChangeStreamDocument[]; resumeToken: unknown }> { + }): AsyncIterableIterator<{ events: Buffer[]; resumeToken: unknown }> { const lastLsn = options.lsn ? 
MongoLSN.fromSerialized(options.lsn) : null; const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; @@ -934,7 +935,10 @@ export class ChangeStream { const batchStart = Date.now(); for (let eventIndex = 0; eventIndex < events.length; eventIndex++) { - const originalChangeDocument = events[eventIndex]; + const rawChangeDocument = events[eventIndex]; + const originalChangeDocument = mongo.BSON.deserialize(rawChangeDocument, { + useBigInt64: true + }) as mongo.ChangeStreamDocument; if (this.abort_signal.aborted) { break; } diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index fc52150d4..b088730f2 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -17,7 +17,7 @@ export interface RawChangeStreamOptions { export interface ChangeStreamBatch { resumeToken: mongo.ResumeToken; - events: mongo.ChangeStreamDocument[]; + events: Buffer[]; } export async function* rawChangeStream( @@ -69,7 +69,10 @@ export async function* rawChangeStream( }); { - const cursor = mongo.BSON.deserialize(aggregateResult.cursor, { useBigInt64: true }); + const cursor = mongo.BSON.deserialize(aggregateResult.cursor, { + useBigInt64: true, + fieldsAsRaw: { firstBatch: true } + }); cursorId = BigInt(cursor.id); nsCollection = namespaceCollection(cursor.ns); @@ -98,7 +101,10 @@ export async function* rawChangeStream( throw mapChangeStreamError(e); }); - const cursor = mongo.BSON.deserialize(getMoreResult.cursor, { useBigInt64: true }); + const cursor = mongo.BSON.deserialize(getMoreResult.cursor, { + useBigInt64: true, + fieldsAsRaw: { nextBatch: true } + }); cursorId = BigInt(cursor.id); const nextBatch = cursor.nextBatch; From 2ee52cd5b54c2417795225fe0dbcc27077014e00 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 11:21:13 +0200 Subject: [PATCH 06/20] Redo replication metrics. 
--- .../src/replication/ChangeStream.ts | 17 +++-- .../src/replication/RawChangeStream.ts | 44 ++++++++----- .../src/replication/internal-mongodb-utils.ts | 65 ------------------- .../test/src/internal_mongodb_utils.test.ts | 53 ++++++++++----- 4 files changed, 70 insertions(+), 109 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index ed4767eb0..9bdfcbbbc 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -38,7 +38,7 @@ import { STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; -import { rawChangeStream } from './RawChangeStream.js'; +import { ChangeStreamBatch, rawChangeStream } from './RawChangeStream.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { @@ -766,7 +766,7 @@ export class ChangeStream { batchSize?: number; filters: { $match: any; multipleDatabases: boolean }; signal?: AbortSignal; - }): AsyncIterableIterator<{ events: Buffer[]; resumeToken: unknown }> { + }): AsyncIterableIterator { const lastLsn = options.lsn ? 
MongoLSN.fromSerialized(options.lsn) : null; const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; @@ -783,7 +783,7 @@ export class ChangeStream { } else { fullDocument = 'updateLookup'; } - const streamOptions: mongo.ChangeStreamOptions = { + const streamOptions: mongo.ChangeStreamOptions & mongo.Document = { showExpandedEvents: true, fullDocument: fullDocument }; @@ -812,6 +812,7 @@ export class ChangeStream { let watchDb: mongo.Db; if (filters.multipleDatabases) { watchDb = this.client.db('admin'); + streamOptions.allChangesForCluster = true; } else { watchDb = this.defaultDb; } @@ -878,11 +879,6 @@ export class ChangeStream { filters, signal: this.abort_signal }); - // trackChangeStreamBsonBytes(stream, (bytes) => { - // bytesReplicatedMetric.add(bytes); - // // Each of these represent a single response message from MongoDB. - // chunksReplicatedMetric.add(1); - // }); // Always start with a checkpoint. // This helps us to clear errors when restarting, even if there is @@ -901,7 +897,10 @@ export class ChangeStream { let lastEmptyResume = performance.now(); let lastTxnKey: string | null = null; - for await (let { events, resumeToken } of batchStream) { + for await (let eventBatch of batchStream) { + const { events, resumeToken } = eventBatch; + bytesReplicatedMetric.add(eventBatch.byteSize); + chunksReplicatedMetric.add(1); if (this.abort_signal.aborted) { break; } diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index b088730f2..2412f89b4 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -13,11 +13,20 @@ export interface RawChangeStreamOptions { */ maxTimeMS: number; batchSize: number; + + /** + * Mostly for testing. 
+ */ + collection?: string; } export interface ChangeStreamBatch { resumeToken: mongo.ResumeToken; events: Buffer[]; + /** + * Size in bytes of this event. + */ + byteSize: number; } export async function* rawChangeStream( @@ -37,6 +46,7 @@ export async function* rawChangeStream( const maxTimeMS = options.maxAwaitTimeMS; const batchSize = options.batchSize; + const collection = options.collection ?? 1; let abortPromise: Promise | null = null; options.signal?.addEventListener('abort', () => { @@ -53,22 +63,22 @@ export async function* rawChangeStream( const session = db.client.startSession(); try { - // Step 1: Send the aggregate command to start the change stream - const aggregateResult = await db - .command( - { - aggregate: 1, - pipeline, - cursor: { batchSize }, - maxTimeMS: options.maxTimeMS - }, - { session, raw: true } - ) - .catch((e) => { - throw mapChangeStreamError(e); - }); - { + // Step 1: Send the aggregate command to start the change stream + const aggregateResult = await db + .command( + { + aggregate: collection, + pipeline, + cursor: { batchSize }, + maxTimeMS: options.maxTimeMS + }, + { session, raw: true } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + const cursor = mongo.BSON.deserialize(aggregateResult.cursor, { useBigInt64: true, fieldsAsRaw: { firstBatch: true } @@ -79,7 +89,7 @@ export async function* rawChangeStream( let batch = cursor.firstBatch; - yield { events: batch, resumeToken: cursor.postBatchResumeToken }; + yield { events: batch, resumeToken: cursor.postBatchResumeToken, byteSize: aggregateResult.cursor.byteLength }; } // Step 2: Poll using getMore until the cursor is closed @@ -108,7 +118,7 @@ export async function* rawChangeStream( cursorId = BigInt(cursor.id); const nextBatch = cursor.nextBatch; - yield { events: nextBatch, resumeToken: cursor.postBatchResumeToken }; + yield { events: nextBatch, resumeToken: cursor.postBatchResumeToken, byteSize: getMoreResult.cursor.byteLength }; } } finally { if (abortPromise 
!= null) { diff --git a/modules/module-mongodb/src/replication/internal-mongodb-utils.ts b/modules/module-mongodb/src/replication/internal-mongodb-utils.ts index bc404f4c3..2e4952da3 100644 --- a/modules/module-mongodb/src/replication/internal-mongodb-utils.ts +++ b/modules/module-mongodb/src/replication/internal-mongodb-utils.ts @@ -1,45 +1,5 @@ import { mongo } from '@powersync/lib-service-mongodb'; -/** - * Track bytes read on a change stream. - * - * This is after decompression, and without TLS overhead. - * - * This excludes some protocol overhead, but does include per-batch overhead. - * - * This is built on internal APIs, and may stop working in future driver versions. - * - * @param add Called once for each batch of data. - */ -export function trackChangeStreamBsonBytes(changeStream: mongo.ChangeStream, add: (bytes: number) => void) { - let internalChangeStream = changeStream as ChangeStreamWithCursor; - let current = internalChangeStream.cursor; - let degisterCursor = trackCursor(current, add); - - const refresh = () => { - // The cursor may be replaced closed and re-opened (replaced) in various scenarios, such as - // after a primary fail-over event. - // There is no direct even to track that, but the `resumeTokenChanged` event is a good proxy. - // It may be called more often than the cursor is replaced, so we just check whether the cursor changed. - // This might miss the init batch, so we may under-count slightly in that case. It is a rare event - // and typically a small number of bytes, so it's fine to ignore. - const next = internalChangeStream.cursor; - if (next !== current) { - degisterCursor(); - current = next; - degisterCursor = trackCursor(current, add); - } - }; - - changeStream.on('resumeTokenChanged', refresh); - - // We return this to allow de-registration of the event listeners. - // However, these are garbage collected automatically when the stream is closed, so it's not strictly necessary to call this. 
- return () => { - changeStream.off('resumeTokenChanged', refresh); - }; -} - /** * Get the byte size of the current batch on a cursor. * @@ -59,31 +19,6 @@ interface CursorResponse { toBytes?(): Uint8Array; } -interface ChangeStreamWithCursor extends mongo.ChangeStream { - cursor?: mongo.AbstractCursor; -} - -function trackCursor(cursor: mongo.AbstractCursor | undefined, add: (bytes: number) => void) { - if (cursor == null) { - return () => {}; - } - const countBatch = (response: CursorResponse | undefined) => { - const bytes = getResponseBytes(response); - if (bytes > 0) { - add(bytes); - } - }; - - // The `init` event is emitted for the first batch, and the `more` event is emitted for subsequent batches. - cursor.on('init', countBatch); - cursor.on('more', countBatch); - - return () => { - cursor.off('init', countBatch); - cursor.off('more', countBatch); - }; -} - function getResponseBytes(response: CursorResponse | undefined): number { const buffer = response?.toBytes?.(); return buffer?.byteLength ?? 
0; diff --git a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts index 8e22b7a5a..74469bc36 100644 --- a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts +++ b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from 'vitest'; -import { getCursorBatchBytes, trackChangeStreamBsonBytes } from '@module/replication/replication-index.js'; -import { mongo } from '@powersync/lib-service-mongodb'; +import { ChangeStreamBatch, rawChangeStream } from '@module/replication/RawChangeStream.js'; +import { getCursorBatchBytes } from '@module/replication/replication-index.js'; import { clearTestDb, connectMongoData } from './util.js'; describe('internal mongodb utils', () => { @@ -43,40 +43,57 @@ describe('internal mongodb utils', () => { }); async function testChangeStreamBsonBytes(type: 'db' | 'collection' | 'cluster') { - // With MongoDB, replication uses the exact same document format - // as normal queries. We test it anyway. 
const { db, client } = await connectMongoData(); await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; await clearTestDb(db); const collection = db.collection('test_data'); - let stream: mongo.ChangeStream; + let stream: AsyncIterableIterator; + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup', + allChangesForCluster: type == 'cluster' + } + } + ]; if (type === 'collection') { - stream = collection.watch([], { + stream = rawChangeStream(db, pipeline, { + batchSize: 10, maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' + maxTimeMS: 1_000 }); } else if (type === 'db') { - stream = db.watch([], { + stream = rawChangeStream(db, pipeline, { + batchSize: 10, maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' + maxTimeMS: 1_000 }); } else { - stream = client.watch([], { + stream = rawChangeStream(client.db('admin'), pipeline, { + batchSize: 10, maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' + maxTimeMS: 1_000 }); } let batchBytes: number[] = []; let totalBytes = 0; - trackChangeStreamBsonBytes(stream, (bytes) => { - batchBytes.push(bytes); - totalBytes += bytes; - }); const readAll = async () => { - while ((await stream.tryNext()) != null) {} + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + const bytes = next.value.byteSize; + batchBytes.push(bytes); + totalBytes += bytes; + + if (next.value.events.length == 0) { + break; + } + } }; await readAll(); @@ -88,11 +105,11 @@ describe('internal mongodb utils', () => { await collection.insertOne({ test: 3 }); await readAll(); - await stream.close(); + await stream.return?.(); // The exact length by vary based on exact batching logic, but we do want to know when it changes. // Note: If this causes unstable tests, we can relax this check. - expect(batchBytes.length).toEqual(8); + expect(batchBytes.length).toEqual(7); // Current tests show 4464-4576 bytes for the size, depending on the type of change stream. 
// This can easily vary based on the mongodb version and general conditions, so we just check the general range. From 6b1d4addc818632bd0f1f35f2dea3263598e6d18 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 11:21:34 +0200 Subject: [PATCH 07/20] Cleanup. --- .../src/replication/ChangeStream.ts | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 9bdfcbbbc..abb556885 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -826,25 +826,6 @@ export class ChangeStream { }); } - private getBufferedChangeCount(stream: mongo.ChangeStream): number { - // The driver keeps fetched change stream documents on the underlying cursor, but does - // not expose that through the public ChangeStream API. We use this to detect backlog - // building up before we have processed the corresponding source changes locally. - // If the driver API changes, we'll have a hard error here. - // We specifically want to avoid a silent performance regression if the driver behavior changes. - const cursor = ( - stream as mongo.ChangeStream & { - cursor: mongo.AbstractCursor>; - } - ).cursor; - if (cursor == null || typeof cursor.bufferedCount != 'function') { - throw new ReplicationAssertionError( - 'MongoDB ChangeStream no longer exposes an internal cursor with bufferedCount' - ); - } - return cursor.bufferedCount(); - } - async streamChangesInternal() { const transactionsReplicatedMetric = this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED); const bytesReplicatedMetric = this.metrics.getCounter(ReplicationMetric.DATA_REPLICATED_BYTES); From d9fdaa736262ee699597ade84d74d90c52e52d61 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 3 Apr 2026 11:29:09 +0200 Subject: [PATCH 08/20] Fix collection-level testing. 
--- modules/module-mongodb/test/src/internal_mongodb_utils.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts index 74469bc36..ae4bd9ba7 100644 --- a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts +++ b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts @@ -61,7 +61,8 @@ describe('internal mongodb utils', () => { stream = rawChangeStream(db, pipeline, { batchSize: 10, maxAwaitTimeMS: 5, - maxTimeMS: 1_000 + maxTimeMS: 1_000, + collection: collection.collectionName }); } else if (type === 'db') { stream = rawChangeStream(db, pipeline, { From eb533c6750dffebb90fe8f2fa9467fa67a515c9e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 14:43:22 +0200 Subject: [PATCH 09/20] Add retries for rawChangeStream. --- .../src/replication/ChangeStream.ts | 3 +- .../src/replication/RawChangeStream.ts | 77 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index abb556885..714b81356 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -822,7 +822,8 @@ export class ChangeStream { maxAwaitTimeMS: options.maxAwaitTimeMS ?? 
this.maxAwaitTimeMS, maxTimeMS: this.changeStreamTimeout, - signal: options.signal + signal: options.signal, + logger: this.logger }); } diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index 2412f89b4..8a6aac36c 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -1,5 +1,10 @@ import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; -import { DatabaseConnectionError, ErrorCode } from '@powersync/lib-services-framework'; +import { + DatabaseConnectionError, + ErrorCode, + Logger, + ReplicationAssertionError +} from '@powersync/lib-services-framework'; import { ChangeStreamInvalidatedError } from './ChangeStream.js'; export interface RawChangeStreamOptions { @@ -18,6 +23,8 @@ export interface RawChangeStreamOptions { * Mostly for testing. */ collection?: string; + + logger?: Logger; } export interface ChangeStreamBatch { @@ -29,7 +36,51 @@ export interface ChangeStreamBatch { byteSize: number; } -export async function* rawChangeStream( +export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], options: RawChangeStreamOptions) { + if (!('$changeStream' in pipeline[0])) { + throw new ReplicationAssertionError(`First pipeline stage must be $changeStream`); + } + let lastResumeToken: unknown | null = null; + + while (true) { + try { + let innerPipeline = pipeline; + if (lastResumeToken != null) { + const [first, ...rest] = pipeline; + const options = { ...first.$changeStream }; + delete options.startAtOperationTime; + options.resumeAfter = lastResumeToken; + innerPipeline = [{ $changeStream: options }, ...rest]; + } + const inner = rawChangeStreamInner(db, innerPipeline, { + ...options + }); + for await (let batch of inner) { + yield batch; + lastResumeToken = batch.resumeToken; + } + } catch (e) { + if (e instanceof 
ResumableChangeStreamError) { + // This is only triggered on the getMore command. + // If there is a persistent error, we expect it to occur on the aggregate command as well, + // which will trigger a hard error on the next attempt. + // This matches the change stream spec of: + // > A change stream MUST attempt to resume a single time if it encounters any resumable error per Resumable Error. A change stream MUST NOT attempt to resume on any other type of error. + // > An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors. + // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md#resume-process + + // Technically we don't _need_ this resume functionality - we can let the replication job handle the failure + // and restart. However, this provides a faster restart path in common cases. + options.logger?.warn(`Resumable change stream error, retrying: ${e.message}`, e.cause); + continue; + } else { + throw e; + } + } + } +} + +async function* rawChangeStreamInner( db: mongo.Db, pipeline: mongo.Document[], options: RawChangeStreamOptions @@ -108,6 +159,9 @@ export async function* rawChangeStream( { session, raw: true } ) .catch((e) => { + if (isResumableChangeStreamError(e)) { + throw new ResumableChangeStreamError(e.message, { cause: e }); + } throw mapChangeStreamError(e); }); @@ -136,6 +190,25 @@ export async function* rawChangeStream( } } +class ResumableChangeStreamError extends Error {} + +function isResumableChangeStreamError(e: unknown) { + // See: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md#resumable-error + if (!isMongoServerError(e)) { + // Any error encountered which is not a server error (e.g. 
a timeout error or network error) + return true; + } else if (e.codeName == 'CursorNotFound') { + // A server error with code 43 (CursorNotFound) + return true; + } else if (e.hasErrorLabel('ResumableChangeStreamError')) { + // For servers with wire version 9 or higher (server version 4.4 or higher), any server error with the ResumableChangeStreamError error label. + return true; + } else { + // We ignore servers with wire version less than 9, since we only support MongoDB 6.0+. + return false; + } +} + export function mapChangeStreamError(e: unknown) { if (isMongoNetworkTimeoutError(e)) { // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". From 70dd798c42facd468f9368a3ce0c026fbd916ac2 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 14:50:35 +0200 Subject: [PATCH 10/20] Re-use session for retries. --- .../module-mongodb/src/replication/RawChangeStream.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index 8a6aac36c..7d0cf0120 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -42,6 +42,10 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], } let lastResumeToken: unknown | null = null; + // > If the server supports sessions, the resume attempt MUST use the same session as the previous attempt's command. 
+ const session = db.client.startSession(); + await using _ = { [Symbol.asyncDispose]: () => session.endSession() }; + while (true) { try { let innerPipeline = pipeline; @@ -52,7 +56,7 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], options.resumeAfter = lastResumeToken; innerPipeline = [{ $changeStream: options }, ...rest]; } - const inner = rawChangeStreamInner(db, innerPipeline, { + const inner = rawChangeStreamInner(session, db, innerPipeline, { ...options }); for await (let batch of inner) { @@ -81,6 +85,7 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], } async function* rawChangeStreamInner( + session: mongo.ClientSession, db: mongo.Db, pipeline: mongo.Document[], options: RawChangeStreamOptions @@ -112,7 +117,6 @@ async function* rawChangeStreamInner( } }); - const session = db.client.startSession(); try { { // Step 1: Send the aggregate command to start the change stream @@ -186,7 +190,6 @@ async function* rawChangeStreamInner( }) .catch(() => {}); } - await session.endSession(); } } From e4d57b121f2b542efef0a52b6f513d3759bbdcd7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 14:56:17 +0200 Subject: [PATCH 11/20] Fix killCursors. --- modules/module-mongodb/src/replication/RawChangeStream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index 7d0cf0120..bbbd4aa7a 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -180,9 +180,9 @@ async function* rawChangeStreamInner( } } finally { if (abortPromise != null) { + // killCursors is already sent - we wait for the response in abortPromise. 
await abortPromise; - } - if (cursorId != null && cursorId !== 0n && abortPromise != null) { + } else if (cursorId != null && cursorId !== 0n && nsCollection != null) { await db .command({ killCursors: nsCollection, From ca73e8a38648aa86b1621ea48f18b6ed4b732a05 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 14:56:58 +0200 Subject: [PATCH 12/20] Handle startAfter. --- modules/module-mongodb/src/replication/RawChangeStream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index bbbd4aa7a..65639c59f 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -53,6 +53,7 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], const [first, ...rest] = pipeline; const options = { ...first.$changeStream }; delete options.startAtOperationTime; + delete options.startAfter; options.resumeAfter = lastResumeToken; innerPipeline = [{ $changeStream: options }, ...rest]; } From 0c9b7dfaadf49a730fe1d59068bb8d4ec39f9ad7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 15:08:26 +0200 Subject: [PATCH 13/20] Improve abort handling; add comments. 
--- .../src/replication/RawChangeStream.ts | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index 65639c59f..e117ea7bf 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -37,6 +37,11 @@ export interface ChangeStreamBatch { } export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], options: RawChangeStreamOptions) { + // We generally attempt to follow the spec at: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + // Intentional differences: + // 1. We don't attempt to follow the client API. + // 2. We require that `postBatchResumeToken` is present. + // 3. We don't attempt to handle resumeToken from individual documents - the consumer can handle that. if (!('$changeStream' in pipeline[0])) { throw new ReplicationAssertionError(`First pipeline stage must be $changeStream`); } @@ -47,6 +52,7 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], await using _ = { [Symbol.asyncDispose]: () => session.endSession() }; while (true) { + options.signal?.throwIfAborted(); try { let innerPipeline = pipeline; if (lastResumeToken != null) { @@ -106,7 +112,7 @@ async function* rawChangeStreamInner( const collection = options.collection ?? 1; let abortPromise: Promise | null = null; - options.signal?.addEventListener('abort', () => { + const onAbort = () => { if (cursorId != null && cursorId !== 0n && nsCollection != null) { // This would result in a CursorKilled error. 
abortPromise = db @@ -116,7 +122,8 @@ async function* rawChangeStreamInner( }) .catch(() => {}); } - }); + }; + options.signal?.addEventListener('abort', onAbort); try { { @@ -145,14 +152,19 @@ async function* rawChangeStreamInner( let batch = cursor.firstBatch; + if (cursor.postBatchResumeToken == null) { + // Deviation from spec: We require that the server always returns a postBatchResumeToken. + // postBatchResumeToken is returned in MongoDB 4.0.7 and later, and we support 6.0+ + throw new ReplicationAssertionError(`postBatchResumeToken from aggregate response`); + } + yield { events: batch, resumeToken: cursor.postBatchResumeToken, byteSize: aggregateResult.cursor.byteLength }; } // Step 2: Poll using getMore until the cursor is closed while (cursorId && cursorId !== 0n) { - if (options.signal?.aborted) { - break; - } + options.signal?.throwIfAborted(); + const getMoreResult: mongo.Document = await db .command( { @@ -177,9 +189,18 @@ async function* rawChangeStreamInner( cursorId = BigInt(cursor.id); const nextBatch = cursor.nextBatch; + if (cursor.postBatchResumeToken == null) { + // Deviation from spec: We require that the server always returns a postBatchResumeToken. + // postBatchResumeToken is returned in MongoDB 4.0.7 and later, and we support 6.0+ + throw new ReplicationAssertionError(`postBatchResumeToken from aggregate response`); + } yield { events: nextBatch, resumeToken: cursor.postBatchResumeToken, byteSize: getMoreResult.cursor.byteLength }; } + + options.signal?.throwIfAborted(); + throw new ReplicationAssertionError(`Change stream ended unexpectedly`); } finally { + options.signal?.removeEventListener('abort', onAbort); if (abortPromise != null) { // killCursors is already sent - we wait for the response in abortPromise. await abortPromise; From a40bcb3bb8f9e6fa2cceaf2a241c322dce35b929 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 15:22:02 +0200 Subject: [PATCH 14/20] Add tests for resuming streams. 
--- .../test/src/internal_mongodb_utils.test.ts | 175 +++++++++++++++++- 1 file changed, 174 insertions(+), 1 deletion(-) diff --git a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts index ae4bd9ba7..fc51e60dc 100644 --- a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts +++ b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts @@ -1,7 +1,9 @@ import { describe, expect, test } from 'vitest'; -import { ChangeStreamBatch, rawChangeStream } from '@module/replication/RawChangeStream.js'; +import { ChangeStreamBatch, namespaceCollection, rawChangeStream } from '@module/replication/RawChangeStream.js'; import { getCursorBatchBytes } from '@module/replication/replication-index.js'; +import { mongo } from '@powersync/lib-service-mongodb'; +import { bson } from '@powersync/service-core'; import { clearTestDb, connectMongoData } from './util.js'; describe('internal mongodb utils', () => { @@ -118,4 +120,175 @@ describe('internal mongodb utils', () => { expect(totalBytes).toBeGreaterThan(2000); expect(totalBytes).toBeLessThan(8000); } + + test('should resume on missing cursor (1)', async () => { + // Many resumable errors are difficult to simulate, but CursorNotFound is easy. 
+ + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + await collection.insertOne({ test: 3 }); + await killChangeStreamCursor(db, client); + await collection.insertOne({ test: 4 }); + await readAll(); + + await stream.return?.(); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }]); + }); + + test('should resume on missing cursor (2)', async () => { + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const currentOpStream = rawChangeStream( + db, + [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ], + { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + } + ); + const firstBatch = await currentOpStream.next(); + await currentOpStream.return(); + const resumeAfter = firstBatch.value!.resumeToken; + + const stream = rawChangeStream( + db, + [ + { + $changeStream: { + fullDocument: 'updateLookup', + resumeAfter + } + } + ], + { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + } + ); + + let readDocs: any[] = []; + const 
readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + await collection.insertOne({ test: 3 }); + await killChangeStreamCursor(db, client); + await collection.insertOne({ test: 4 }); + await readAll(); + + await stream.return?.(); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }]); + }); }); + +async function killChangeStreamCursor(db: mongo.Db, client: mongo.MongoClient) { + const ops = await client + .db('admin') + .aggregate([{ $currentOp: { idleCursors: true } }, { $match: { type: 'idleCursor' } }]) + .toArray(); + + const ns = `${db.databaseName}.$cmd.aggregate`; + const op = ops.find((op) => { + const command = op.cursor?.originatingCommand; + return op.ns == ns && Array.isArray(command?.pipeline) && command.pipeline[0]?.$changeStream != null; + }); + + if (op?.cursor == null) { + throw new Error( + `Could not find change stream cursor. 
Idle cursors: ${JSON.stringify( + ops.map((op) => ({ + ns: op.ns, + type: op.type, + cursorId: op.cursor?.cursorId?.toString(), + aggregate: op.cursor?.originatingCommand?.aggregate, + pipeline: op.cursor?.originatingCommand?.pipeline + })) + )}` + ); + } + + await db.command({ + killCursors: namespaceCollection(op.ns), + cursors: [op.cursor.cursorId] + }); +} + +type CurrentOpIdleCursor = { + ns: string; + type: string; + cursor?: { + cursorId: bigint; + originatingCommand?: { + aggregate?: unknown; + pipeline?: any[]; + }; + }; +}; From d52222d6dbef8bb27e826204eebe96f838d46ed8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 16:28:10 +0200 Subject: [PATCH 15/20] Improve and test abort handling. --- .../src/replication/RawChangeStream.ts | 6 ++ .../test/src/internal_mongodb_utils.test.ts | 100 ++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index e117ea7bf..8b4e5af7b 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -176,6 +176,12 @@ async function* rawChangeStreamInner( { session, raw: true } ) .catch((e) => { + if (isMongoServerError(e) && e.codeName == 'CursorKilled') { + // This may be due to the killCursors command issued when aborting. + // In that case, use the abort error instead. 
+ options.signal?.throwIfAborted(); + } + if (isResumableChangeStreamError(e)) { throw new ResumableChangeStreamError(e.message, { cause: e }); } diff --git a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts index fc51e60dc..e904b9c80 100644 --- a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts +++ b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts @@ -247,6 +247,106 @@ describe('internal mongodb utils', () => { expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }]); }); + + test('should cleanly abort a stream between events', async () => { + const { db, client } = await connectMongoData(); + const abortController = new AbortController(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000, + signal: abortController.signal + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + abortController.abort(new Error('test abort')); + await collection.insertOne({ test: 3 }); + await expect(readAll()).rejects.toMatchObject({ message: 'test abort' }); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }]); + }); + + test('should cleanly abort a stream in an event', async () => { + 
const { db, client } = await connectMongoData(); + const abortController = new AbortController(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 200, + maxTimeMS: 1_000, + signal: abortController.signal + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + // This is specifically a readAll() without an insert in between, to trigger the longer await + // period. + let readPromise = readAll(); + abortController.abort(new Error('test abort')); + await expect(readPromise).rejects.toMatchObject({ message: 'test abort' }); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }]); + }); }); async function killChangeStreamCursor(db: mongo.Db, client: mongo.MongoClient) { From 6835f5e01e7d8bbe03941cba3ac01ac0d7e46f53 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 16:28:40 +0200 Subject: [PATCH 16/20] Rename test. 
--- .../{internal_mongodb_utils.test.ts => raw_change_stream.test.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename modules/module-mongodb/test/src/{internal_mongodb_utils.test.ts => raw_change_stream.test.ts} (100%) diff --git a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/raw_change_stream.test.ts similarity index 100% rename from modules/module-mongodb/test/src/internal_mongodb_utils.test.ts rename to modules/module-mongodb/test/src/raw_change_stream.test.ts From 886b5702c130d105ab038655fe29ef49d5153938 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 16:29:52 +0200 Subject: [PATCH 17/20] Add test for replicating from multiple databases in the same cluster. --- .../test/src/change_stream.test.ts | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 1c53d6498..8ce598357 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -257,6 +257,53 @@ bucket_definitions: expect(data).toMatchObject([test_utils.putOp('test_DATA', { id: test_id, description: 'test1' })]); }); + test('replicating from multiple databases in the same cluster', async () => { + await using context = await openContext(); + const { client, db } = context; + const otherDb = client.db(`${db.databaseName}_other_${storageVersion}`); + await otherDb.dropDatabase(); + await using _ = { + [Symbol.asyncDispose]: async () => { + await otherDb.dropDatabase(); + } + }; + + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "${db.databaseName}"."test_data_default" + - SELECT _id as id, description FROM "${otherDb.databaseName}"."test_data_other" + `); + + await db.createCollection('test_data_default'); + await otherDb.createCollection('test_data_other'); + await 
context.replicateSnapshot(); + context.startStreaming(); + + const defaultResult = await db.collection('test_data_default').insertOne({ description: 'default db' }); + const otherResult = await otherDb.collection('test_data_other').insertOne({ description: 'other db' }); + + const data = await context.getBucketData('global[]'); + + expect(data).toEqual( + expect.arrayContaining([ + expect.objectContaining( + test_utils.putOp('test_data_default', { + id: defaultResult.insertedId.toHexString(), + description: 'default db' + }) + ), + expect.objectContaining( + test_utils.putOp('test_data_other', { + id: otherResult.insertedId.toHexString(), + description: 'other db' + }) + ) + ]) + ); + }); + test('replicating large values', async () => { await using context = await openContext(); const { db } = context; From d04024fb73bd1c393e452cee7560f40172f92939 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 7 Apr 2026 16:35:37 +0200 Subject: [PATCH 18/20] Changeset. --- .changeset/chilled-bags-complain.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/chilled-bags-complain.md diff --git a/.changeset/chilled-bags-complain.md b/.changeset/chilled-bags-complain.md new file mode 100644 index 000000000..f77038737 --- /dev/null +++ b/.changeset/chilled-bags-complain.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mongodb': minor +--- + +Use custom MongoDB change stream implementation to get better low-level control. From 9634c6327f06a14f213882569ddabd9d7cb4f2c4 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 10 Apr 2026 15:24:29 +0200 Subject: [PATCH 19/20] Fix `batchesSeen`. 
--- modules/module-mongodb/src/replication/ChangeStream.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 714b81356..053a9e89c 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -280,6 +280,7 @@ export class ChangeStream { await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); lastCheckpointCreated = performance.now(); } + batchesSeen += 1; for (let rawChangeDocument of events) { const changeDocument = mongo.BSON.deserialize(rawChangeDocument, { useBigInt64: true }); @@ -298,7 +299,6 @@ export class ChangeStream { } eventsSeen += 1; - batchesSeen += 1; } } @@ -985,17 +985,17 @@ export class ChangeStream { * would process below. * * However we don't commit the LSN after collections are dropped. - * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. + * This prevents the `startAfter` or `resumeToken` from advancing past the drop events. * The stream also closes after the drop events. * This causes an infinite loop of processing the collection drop events. * - * This check here invalidates the change stream if our `_checkpoints` collection + * This check here invalidates the change stream if our `_powersync_checkpoints` collection * is dropped. This allows for detecting when the DB is dropped. */ if (changeDocument.operationType == 'drop') { throw new ChangeStreamInvalidatedError( 'Internal collections have been dropped', - new Error('_checkpoints collection was dropped') + new Error('_powersync_checkpoints collection was dropped') ); } From 6822d4f863748c206b03d1ca0250ff2ab9966548 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 10 Apr 2026 15:26:44 +0200 Subject: [PATCH 20/20] Rename options. 
--- .../module-mongodb/src/replication/RawChangeStream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts index 8b4e5af7b..dab285e64 100644 --- a/modules/module-mongodb/src/replication/RawChangeStream.ts +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -57,11 +57,11 @@ export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], let innerPipeline = pipeline; if (lastResumeToken != null) { const [first, ...rest] = pipeline; - const options = { ...first.$changeStream }; - delete options.startAtOperationTime; - delete options.startAfter; - options.resumeAfter = lastResumeToken; - innerPipeline = [{ $changeStream: options }, ...rest]; + const changeStreamStage = { ...first.$changeStream }; + delete changeStreamStage.startAtOperationTime; + delete changeStreamStage.startAfter; + changeStreamStage.resumeAfter = lastResumeToken; + innerPipeline = [{ $changeStream: changeStreamStage }, ...rest]; } const inner = rawChangeStreamInner(session, db, innerPipeline, { ...options