Skip to content

Commit 0e72cb8

Browse files
pp0rtalhugop95
andauthored
feat: auto migration resume with FETCH_ALL (#31)
* First implementation of FETCH_ALL * Optimize query: do not fetch all inserted ids * Add type-safety with explicit message to use FETCH_ALL * Feed doc * Bump 1.6.0 * chore: remove useless cast * refactor: improve typings * remove require() * [auto] run prettier --------- Co-authored-by: hugo.prunaux <hugo.prunaux@gmail.com>
1 parent daa7bbe commit 0e72cb8

8 files changed

Lines changed: 177 additions & 32 deletions

CHANGLOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.6.0 (2026-02-25)
4+
5+
- Automatic resume for empty queries using `FETCH_ALL` - https://github.com/360Learning/mongo-bulk-data-migration/pull/31
6+
37
## 1.5.2 (2026-02-25)
48

59
- Patch nested $set restore - https://github.com/360Learning/mongo-bulk-data-migration/pull/29

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ new MongoBulkDataMigration<Score>({
142142
id: 'scores_total_new_field',
143143
collectionName: 'scores',
144144
projection: { scoreA: 1, scoreB: 1 },
145-
query: {},
145+
query: FETCH_ALL,
146146
update: (doc) => {
147147
$set: {
148148
total: doc.scoreA + doc.scoreB;
@@ -151,6 +151,24 @@ new MongoBulkDataMigration<Score>({
151151
});
152152
```
153153

154+
### Automatic resume (`query: FETCH_ALL`)
155+
156+
If you don't can't provide a query to fetch only non-migrated documents (like `newField:{$exists:false}`), you can let MBDM continue according the last `_id` processed:
157+
158+
```ts
159+
import { MongoBulkDataMigration, FETCH_ALL } from "@360-l/mongo-bulk-data-migration";
160+
...
161+
new MongoBulkDataMigration<Score>({
162+
db,
163+
id: "update_all_totals",
164+
collectionName: "scores",
165+
projection: { total:1 },
166+
// Automatic resume, iso {}
167+
query: FETCH_ALL,
168+
update: () => ({ total: total + 1 }),
169+
});
170+
```
171+
154172
### Delete documents (`update: DELETE_OPERATION`)
155173

156174
This migration will delete doc having negative `total`.

__tests__/MongoBulkDataMigration.rollback.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ describe('MongoBulkDataMigration', () => {
280280
describe('rollback twice', () => {
281281
it('[non idempotent scripts] should restore the initial documents', async () => {
282282
await collection.insertMany([{ key: 1 }, { key: 2 }, { key: 3 }]);
283+
// @ts-expect-error Use FETCH_ALL instead of an empty query {}
283284
const dataMigration = new MongoBulkDataMigration({
284285
...DM_DEFAULT_SETUP,
285286
query: {},
@@ -316,6 +317,7 @@ describe('MongoBulkDataMigration', () => {
316317

317318
it('[idempotent scripts] should restore the initial documents of the first migration', async () => {
318319
await collection.insertMany([{ key: 1 }, { key: 2 }, { key: 3 }]);
320+
// @ts-expect-error Use FETCH_ALL instead of an empty query {}
319321
const dataMigration = new MongoBulkDataMigration({
320322
...DM_DEFAULT_SETUP,
321323
query: {},

__tests__/MongoBulkDataMigration.update.test.ts

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import _ from 'lodash';
2-
import type { Collection, Db, Document, ObjectId, UpdateFilter } from 'mongodb';
3-
import { MongoBulkDataMigration, DELETE_OPERATION } from '../src';
2+
// import { ObjectId } from 'bson';
3+
import { Collection, Db, Document, ObjectId, UpdateFilter } from 'mongodb';
4+
import { MongoBulkDataMigration, DELETE_OPERATION, FETCH_ALL } from '../src';
45
import { INITIAL_BULK_INFOS } from '../src/lib/AbstractBulkOperationResults';
56
import { LoggerInterface } from '../src/types';
67

@@ -523,6 +524,85 @@ describe('MongoBulkDataMigration', () => {
523524
});
524525
});
525526

527+
describe('FETCH_ALL', () => {
528+
it('should resume migration only on documents inserted after the last migrated one', async () => {
529+
await collection.insertMany([{ key: 1 }, { key: 2 }, { key: 3 }]);
530+
const update = { $set: { key: 10 } };
531+
532+
// First run: migrate all existing documents
533+
const firstRun = new MongoBulkDataMigration({
534+
...DM_DEFAULT_SETUP,
535+
query: FETCH_ALL,
536+
update,
537+
});
538+
await firstRun.update();
539+
540+
// ---- Simulation of a migration resume ---
541+
// New documents arrive after first migration
542+
await collection.insertMany([{ key: 4 }, { key: 5 }]);
543+
544+
// Second run: resume with FETCH_ALL
545+
const secondRun = new MongoBulkDataMigration({
546+
...DM_DEFAULT_SETUP,
547+
query: FETCH_ALL,
548+
update,
549+
});
550+
const resumeResults = await secondRun.update();
551+
552+
expect(resumeResults).toEqual({
553+
...INITIAL_BULK_INFOS,
554+
nMatched: 2,
555+
nModified: 2,
556+
});
557+
const documents = await collection
558+
.find({}, { projection: { _id: 0 } })
559+
.toArray();
560+
expect(documents).toEqual([
561+
{ key: 10 },
562+
{ key: 10 },
563+
{ key: 10 },
564+
{ key: 10 },
565+
{ key: 10 },
566+
]);
567+
});
568+
569+
// Limitation: the FETCH_ALL relies on the last _id fetch, this is assumed for performances reasons
570+
it('[limitation] should not migrate a document with an older _id inserted after first migration', async () => {
571+
await collection.insertMany([{ key: 1 }, { key: 2 }, { key: 3 }]);
572+
const update = { $set: { key: 10 } };
573+
574+
// First run: migrate all existing documents
575+
const firstRun = new MongoBulkDataMigration({
576+
...DM_DEFAULT_SETUP,
577+
query: FETCH_ALL,
578+
update,
579+
});
580+
await firstRun.update();
581+
582+
// ---- Simulation of a migration resume ---
583+
// A document with an old _id is inserted (e.g. restored from a backup)
584+
const oldId = new ObjectId('000000000000000000000001');
585+
await collection.insertOne({ _id: oldId, key: 99 });
586+
587+
// Second run: resume with FETCH_ALL — old _id should be skipped
588+
const secondRun = new MongoBulkDataMigration({
589+
...DM_DEFAULT_SETUP,
590+
query: FETCH_ALL,
591+
update,
592+
});
593+
const resumeResults = await secondRun.update();
594+
595+
expect(resumeResults).toEqual({
596+
...INITIAL_BULK_INFOS,
597+
nMatched: 0,
598+
nModified: 0,
599+
});
600+
// The old document was not migrated
601+
const oldDoc = await collection.findOne({ _id: oldId });
602+
expect(oldDoc?.key).toEqual(99);
603+
});
604+
});
605+
526606
describe('#delete', () => {
527607
it('should perform the delete operation', async () => {
528608
await collection.insertMany([{ key: 1 }, { key: 2 }, { key: 3 }]);

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@360-l/mongo-bulk-data-migration",
3-
"version": "1.5.2",
3+
"version": "1.6.0",
44
"description": "MongoDB bulk data migration for node scripts",
55
"main": "./dist/index.js",
66
"types": "./dist/index.d.ts",

src/MongoBulkDataMigration.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import type {
2020
RollbackDocument,
2121
RollBackUpdateObject,
2222
} from './types';
23-
import type { Collection, Document, ObjectId, WithId } from 'mongodb';
23+
import type { Collection, Document, Filter, ObjectId, WithId } from 'mongodb';
2424

2525
const DEFAULT_BULK_SIZE = 5000;
2626
const COUNT_TOO_LONG_WARNING_THRESHOLD_MS = 30000;
2727
const COLLECTION_VALIDATION_LEVEL = 'moderate';
2828
/** Fully delete collection, use with operation:DELETE_COLLECTION */
2929
export const DELETE_COLLECTION = Symbol();
30+
/** Fetches all documents excluding already rolled-back ones, use with query:FETCH_ALL */
31+
export const FETCH_ALL = Symbol();
3032
const defaultLogger = {
3133
info: (...args: unknown[]) => {
3234
if (process.env.NODE_ENV === 'test') {
@@ -43,8 +45,10 @@ const defaultLogger = {
4345
},
4446
};
4547

46-
export default class MongoBulkDataMigration<TSchema extends Document>
47-
implements RollbackableUpdate
48+
export default class MongoBulkDataMigration<
49+
TSchema extends Document,
50+
TQuery extends Filter<TSchema> | typeof FETCH_ALL = Filter<TSchema>,
51+
> implements RollbackableUpdate
4852
{
4953
private readonly options: DataMigrationOptions<TSchema> = {
5054
arrayFilters: [],
@@ -68,7 +72,12 @@ export default class MongoBulkDataMigration<TSchema extends Document>
6872
* @see <a href="/doc/softwareDesigns/bulkDataMigration/index.md">More information in the software design document</a>
6973
* @param config
7074
*/
71-
constructor(config: DataMigrationConfig<TSchema>) {
75+
constructor(
76+
...args: [keyof TQuery] extends [never]
77+
? ['Use FETCH_ALL instead of an empty query {}']
78+
: [config: DataMigrationConfig<TSchema, TQuery>]
79+
) {
80+
const config = args[0] as DataMigrationConfig<TSchema, TQuery>;
7281
this.id = config.id;
7382
this.collectionName = config.collectionName;
7483
Object.assign(this.options, { ...config.options });
@@ -123,8 +132,10 @@ export default class MongoBulkDataMigration<TSchema extends Document>
123132
}
124133

125134
await this.lowerValidationLevel('update');
126-
const { cursor, totalEntries } =
127-
await this.getCursorAndCount(migrationCollection);
135+
const { cursor, totalEntries } = await this.getCursorAndCount(
136+
migrationCollection,
137+
rollbackCollection,
138+
);
128139
const formattedTotalEntries =
129140
totalEntries === NO_COUNT_AVAILABLE
130141
? 'N/A (dontCount option ON)'
@@ -189,8 +200,13 @@ export default class MongoBulkDataMigration<TSchema extends Document>
189200
return bulkMigration.getResults();
190201
}
191202

192-
private async getCursorAndCount(migrationCollection: Collection<TSchema>) {
193-
const cursor = getCursor(this.migrationInfos);
203+
private async getCursorAndCount(
204+
migrationCollection: Collection<TSchema>,
205+
rollbackCollection: Collection<TSchema>,
206+
) {
207+
const resolvedQuery = await this.resolveQuery(rollbackCollection);
208+
209+
const cursor = getCursor(resolvedQuery, this.migrationInfos);
194210
const countTakingTooLongTimeout = setTimeout(
195211
() =>
196212
this.logger.warn(
@@ -201,11 +217,14 @@ export default class MongoBulkDataMigration<TSchema extends Document>
201217
);
202218
const totalEntries = this.options.dontCount
203219
? NO_COUNT_AVAILABLE
204-
: await getTotalEntries(this.migrationInfos);
220+
: await getTotalEntries(resolvedQuery);
205221
clearTimeout(countTakingTooLongTimeout);
206222
return { cursor, totalEntries };
207223

208-
function getCursor({ query, projection }: MigrationInfos<TSchema>) {
224+
function getCursor(
225+
query: Filter<TSchema> | MongoPipeline,
226+
{ projection }: MigrationInfos<TSchema>,
227+
) {
209228
if (isPipeline(query)) {
210229
const pipelineWithProjection = query.concat(
211230
_.isEmpty(projection) ? [] : [{ $project: projection }],
@@ -215,7 +234,7 @@ export default class MongoBulkDataMigration<TSchema extends Document>
215234
return migrationCollection.find(query, { projection });
216235
}
217236

218-
async function getTotalEntries({ query }: MigrationInfos<TSchema>) {
237+
async function getTotalEntries(query: Filter<TSchema> | MongoPipeline) {
219238
if (isPipeline(query)) {
220239
const pipelineComputeTotal = query.concat({ $count: 'totalEntries' });
221240
const cursorComputeTotal =
@@ -283,6 +302,23 @@ export default class MongoBulkDataMigration<TSchema extends Document>
283302
}
284303
}
285304

305+
private async resolveQuery(
306+
rollbackCollection: Collection<TSchema>,
307+
): Promise<Filter<TSchema> | MongoPipeline> {
308+
if (this.migrationInfos.query !== FETCH_ALL) {
309+
return this.migrationInfos.query;
310+
}
311+
const lastBackup = await rollbackCollection
312+
.find({}, { projection: { _id: 1 } })
313+
.sort({ _id: -1 })
314+
.limit(1)
315+
.next();
316+
317+
return lastBackup
318+
? ({ _id: { $gt: lastBackup._id } } as Filter<TSchema>)
319+
: {};
320+
}
321+
286322
async rollback(): Promise<BulkOperationResult> {
287323
if (!this.options.rollbackable) {
288324
this.logger.warn('Calling rollback() on a non rollbackable script');

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export { DELETE_OPERATION } from './lib/MigrationBulk';
2-
export { DELETE_COLLECTION } from './MongoBulkDataMigration';
2+
export { DELETE_COLLECTION, FETCH_ALL } from './MongoBulkDataMigration';
33
export { default as MongoBulkDataMigration } from './MongoBulkDataMigration';

src/types.ts

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type {
77
Document,
88
} from 'mongodb';
99
import type { DELETE_OPERATION } from './lib/MigrationBulk';
10-
import { DELETE_COLLECTION } from './MongoBulkDataMigration';
10+
import { DELETE_COLLECTION, FETCH_ALL } from './MongoBulkDataMigration';
1111

1212
export type DataMigrationOptions<TSchema> = {
1313
/** Array filters to use in case of a migration on nested object in arrays */
@@ -52,7 +52,7 @@ export type MigrationInfos<TSchema extends Document> = {
5252
operation?: typeof DELETE_COLLECTION;
5353
projection: FindOptions<TSchema>['projection'];
5454
rollback?: (backup?: RollbackDocument['backup']) => RollBackUpdateObject;
55-
query: Filter<TSchema> | MongoPipeline;
55+
query: Filter<TSchema> | MongoPipeline | typeof FETCH_ALL;
5656
update:
5757
| UpdateFilter<TSchema>
5858
| typeof DELETE_OPERATION
@@ -61,16 +61,19 @@ export type MigrationInfos<TSchema extends Document> = {
6161
) => Promise<UpdateFilter<TSchema>> | UpdateFilter<TSchema>);
6262
};
6363

64-
export type DataMigrationConfig<TSchema extends Document> =
64+
export type DataMigrationConfig<
65+
TSchema extends Document,
66+
TQuery extends Filter<TSchema> | typeof FETCH_ALL = Filter<TSchema>,
67+
> =
6568
| DMInstanceSpecialOperation<TSchema>
66-
| DMInstanceSpecialOperationDropDocument<TSchema>
69+
| DMInstanceSpecialOperationDropDocument<TSchema, TQuery>
6770
| DMInstanceAggregate<TSchema>
68-
| DMInstanceFilter<TSchema>;
71+
| DMInstanceFilter<TSchema, TQuery>;
6972

70-
type DMInstanceSpecialOperationDropDocument<TSchema> = Omit<
71-
DMInstanceFilter<TSchema>,
72-
'projection'
73-
> & {
73+
type DMInstanceSpecialOperationDropDocument<
74+
TSchema extends Document,
75+
TQuery extends Filter<TSchema> | typeof FETCH_ALL = Filter<TSchema>,
76+
> = Omit<DMInstanceFilter<TSchema, TQuery>, 'projection'> & {
7477
update: typeof DELETE_OPERATION;
7578
};
7679

@@ -85,13 +88,15 @@ export type DMInstanceSpecialOperation<TSchema> = Pick<
8588
operation: typeof DELETE_COLLECTION;
8689
};
8790

88-
export type DMInstanceFilter<TSchema extends Document> =
89-
DMInstanceBase<TSchema> & {
90-
/** Projected properties (and backed up values) */
91-
projection: FindOptions<TSchema>['projection'];
92-
/** Mongo query for documents to migrate OR mongo aggregation pipeline */
93-
query: Filter<TSchema>;
94-
};
91+
export type DMInstanceFilter<
92+
TSchema extends Document,
93+
TQuery extends Filter<TSchema> | typeof FETCH_ALL = Filter<TSchema>,
94+
> = DMInstanceBase<TSchema> & {
95+
/** Projected properties (and backed up values) */
96+
projection: FindOptions<TSchema>['projection'];
97+
/** Mongo query for documents to migrate, or FETCH_ALL to resume excluding already-migrated documents */
98+
query: TQuery;
99+
};
95100

96101
type DMInstanceBase<TSchema> = {
97102
/** Targeted collection */

0 commit comments

Comments
 (0)