diff --git a/.changeset/chilled-bags-complain.md b/.changeset/chilled-bags-complain.md new file mode 100644 index 000000000..f77038737 --- /dev/null +++ b/.changeset/chilled-bags-complain.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mongodb': minor +--- + +Use custom MongoDB change stream implementation to get better low-level control. diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 6ae824ddc..053a9e89c 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,4 +1,4 @@ -import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { mongo } from '@powersync/lib-service-mongodb'; import { container, DatabaseConnectionError, @@ -29,7 +29,6 @@ import { ReplicationMetric } from '@powersync/service-types'; import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; import { escapeRegExp } from '../utils.js'; -import { trackChangeStreamBsonBytes } from './internal-mongodb-utils.js'; import { MongoManager } from './MongoManager.js'; import { constructAfterRecord, @@ -39,6 +38,7 @@ import { STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; +import { ChangeStreamBatch, rawChangeStream } from './RawChangeStream.js'; import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { @@ -259,48 +259,53 @@ export class ChangeStream { // Create a checkpoint, and open a change stream using startAtOperationTime with the checkpoint's operationTime. 
const firstCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - await using streamManager = this.openChangeStream({ lsn: firstCheckpointLsn, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; const startTime = performance.now(); let lastCheckpointCreated = performance.now(); let eventsSeen = 0; + let batchesSeen = 0; - while (performance.now() - startTime < LSN_TIMEOUT_SECONDS * 1000) { + const filters = this.getSourceNamespaceFilters(); + const iter = this.rawChangeStreamBatches({ + lsn: firstCheckpointLsn, + maxAwaitTimeMS: 0, + signal: this.abort_signal, + filters + }); + for await (let { events } of iter) { + if (performance.now() - startTime >= LSN_TIMEOUT_SECONDS * 1000) { + break; + } if (performance.now() - lastCheckpointCreated >= LSN_CREATE_INTERVAL_SECONDS * 1000) { await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); lastCheckpointCreated = performance.now(); } + batchesSeen += 1; - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - const changeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - if (changeDocument == null) { - continue; - } - - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + for (let rawChangeDocument of events) { + const changeDocument = mongo.BSON.deserialize(rawChangeDocument, { useBigInt64: true }); + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? 
changeDocument.ns : undefined; -      if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { -        const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; -        if (!this.checkpointStreamId.equals(checkpointId)) { -          continue; +        if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { +          const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; +          if (!this.checkpointStreamId.equals(checkpointId)) { +            continue; +          } +          const { comparable: lsn } = new MongoLSN({ +            timestamp: changeDocument.clusterTime!, +            resume_token: changeDocument._id +          }); +          return lsn; } -        const { comparable: lsn } = new MongoLSN({ -          timestamp: changeDocument.clusterTime!, -          resume_token: changeDocument._id -        }); -        return lsn; -      } -      eventsSeen += 1; +        eventsSeen += 1; +      } } // Could happen if there is a very large replication lag? throw new ServiceError( ErrorCode.PSYNC_S1301, -      `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}` +      `Timeout while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}, batches = ${batchesSeen}` ); } @@ -308,15 +313,17 @@ export class ChangeStream { * Given a snapshot LSN, validate that we can read from it, by opening a change stream. */ private async validateSnapshotLsn(lsn: string) { -    await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); -    const { stream } = streamManager; -    try { -      // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream -      await stream.tryNext(); -    } catch (e) { -      // Note: A timeout here is not handled as a ChangeStreamInvalidatedError, even though -      // we possibly cannot recover from it.
- throw mapChangeStreamError(e); + const filters = this.getSourceNamespaceFilters(); + const stream = this.rawChangeStreamBatches({ + lsn: lsn, + // maxAwaitTimeMS should never actually be used here + maxAwaitTimeMS: 0, + filters + }); + for await (let _batch of stream) { + // We got a response from the aggregate command, so consider the LSN valid. + // Close the stream immediately. + break; } } @@ -753,19 +760,18 @@ export class ChangeStream { } } - private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) { + private rawChangeStreamBatches(options: { + lsn: string | null; + maxAwaitTimeMS?: number; + batchSize?: number; + filters: { $match: any; multipleDatabases: boolean }; + signal?: AbortSignal; + }): AsyncIterableIterator { const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; - const filters = this.getSourceNamespaceFilters(); - - const pipeline: mongo.Document[] = [ - { - $match: filters.$match - }, - { $changeStreamSplitLargeEvent: {} } - ]; + const filters = options.filters; let fullDocument: 'required' | 'updateLookup'; @@ -777,12 +783,19 @@ export class ChangeStream { } else { fullDocument = 'updateLookup'; } - const streamOptions: mongo.ChangeStreamOptions = { + const streamOptions: mongo.ChangeStreamOptions & mongo.Document = { showExpandedEvents: true, - maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, - fullDocument: fullDocument, - maxTimeMS: this.changeStreamTimeout + fullDocument: fullDocument }; + const pipeline: mongo.Document[] = [ + { + $changeStream: streamOptions + }, + { + $match: filters.$match + }, + { $changeStreamSplitLargeEvent: {} } + ]; /** * Only one of these options can be supplied at a time. 
@@ -796,45 +809,22 @@ export class ChangeStream { streamOptions.startAtOperationTime = startAfter; } - let stream: mongo.ChangeStream; + let watchDb: mongo.Db; if (filters.multipleDatabases) { - // Requires readAnyDatabase@admin on Atlas - stream = this.client.watch(pipeline, streamOptions); + watchDb = this.client.db('admin'); + streamOptions.allChangesForCluster = true; } else { - // Same general result, but requires less permissions than the above - stream = this.defaultDb.watch(pipeline, streamOptions); + watchDb = this.defaultDb; } - this.abort_signal.addEventListener('abort', () => { - stream.close(); - }); - - return { - stream, - filters, - [Symbol.asyncDispose]: async () => { - return stream.close(); - } - }; - } + return rawChangeStream(watchDb, pipeline, { + batchSize: options.batchSize ?? this.snapshotChunkLength, + maxAwaitTimeMS: options.maxAwaitTimeMS ?? this.maxAwaitTimeMS, + maxTimeMS: this.changeStreamTimeout, - private getBufferedChangeCount(stream: mongo.ChangeStream): number { - // The driver keeps fetched change stream documents on the underlying cursor, but does - // not expose that through the public ChangeStream API. We use this to detect backlog - // building up before we have processed the corresponding source changes locally. - // If the driver API changes, we'll have a hard error here. - // We specifically want to avoid a silent performance regression if the driver behavior changes. 
- const cursor = ( - stream as mongo.ChangeStream & { - cursor: mongo.AbstractCursor>; - } - ).cursor; - if (cursor == null || typeof cursor.bufferedCount != 'function') { - throw new ReplicationAssertionError( - 'MongoDB ChangeStream no longer exposes an internal cursor with bufferedCount' - ); - } - return cursor.bufferedCount(); + signal: options.signal, + logger: this.logger + }); } async streamChangesInternal() { @@ -864,16 +854,12 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); - const { stream, filters } = streamManager; - if (this.abort_signal.aborted) { - await stream.close(); - return; - } - trackChangeStreamBsonBytes(stream, (bytes) => { - bytesReplicatedMetric.add(bytes); - // Each of these represent a single response message from MongoDB. - chunksReplicatedMetric.add(1); + const filters = this.getSourceNamespaceFilters(); + // This is closed when the for loop below returns/breaks/throws + const batchStream = this.rawChangeStreamBatches({ + lsn: resumeFromLsn, + filters, + signal: this.abort_signal }); // Always start with a checkpoint. 
@@ -893,36 +879,21 @@ export class ChangeStream { let lastEmptyResume = performance.now(); let lastTxnKey: string | null = null; - while (true) { - if (this.abort_signal.aborted) { - break; - } - - const originalChangeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - // The stream was closed, we will only ever receive `null` from it - if (!originalChangeDocument && stream.closed) { - break; - } - + for await (let eventBatch of batchStream) { + const { events, resumeToken } = eventBatch; + bytesReplicatedMetric.add(eventBatch.byteSize); + chunksReplicatedMetric.add(1); if (this.abort_signal.aborted) { break; } - - if (originalChangeDocument == null) { - // We get a new null document after `maxAwaitTimeMS` if there were no other events. - // In this case, stream.resumeToken is the resume token associated with the last response. - // stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next() - // does update it. - // From observed behavior, the actual resumeToken changes around once every 10 seconds. + this.touch(); + if (events.length == 0) { + // No changes in this batch, but we still want to keep the connection alive. + // We do this by persisting a keepalive checkpoint. // If we don't update it on empty events, we do keep consistency, but resuming the stream // with old tokens may cause connection timeouts. - // We throttle this further by only persisting a keepalive once a minute. - // We add an additional check for waitForCheckpointLsn == null, to make sure we're not - // doing a keepalive in the middle of a transaction. 
if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { -            const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); +            const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(resumeToken); await batch.keepalive(lsn); this.touch(); lastEmptyResume = performance.now(); @@ -933,226 +904,244 @@ ); this.replicationLag.markStarted(); } -          continue; -        } - -        this.touch(); -        if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { -          continue; +          // Keepalives are throttled to at most once per minute. If one was +          // persisted recently, skip the rest of this empty batch. +          if (performance.now() - lastEmptyResume < 60_000) { +            continue; +          } }
 -        let changeDocument = originalChangeDocument; -        if (originalChangeDocument?.splitEvent != null) { -          // Handle split events from $changeStreamSplitLargeEvent. -          // This is only relevant for very large update operations. -          const splitEvent = originalChangeDocument?.splitEvent; +        this.touch(); -          if (splitDocument == null) { -            splitDocument = originalChangeDocument; -          } else { -            splitDocument = Object.assign(splitDocument, originalChangeDocument); +        const batchStart = Date.now(); +        for (let eventIndex = 0; eventIndex < events.length; eventIndex++) { +          const rawChangeDocument = events[eventIndex]; +          const originalChangeDocument = mongo.BSON.deserialize(rawChangeDocument, { +            useBigInt64: true +          }) as mongo.ChangeStreamDocument; +          if (this.abort_signal.aborted) { +            break; } -          if (splitEvent.fragment == splitEvent.of) { -            // Got all fragments -            changeDocument = splitDocument; -            splitDocument = null; -          } else { -            // Wait for more fragments +          if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { continue; } -        } else if (splitDocument != null) { -          // We were waiting for fragments, but got a different event -          throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); -        } -        if ( -
!filters.multipleDatabases && - 'ns' in changeDocument && - changeDocument.ns.db != this.defaultDb.databaseName && - changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) - ) { - // When all of the following conditions are met: - // 1. We're replicating from an Atlas Flex instance. - // 2. There were changestream events recorded while the PowerSync service is paused. - // 3. We're only replicating from a single database. - // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, - // instead of the expected {db: 'ps'}. - // We correct this. - changeDocument.ns.db = this.defaultDb.databaseName; - - if (!flexDbNameWorkaroundLogged) { - flexDbNameWorkaroundLogged = true; - this.logger.warn( - `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` - ); - } - } + let changeDocument = originalChangeDocument; + if (originalChangeDocument?.splitEvent != null) { + // Handle split events from $changeStreamSplitLargeEvent. + // This is only relevant for very large update operations. + const splitEvent = originalChangeDocument?.splitEvent; - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; - - if (ns?.coll == CHECKPOINTS_COLLECTION) { - /** - * Dropping the database does not provide an `invalidate` event. - * We typically would receive `drop` events for the collection which we - * would process below. - * - * However we don't commit the LSN after collections are dropped. - * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. - * The stream also closes after the drop events. - * This causes an infinite loop of processing the collection drop events. - * - * This check here invalidates the change stream if our `_checkpoints` collection - * is dropped. This allows for detecting when the DB is dropped. 
- */ - if (changeDocument.operationType == 'drop') { - throw new ChangeStreamInvalidatedError( - 'Internal collections have been dropped', - new Error('_checkpoints collection was dropped') - ); + if (splitDocument == null) { + splitDocument = originalChangeDocument; + } else { + splitDocument = Object.assign(splitDocument, originalChangeDocument); + } + + if (splitEvent.fragment == splitEvent.of) { + // Got all fragments + changeDocument = splitDocument; + splitDocument = null; + } else { + // Wait for more fragments + continue; + } + } else if (splitDocument != null) { + // We were waiting for fragments, but got a different event + throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } if ( - !( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' - ) + !filters.multipleDatabases && + 'ns' in changeDocument && + changeDocument.ns.db != this.defaultDb.databaseName && + changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) ) { - continue; + // When all of the following conditions are met: + // 1. We're replicating from an Atlas Flex instance. + // 2. There were changestream events recorded while the PowerSync service is paused. + // 3. We're only replicating from a single database. + // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, + // instead of the expected {db: 'ps'}. + // We correct this. + changeDocument.ns.db = this.defaultDb.databaseName; + + if (!flexDbNameWorkaroundLogged) { + flexDbNameWorkaroundLogged = true; + this.logger.warn( + `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` + ); + } } - // We handle two types of checkpoint events: - // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these - // immediately, regardless of where they were created. - // 2. 
"Batch" checkpoints for the current stream. This is used as a form of dynamic rate - // limiting of commits, so we specifically want to exclude checkpoints from other streams. - // - // It may be useful to also throttle commits due to standalone checkpoints in the future. - // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. - - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - - if (checkpointId == STANDALONE_CHECKPOINT_ID) { - // Standalone / write checkpoint received. - // When we are caught up, commit immediately to keep write checkpoint latency low. - // Once there is already a batch checkpoint pending, or the driver has buffered more - // change stream events, collapse standalone checkpoints into the normal batch - // checkpoint flow to avoid commit churn under sustained load. - if (waitForCheckpointLsn != null || this.getBufferedChangeCount(stream) > 0) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - } + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + + if (ns?.coll == CHECKPOINTS_COLLECTION) { + /** + * Dropping the database does not provide an `invalidate` event. + * We typically would receive `drop` events for the collection which we + * would process below. + * + * However we don't commit the LSN after collections are dropped. + * This prevents the `startAfter` or `resumeToken` from advancing past the drop events. + * The stream also closes after the drop events. + * This causes an infinite loop of processing the collection drop events. + * + * This check here invalidates the change stream if our `_powersync_checkpoints` collection + * is dropped. This allows for detecting when the DB is dropped. 
+ */ + if (changeDocument.operationType == 'drop') { + throw new ChangeStreamInvalidatedError( + 'Internal collections have been dropped', + new Error('_powersync_checkpoints collection was dropped') + ); + } + + if ( + !( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' + ) + ) { continue; } - } else if (!this.checkpointStreamId.equals(checkpointId)) { - continue; - } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { - // Checkpoint out of order - should never happen with MongoDB. - // If it does happen, we throw an error to stop the replication - restarting should recover. - // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. - // Originally a workaround for https://jira.mongodb.org/browse/NODE-7042. - // This has been fixed in the driver in the meantime, but we still keep this as a safety-check. - throw new ReplicationAssertionError( - `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. Restarting replication.` - ); - } - if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { - waitForCheckpointLsn = null; - } - const { checkpointBlocked } = await batch.commit(lsn, { - oldestUncommittedChange: this.replicationLag.oldestUncommittedChange - }); + // We handle two types of checkpoint events: + // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these + // immediately, regardless of where they were created. + // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate + // limiting of commits, so we specifically want to exclude checkpoints from other streams. 
+ // + // It may be useful to also throttle commits due to standalone checkpoints in the future. + // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. + + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + + if (checkpointId == STANDALONE_CHECKPOINT_ID) { + // Standalone / write checkpoint received. + // When we are caught up, commit immediately to keep write checkpoint latency low. + // Once there is already a batch checkpoint pending, or the driver has buffered more + // change stream events, collapse standalone checkpoints into the normal batch + // checkpoint flow to avoid commit churn under sustained load. + const hasBufferedChanges = eventIndex < events.length - 1; + if (waitForCheckpointLsn != null || hasBufferedChanges) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + } + continue; + } + } else if (!this.checkpointStreamId.equals(checkpointId)) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { + // Checkpoint out of order - should never happen with MongoDB. + // If it does happen, we throw an error to stop the replication - restarting should recover. + // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. + // Originally a workaround for https://jira.mongodb.org/browse/NODE-7042. + // This has been fixed in the driver in the meantime, but we still keep this as a safety-check. + throw new ReplicationAssertionError( + `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. 
Restarting replication.` + ); + } - if (!checkpointBlocked) { - this.replicationLag.markCommitted(); - changesSinceLastCheckpoint = 0; - } - } else if ( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' || - changeDocument.operationType == 'delete' - ) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - } + if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { + waitForCheckpointLsn = null; + } + const { checkpointBlocked } = await batch.commit(lsn, { + oldestUncommittedChange: this.replicationLag.oldestUncommittedChange + }); - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // In most cases, we should not need to snapshot this. But if this is the first time we see the collection - // for whatever reason, then we do need to snapshot it. - // This may result in some duplicate operations when a collection is created for the first time after - // sync rules was deployed. - snapshot: true - }); - if (table.syncAny) { - this.replicationLag.trackUncommittedChange( - changeDocument.clusterTime == null ? null : timestampToDate(changeDocument.clusterTime) - ); + if (!checkpointBlocked) { + this.replicationLag.markCommitted(); + changesSinceLastCheckpoint = 0; + } + } else if ( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' || + changeDocument.operationType == 'delete' + ) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + } - const transactionKeyValue = transactionKey(changeDocument); + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // In most cases, we should not need to snapshot this. 
But if this is the first time we see the collection + // for whatever reason, then we do need to snapshot it. + // This may result in some duplicate operations when a collection is created for the first time after + // sync rules was deployed. + snapshot: true + }); + if (table.syncAny) { + this.replicationLag.trackUncommittedChange( + changeDocument.clusterTime == null ? null : timestampToDate(changeDocument.clusterTime) + ); + + const transactionKeyValue = transactionKey(changeDocument); + + if (transactionKeyValue == null || lastTxnKey != transactionKeyValue) { + // Very crude metric for counting transactions replicated. + // We ignore operations other than basic CRUD, and ignore changes to _powersync_checkpoints. + // Individual writes may not have a txnNumber, in which case we count them as separate transactions. + lastTxnKey = transactionKeyValue; + transactionsReplicatedMetric.add(1); + } - if (transactionKeyValue == null || lastTxnKey != transactionKeyValue) { - // Very crude metric for counting transactions replicated. - // We ignore operations other than basic CRUD, and ignore changes to _powersync_checkpoints. - // Individual writes may not have a txnNumber, in which case we count them as separate transactions. - lastTxnKey = transactionKeyValue; - transactionsReplicatedMetric.add(1); + const flushResult = await this.writeChange(batch, table, changeDocument); + changesSinceLastCheckpoint += 1; + if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { + // When we are catching up replication after an initial snapshot, there may be a very long delay + // before we do a commit(). In that case, we need to periodically persist the resume LSN, so + // we don't restart from scratch if we restart replication. + // The same could apply if we need to catch up on replication after some downtime. 
+ const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); + await batch.setResumeLsn(lsn); + changesSinceLastCheckpoint = 0; + } } - - const flushResult = await this.writeChange(batch, table, changeDocument); - changesSinceLastCheckpoint += 1; - if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { - // When we are catching up replication after an initial snapshot, there may be a very long delay - // before we do a commit(). In that case, we need to periodically persist the resume LSN, so - // we don't restart from scratch if we restart replication. - // The same could apply if we need to catch up on replication after some downtime. - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); - await batch.setResumeLsn(lsn); - changesSinceLastCheckpoint = 0; + } else if (changeDocument.operationType == 'drop') { + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // We're "dropping" this collection, so never snapshot it. + snapshot: false + }); + if (table.syncAny) { + await batch.drop([table]); + this.relationCache.delete(table); } + } else if (changeDocument.operationType == 'rename') { + const relFrom = getMongoRelation(changeDocument.ns); + const relTo = getMongoRelation(changeDocument.to); + const tableFrom = await this.getRelation(batch, relFrom, { + // We're "dropping" this collection, so never snapshot it. 
+ snapshot: false + }); + if (tableFrom.syncAny) { + await batch.drop([tableFrom]); + this.relationCache.delete(relFrom); + } + // Here we do need to snapshot the new table + const collection = await this.getCollectionInfo(relTo.schema, relTo.name); + await this.handleRelation(batch, relTo, { + // This is a new (renamed) collection, so always snapshot it. + snapshot: true, + collectionInfo: collection + }); } - } else if (changeDocument.operationType == 'drop') { - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (table.syncAny) { - await batch.drop([table]); - this.relationCache.delete(table); - } - } else if (changeDocument.operationType == 'rename') { - const relFrom = getMongoRelation(changeDocument.ns); - const relTo = getMongoRelation(changeDocument.to); - const tableFrom = await this.getRelation(batch, relFrom, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (tableFrom.syncAny) { - await batch.drop([tableFrom]); - this.relationCache.delete(relFrom); - } - // Here we do need to snapshot the new table - const collection = await this.getCollectionInfo(relTo.schema, relTo.name); - await this.handleRelation(batch, relTo, { - // This is a new (renamed) collection, so always snapshot it. - snapshot: true, - collectionInfo: collection - }); } + this.logger.info(`Processed batch of ${events.length} changes in ${Date.now() - batchStart}ms`); } } ); @@ -1175,28 +1164,6 @@ export class ChangeStream { } } -function mapChangeStreamError(e: any) { - if (isMongoNetworkTimeoutError(e)) { - // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". - // We wrap the error to make it more useful. 
- throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { - // maxTimeMS was reached. Example message: - // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if ( - isMongoServerError(e) && - e.codeName == 'NoMatchingDocument' && - e.errmsg?.includes('post-image was not found') - ) { - throw new ChangeStreamInvalidatedError(e.errmsg, e); - } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new ChangeStreamInvalidatedError(e.message, e); - } else { - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); - } -} - /** * Transaction key for a change stream event, used to detect transaction boundaries. Returns null if the event is not part of a transaction. */ diff --git a/modules/module-mongodb/src/replication/RawChangeStream.ts b/modules/module-mongodb/src/replication/RawChangeStream.ts new file mode 100644 index 000000000..dab285e64 --- /dev/null +++ b/modules/module-mongodb/src/replication/RawChangeStream.ts @@ -0,0 +1,275 @@ +import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { + DatabaseConnectionError, + ErrorCode, + Logger, + ReplicationAssertionError +} from '@powersync/lib-services-framework'; +import { ChangeStreamInvalidatedError } from './ChangeStream.js'; + +export interface RawChangeStreamOptions { + signal?: AbortSignal; + /** + * How long to wait for new data per batch (max time for long-polling). + */ + maxAwaitTimeMS: number; + /** + * Timeout for the initial aggregate command. + */ + maxTimeMS: number; + batchSize: number; + + /** + * Mostly for testing. 
+ */ + collection?: string; + + logger?: Logger; +} + +export interface ChangeStreamBatch { + resumeToken: mongo.ResumeToken; + events: Buffer[]; + /** + * Size in bytes of this batch. + */ + byteSize: number; +} + +export async function* rawChangeStream(db: mongo.Db, pipeline: mongo.Document[], options: RawChangeStreamOptions) { + // We generally attempt to follow the spec at: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + // Intentional differences: + // 1. We don't attempt to follow the client API. + // 2. We require that `postBatchResumeToken` is present. + // 3. We don't attempt to handle resumeToken from individual documents - the consumer can handle that. + if (!('$changeStream' in pipeline[0])) { + throw new ReplicationAssertionError(`First pipeline stage must be $changeStream`); + } + let lastResumeToken: unknown | null = null; + + // > If the server supports sessions, the resume attempt MUST use the same session as the previous attempt's command. + const session = db.client.startSession(); + await using _ = { [Symbol.asyncDispose]: () => session.endSession() }; + + while (true) { + options.signal?.throwIfAborted(); + try { + let innerPipeline = pipeline; + if (lastResumeToken != null) { + const [first, ...rest] = pipeline; + const changeStreamStage = { ...first.$changeStream }; + delete changeStreamStage.startAtOperationTime; + delete changeStreamStage.startAfter; + changeStreamStage.resumeAfter = lastResumeToken; + innerPipeline = [{ $changeStream: changeStreamStage }, ...rest]; + } + const inner = rawChangeStreamInner(session, db, innerPipeline, { + ...options + }); + for await (let batch of inner) { + yield batch; + lastResumeToken = batch.resumeToken; + } + } catch (e) { + if (e instanceof ResumableChangeStreamError) { + // This is only triggered on the getMore command.
+ // If there is a persistent error, we expect it to occur on the aggregate command as well, + // which will trigger a hard error on the next attempt. + // This matches the change stream spec of: + // > A change stream MUST attempt to resume a single time if it encounters any resumable error per Resumable Error. A change stream MUST NOT attempt to resume on any other type of error. + // > An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors. + // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md#resume-process + + // Technically we don't _need_ this resume functionality - we can let the replication job handle the failure + // and restart. However, this provides a faster restart path in common cases. + options.logger?.warn(`Resumable change stream error, retrying: ${e.message}`, e.cause); + continue; + } else { + throw e; + } + } + } +} + +async function* rawChangeStreamInner( + session: mongo.ClientSession, + db: mongo.Db, + pipeline: mongo.Document[], + options: RawChangeStreamOptions +): AsyncGenerator<ChangeStreamBatch> { + // See specs: + // https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + + let cursorId: bigint | null = null; + + /** + * Typically '$cmd.aggregate', but we need to use the ns from the cursor. + */ + let nsCollection: string | null = null; + + const maxTimeMS = options.maxAwaitTimeMS; + const batchSize = options.batchSize; + const collection = options.collection ?? 1; + let abortPromise: Promise<unknown> | null = null; + + const onAbort = () => { + if (cursorId != null && cursorId !== 0n && nsCollection != null) { + // This would result in a CursorKilled error.
+ abortPromise = db + .command({ + killCursors: nsCollection, + cursors: [cursorId] + }) + .catch(() => {}); + } + }; + options.signal?.addEventListener('abort', onAbort); + + try { + { + // Step 1: Send the aggregate command to start the change stream + const aggregateResult = await db + .command( + { + aggregate: collection, + pipeline, + cursor: { batchSize }, + maxTimeMS: options.maxTimeMS + }, + { session, raw: true } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + + const cursor = mongo.BSON.deserialize(aggregateResult.cursor, { + useBigInt64: true, + fieldsAsRaw: { firstBatch: true } + }); + + cursorId = BigInt(cursor.id); + nsCollection = namespaceCollection(cursor.ns); + + let batch = cursor.firstBatch; + + if (cursor.postBatchResumeToken == null) { + // Deviation from spec: We require that the server always returns a postBatchResumeToken. + // postBatchResumeToken is returned in MongoDB 4.0.7 and later, and we support 6.0+ + throw new ReplicationAssertionError(`postBatchResumeToken from aggregate response`); + } + + yield { events: batch, resumeToken: cursor.postBatchResumeToken, byteSize: aggregateResult.cursor.byteLength }; + } + + // Step 2: Poll using getMore until the cursor is closed + while (cursorId && cursorId !== 0n) { + options.signal?.throwIfAborted(); + + const getMoreResult: mongo.Document = await db + .command( + { + getMore: cursorId, + collection: nsCollection, + batchSize, + maxTimeMS + }, + { session, raw: true } + ) + .catch((e) => { + if (isMongoServerError(e) && e.codeName == 'CursorKilled') { + // This may be due to the killCursors command issued when aborting. + // In that case, use the abort error instead. 
+ options.signal?.throwIfAborted(); + } + + if (isResumableChangeStreamError(e)) { + throw new ResumableChangeStreamError(e.message, { cause: e }); + } + throw mapChangeStreamError(e); + }); + + const cursor = mongo.BSON.deserialize(getMoreResult.cursor, { + useBigInt64: true, + fieldsAsRaw: { nextBatch: true } + }); + cursorId = BigInt(cursor.id); + const nextBatch = cursor.nextBatch; + + if (cursor.postBatchResumeToken == null) { + // Deviation from spec: We require that the server always returns a postBatchResumeToken. + // postBatchResumeToken is returned in MongoDB 4.0.7 and later, and we support 6.0+ + throw new ReplicationAssertionError(`postBatchResumeToken from aggregate response`); + } + yield { events: nextBatch, resumeToken: cursor.postBatchResumeToken, byteSize: getMoreResult.cursor.byteLength }; + } + + options.signal?.throwIfAborted(); + throw new ReplicationAssertionError(`Change stream ended unexpectedly`); + } finally { + options.signal?.removeEventListener('abort', onAbort); + if (abortPromise != null) { + // killCursors is already sent - we wait for the response in abortPromise. + await abortPromise; + } else if (cursorId != null && cursorId !== 0n && nsCollection != null) { + await db + .command({ + killCursors: nsCollection, + cursors: [cursorId] + }) + .catch(() => {}); + } + } +} + +class ResumableChangeStreamError extends Error {} + +function isResumableChangeStreamError(e: unknown) { + // See: https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md#resumable-error + if (!isMongoServerError(e)) { + // Any error encountered which is not a server error (e.g. 
a timeout error or network error) + return true; + } else if (e.codeName == 'CursorNotFound') { + // A server error with code 43 (CursorNotFound) + return true; + } else if (e.hasErrorLabel('ResumableChangeStreamError')) { + // For servers with wire version 9 or higher (server version 4.4 or higher), any server error with the ResumableChangeStreamError error label. + return true; + } else { + // We ignore servers with wire version less than 9, since we only support MongoDB 6.0+. + return false; + } +} + +export function mapChangeStreamError(e: unknown) { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". + // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if (isMongoServerError(e) && e.codeName == 'MaxTimeMSExpired') { + // maxTimeMS was reached. Example message: + // MongoServerError: Executor error during aggregate command on namespace: powersync_test_data.$cmd.aggregate :: caused by :: operation exceeded time limit + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if ( + isMongoServerError(e) && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg, e); + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } +} + +/** + * Get the "collection" from a ns. + * + * This drops everything before the first . character. 
+ * + * "my_db_name.$cmd.aggregate" -> "$cmd.aggregate" + */ +export function namespaceCollection(ns: string): string { + const dot = ns.indexOf('.'); + return ns.substring(dot + 1); +} diff --git a/modules/module-mongodb/src/replication/internal-mongodb-utils.ts b/modules/module-mongodb/src/replication/internal-mongodb-utils.ts index bc404f4c3..2e4952da3 100644 --- a/modules/module-mongodb/src/replication/internal-mongodb-utils.ts +++ b/modules/module-mongodb/src/replication/internal-mongodb-utils.ts @@ -1,45 +1,5 @@ import { mongo } from '@powersync/lib-service-mongodb'; -/** - * Track bytes read on a change stream. - * - * This is after decompression, and without TLS overhead. - * - * This excludes some protocol overhead, but does include per-batch overhead. - * - * This is built on internal APIs, and may stop working in future driver versions. - * - * @param add Called once for each batch of data. - */ -export function trackChangeStreamBsonBytes(changeStream: mongo.ChangeStream, add: (bytes: number) => void) { - let internalChangeStream = changeStream as ChangeStreamWithCursor; - let current = internalChangeStream.cursor; - let degisterCursor = trackCursor(current, add); - - const refresh = () => { - // The cursor may be replaced closed and re-opened (replaced) in various scenarios, such as - // after a primary fail-over event. - // There is no direct even to track that, but the `resumeTokenChanged` event is a good proxy. - // It may be called more often than the cursor is replaced, so we just check whether the cursor changed. - // This might miss the init batch, so we may under-count slightly in that case. It is a rare event - // and typically a small number of bytes, so it's fine to ignore. 
- const next = internalChangeStream.cursor; - if (next !== current) { - degisterCursor(); - current = next; - degisterCursor = trackCursor(current, add); - } - }; - - changeStream.on('resumeTokenChanged', refresh); - - // We return this to allow de-registration of the event listeners. - // However, these are garbage collected automatically when the stream is closed, so it's not strictly necessary to call this. - return () => { - changeStream.off('resumeTokenChanged', refresh); - }; -} - /** * Get the byte size of the current batch on a cursor. * @@ -59,31 +19,6 @@ interface CursorResponse { toBytes?(): Uint8Array; } -interface ChangeStreamWithCursor extends mongo.ChangeStream { - cursor?: mongo.AbstractCursor; -} - -function trackCursor(cursor: mongo.AbstractCursor | undefined, add: (bytes: number) => void) { - if (cursor == null) { - return () => {}; - } - const countBatch = (response: CursorResponse | undefined) => { - const bytes = getResponseBytes(response); - if (bytes > 0) { - add(bytes); - } - }; - - // The `init` event is emitted for the first batch, and the `more` event is emitted for subsequent batches. - cursor.on('init', countBatch); - cursor.on('more', countBatch); - - return () => { - cursor.off('init', countBatch); - cursor.off('more', countBatch); - }; -} - function getResponseBytes(response: CursorResponse | undefined): number { const buffer = response?.toBytes?.(); return buffer?.byteLength ?? 
0; diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 1c53d6498..8ce598357 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -257,6 +257,53 @@ bucket_definitions: expect(data).toMatchObject([test_utils.putOp('test_DATA', { id: test_id, description: 'test1' })]); }); + test('replicating from multiple databases in the same cluster', async () => { + await using context = await openContext(); + const { client, db } = context; + const otherDb = client.db(`${db.databaseName}_other_${storageVersion}`); + await otherDb.dropDatabase(); + await using _ = { + [Symbol.asyncDispose]: async () => { + await otherDb.dropDatabase(); + } + }; + + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "${db.databaseName}"."test_data_default" + - SELECT _id as id, description FROM "${otherDb.databaseName}"."test_data_other" + `); + + await db.createCollection('test_data_default'); + await otherDb.createCollection('test_data_other'); + await context.replicateSnapshot(); + context.startStreaming(); + + const defaultResult = await db.collection('test_data_default').insertOne({ description: 'default db' }); + const otherResult = await otherDb.collection('test_data_other').insertOne({ description: 'other db' }); + + const data = await context.getBucketData('global[]'); + + expect(data).toEqual( + expect.arrayContaining([ + expect.objectContaining( + test_utils.putOp('test_data_default', { + id: defaultResult.insertedId.toHexString(), + description: 'default db' + }) + ), + expect.objectContaining( + test_utils.putOp('test_data_other', { + id: otherResult.insertedId.toHexString(), + description: 'other db' + }) + ) + ]) + ); + }); + test('replicating large values', async () => { await using context = await openContext(); const { db } = context; diff --git 
a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts b/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts deleted file mode 100644 index 8e22b7a5a..000000000 --- a/modules/module-mongodb/test/src/internal_mongodb_utils.test.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { describe, expect, test } from 'vitest'; - -import { getCursorBatchBytes, trackChangeStreamBsonBytes } from '@module/replication/replication-index.js'; -import { mongo } from '@powersync/lib-service-mongodb'; -import { clearTestDb, connectMongoData } from './util.js'; - -describe('internal mongodb utils', () => { - // The implementation relies on internal APIs, so we verify this works as expected for various types of change streams. - test('collection change stream size tracking', async () => { - await testChangeStreamBsonBytes('collection'); - }); - - test('db change stream size tracking', async () => { - await testChangeStreamBsonBytes('db'); - }); - - test('cluster change stream size tracking', async () => { - await testChangeStreamBsonBytes('cluster'); - }); - - test('cursor batch size tracking', async () => { - const { db, client } = await connectMongoData(); - await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; - await clearTestDb(db); - const collection = db.collection('test_data'); - await collection.insertMany([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }, { test: 5 }]); - - const cursor = collection.find({}, { batchSize: 2 }); - let batchBytes: number[] = []; - let totalBytes = 0; - // We use this in the same way as ChunkedSnapshotQuery - while (await cursor.hasNext()) { - batchBytes.push(getCursorBatchBytes(cursor)); - totalBytes += batchBytes[batchBytes.length - 1]; - cursor.readBufferedDocuments(); - } - - // 3 batches: [2, 2, 1] documents. Should not change - expect(batchBytes.length).toEqual(3); - // Current tests show 839, but this may change depending on the MongoDB version and other conditions. 
- expect(totalBytes).toBeGreaterThan(400); - expect(totalBytes).toBeLessThan(1200); - }); - - async function testChangeStreamBsonBytes(type: 'db' | 'collection' | 'cluster') { - // With MongoDB, replication uses the exact same document format - // as normal queries. We test it anyway. - const { db, client } = await connectMongoData(); - await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; - await clearTestDb(db); - const collection = db.collection('test_data'); - - let stream: mongo.ChangeStream; - if (type === 'collection') { - stream = collection.watch([], { - maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' - }); - } else if (type === 'db') { - stream = db.watch([], { - maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' - }); - } else { - stream = client.watch([], { - maxAwaitTimeMS: 5, - fullDocument: 'updateLookup' - }); - } - - let batchBytes: number[] = []; - let totalBytes = 0; - trackChangeStreamBsonBytes(stream, (bytes) => { - batchBytes.push(bytes); - totalBytes += bytes; - }); - - const readAll = async () => { - while ((await stream.tryNext()) != null) {} - }; - - await readAll(); - - await collection.insertOne({ test: 1 }); - await readAll(); - await collection.insertOne({ test: 2 }); - await readAll(); - await collection.insertOne({ test: 3 }); - await readAll(); - - await stream.close(); - - // The exact length by vary based on exact batching logic, but we do want to know when it changes. - // Note: If this causes unstable tests, we can relax this check. - expect(batchBytes.length).toEqual(8); - - // Current tests show 4464-4576 bytes for the size, depending on the type of change stream. - // This can easily vary based on the mongodb version and general conditions, so we just check the general range. - // For the most part, if any bytes are reported, the tracking is working. 
- expect(totalBytes).toBeGreaterThan(2000); - expect(totalBytes).toBeLessThan(8000); - } -}); diff --git a/modules/module-mongodb/test/src/raw_change_stream.test.ts b/modules/module-mongodb/test/src/raw_change_stream.test.ts new file mode 100644 index 000000000..e904b9c80 --- /dev/null +++ b/modules/module-mongodb/test/src/raw_change_stream.test.ts @@ -0,0 +1,394 @@ +import { describe, expect, test } from 'vitest'; + +import { ChangeStreamBatch, namespaceCollection, rawChangeStream } from '@module/replication/RawChangeStream.js'; +import { getCursorBatchBytes } from '@module/replication/replication-index.js'; +import { mongo } from '@powersync/lib-service-mongodb'; +import { bson } from '@powersync/service-core'; +import { clearTestDb, connectMongoData } from './util.js'; + +describe('internal mongodb utils', () => { + // The implementation relies on internal APIs, so we verify this works as expected for various types of change streams. + test('collection change stream size tracking', async () => { + await testChangeStreamBsonBytes('collection'); + }); + + test('db change stream size tracking', async () => { + await testChangeStreamBsonBytes('db'); + }); + + test('cluster change stream size tracking', async () => { + await testChangeStreamBsonBytes('cluster'); + }); + + test('cursor batch size tracking', async () => { + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + await collection.insertMany([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }, { test: 5 }]); + + const cursor = collection.find({}, { batchSize: 2 }); + let batchBytes: number[] = []; + let totalBytes = 0; + // We use this in the same way as ChunkedSnapshotQuery + while (await cursor.hasNext()) { + batchBytes.push(getCursorBatchBytes(cursor)); + totalBytes += batchBytes[batchBytes.length - 1]; + cursor.readBufferedDocuments(); + } + + // 3 
batches: [2, 2, 1] documents. Should not change + expect(batchBytes.length).toEqual(3); + // Current tests show 839, but this may change depending on the MongoDB version and other conditions. + expect(totalBytes).toBeGreaterThan(400); + expect(totalBytes).toBeLessThan(1200); + }); + + async function testChangeStreamBsonBytes(type: 'db' | 'collection' | 'cluster') { + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + let stream: AsyncIterableIterator<ChangeStreamBatch>; + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup', + allChangesForCluster: type == 'cluster' + } + } + ]; + if (type === 'collection') { + stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000, + collection: collection.collectionName + }); + } else if (type === 'db') { + stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + }); + } else { + stream = rawChangeStream(client.db('admin'), pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + }); + } + + let batchBytes: number[] = []; + let totalBytes = 0; + + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + const bytes = next.value.byteSize; + batchBytes.push(bytes); + totalBytes += bytes; + + if (next.value.events.length == 0) { + break; + } + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + await collection.insertOne({ test: 3 }); + await readAll(); + + await stream.return?.(); + + // The exact length may vary based on exact batching logic, but we do want to know when it changes. + // Note: If this causes unstable tests, we can relax this check.
+ expect(batchBytes.length).toEqual(7); + + // Current tests show 4464-4576 bytes for the size, depending on the type of change stream. + // This can easily vary based on the mongodb version and general conditions, so we just check the general range. + // For the most part, if any bytes are reported, the tracking is working. + expect(totalBytes).toBeGreaterThan(2000); + expect(totalBytes).toBeLessThan(8000); + } + + test('should resume on missing cursor (1)', async () => { + // Many resumable errors are difficult to simulate, but CursorNotFound is easy. + + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + await collection.insertOne({ test: 3 }); + await killChangeStreamCursor(db, client); + await collection.insertOne({ test: 4 }); + await readAll(); + + await stream.return?.(); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }]); + }); + + test('should resume on missing cursor (2)', async () => { + const { db, client } = await connectMongoData(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + 
const currentOpStream = rawChangeStream( + db, + [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ], + { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + } + ); + const firstBatch = await currentOpStream.next(); + await currentOpStream.return(); + const resumeAfter = firstBatch.value!.resumeToken; + + const stream = rawChangeStream( + db, + [ + { + $changeStream: { + fullDocument: 'updateLookup', + resumeAfter + } + } + ], + { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000 + } + ); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + await collection.insertOne({ test: 3 }); + await killChangeStreamCursor(db, client); + await collection.insertOne({ test: 4 }); + await readAll(); + + await stream.return?.(); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }, { test: 3 }, { test: 4 }]); + }); + + test('should cleanly abort a stream between events', async () => { + const { db, client } = await connectMongoData(); + const abortController = new AbortController(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 5, + maxTimeMS: 1_000, + signal: abortController.signal + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if 
(next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + await collection.insertOne({ test: 2 }); + await readAll(); + abortController.abort(new Error('test abort')); + await collection.insertOne({ test: 3 }); + await expect(readAll()).rejects.toMatchObject({ message: 'test abort' }); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }, { test: 2 }]); + }); + + test('should cleanly abort a stream in an event', async () => { + const { db, client } = await connectMongoData(); + const abortController = new AbortController(); + await using _ = { [Symbol.asyncDispose]: async () => await client.close() }; + await clearTestDb(db); + const collection = db.collection('test_data'); + + const pipeline = [ + { + $changeStream: { + fullDocument: 'updateLookup' + } + } + ]; + const stream = rawChangeStream(db, pipeline, { + batchSize: 10, + maxAwaitTimeMS: 200, + maxTimeMS: 1_000, + signal: abortController.signal + }); + + let readDocs: any[] = []; + const readAll = async () => { + while (true) { + const next = await stream.next(); + if (next.done) { + break; + } + + if (next.value.events.length == 0) { + break; + } + + readDocs.push(...next.value.events.map((e) => bson.deserialize(e, { useBigInt64: true }))); + } + }; + + await readAll(); + + await collection.insertOne({ test: 1 }); + await readAll(); + // This is specifically a readAll() without an insert in between, to trigger the longer await + // period. 
+ let readPromise = readAll(); + abortController.abort(new Error('test abort')); + await expect(readPromise).rejects.toMatchObject({ message: 'test abort' }); + + expect(readDocs.map((doc) => doc.fullDocument)).toMatchObject([{ test: 1 }]); + }); +}); + +async function killChangeStreamCursor(db: mongo.Db, client: mongo.MongoClient) { + const ops = await client + .db('admin') + .aggregate([{ $currentOp: { idleCursors: true } }, { $match: { type: 'idleCursor' } }]) + .toArray(); + + const ns = `${db.databaseName}.$cmd.aggregate`; + const op = ops.find((op) => { + const command = op.cursor?.originatingCommand; + return op.ns == ns && Array.isArray(command?.pipeline) && command.pipeline[0]?.$changeStream != null; + }); + + if (op?.cursor == null) { + throw new Error( + `Could not find change stream cursor. Idle cursors: ${JSON.stringify( + ops.map((op) => ({ + ns: op.ns, + type: op.type, + cursorId: op.cursor?.cursorId?.toString(), + aggregate: op.cursor?.originatingCommand?.aggregate, + pipeline: op.cursor?.originatingCommand?.pipeline + })) + )}` + ); + } + + await db.command({ + killCursors: namespaceCollection(op.ns), + cursors: [op.cursor.cursorId] + }); +} + +type CurrentOpIdleCursor = { + ns: string; + type: string; + cursor?: { + cursorId: bigint; + originatingCommand?: { + aggregate?: unknown; + pipeline?: any[]; + }; + }; +};