Skip to content

Commit 1d0088f

Browse files
committed
Skip empty checkpoints on Postgres again.
1 parent 8b9ef4f commit 1d0088f

File tree

1 file changed

+79
-26
lines changed

1 file changed

+79
-26
lines changed

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { pick } from '../../utils/ts-codec.js';
2020
import { batchCreateCustomWriteCheckpoints } from '../checkpoints/PostgresWriteCheckpointAPI.js';
2121
import { cacheKey, encodedCacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
2222
import { PostgresPersistedBatch } from './PostgresPersistedBatch.js';
23+
import { bigint } from '../../types/codecs.js';
2324

2425
export interface PostgresBucketBatchOptions {
2526
logger: Logger;
@@ -51,7 +52,10 @@ const CheckpointWithStatus = StatefulCheckpoint.and(
5152
t.object({
5253
snapshot_done: t.boolean,
5354
no_checkpoint_before: t.string.or(t.Null),
54-
can_checkpoint: t.boolean
55+
can_checkpoint: t.boolean,
56+
keepalive_op: bigint.or(t.Null),
57+
new_last_checkpoint: bigint.or(t.Null),
58+
created_checkpoint: t.boolean
5559
})
5660
);
5761
type CheckpointWithStatusDecoded = t.Decoded<typeof CheckpointWithStatus>;
@@ -291,6 +295,8 @@ export class PostgresBucketBatch
291295
}
292296

293297
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<boolean> {
298+
const createEmptyCheckpoints = options?.createEmptyCheckpoints ?? true;
299+
294300
await this.flush();
295301

296302
const now = new Date().toISOString();
@@ -325,53 +331,68 @@ export class PostgresBucketBatch
325331
id = ${{ type: 'int4', value: this.group_id }}
326332
FOR UPDATE
327333
),
334+
computed AS (
335+
SELECT
336+
selected.*,
337+
CASE
338+
WHEN selected.can_checkpoint THEN GREATEST(
339+
COALESCE(selected.last_checkpoint, 0),
340+
COALESCE(${{ type: 'int8', value: persisted_op }}, 0),
341+
COALESCE(selected.keepalive_op, 0)
342+
)
343+
ELSE selected.last_checkpoint
344+
END AS new_last_checkpoint,
345+
CASE
346+
WHEN selected.can_checkpoint THEN NULL
347+
ELSE GREATEST(
348+
COALESCE(selected.keepalive_op, 0),
349+
COALESCE(${{ type: 'int8', value: persisted_op }}, 0)
350+
)
351+
END AS new_keepalive_op
352+
FROM
353+
selected
354+
),
328355
updated AS (
329356
UPDATE sync_rules AS sr
330357
SET
331358
last_checkpoint_lsn = CASE
332-
WHEN selected.can_checkpoint THEN ${{ type: 'varchar', value: lsn }}
359+
WHEN computed.can_checkpoint THEN ${{ type: 'varchar', value: lsn }}
333360
ELSE sr.last_checkpoint_lsn
334361
END,
335362
last_checkpoint_ts = CASE
336-
WHEN selected.can_checkpoint THEN ${{ type: 1184, value: now }}
363+
WHEN computed.can_checkpoint THEN ${{ type: 1184, value: now }}
337364
ELSE sr.last_checkpoint_ts
338365
END,
339366
last_keepalive_ts = ${{ type: 1184, value: now }},
340367
last_fatal_error = CASE
341-
WHEN selected.can_checkpoint THEN NULL
368+
WHEN computed.can_checkpoint THEN NULL
342369
ELSE sr.last_fatal_error
343370
END,
344-
keepalive_op = CASE
345-
WHEN selected.can_checkpoint THEN NULL
346-
ELSE GREATEST(
347-
COALESCE(sr.keepalive_op, 0),
348-
COALESCE(${{ type: 'int8', value: persisted_op }}, 0)
349-
)
350-
END,
351-
last_checkpoint = CASE
352-
WHEN selected.can_checkpoint THEN GREATEST(
353-
COALESCE(sr.last_checkpoint, 0),
354-
COALESCE(${{ type: 'int8', value: persisted_op }}, 0),
355-
COALESCE(sr.keepalive_op, 0)
356-
)
357-
ELSE sr.last_checkpoint
358-
END,
371+
keepalive_op = computed.new_keepalive_op,
372+
last_checkpoint = computed.new_last_checkpoint,
359373
snapshot_lsn = CASE
360-
WHEN selected.can_checkpoint THEN NULL
374+
WHEN computed.can_checkpoint THEN NULL
361375
ELSE sr.snapshot_lsn
362376
END
363377
FROM
364-
selected
378+
computed
365379
WHERE
366-
sr.id = selected.id
380+
sr.id = computed.id
381+
AND (
382+
sr.keepalive_op IS DISTINCT FROM computed.new_keepalive_op
383+
OR sr.last_checkpoint IS DISTINCT FROM computed.new_last_checkpoint
384+
OR ${{ type: 'bool', value: createEmptyCheckpoints }}
385+
)
367386
RETURNING
368387
sr.id,
369388
sr.state,
370389
sr.last_checkpoint,
371390
sr.last_checkpoint_lsn,
372391
sr.snapshot_done,
373392
sr.no_checkpoint_before,
374-
selected.can_checkpoint
393+
computed.can_checkpoint,
394+
computed.keepalive_op,
395+
computed.new_last_checkpoint
375396
)
376397
SELECT
377398
id,
@@ -380,9 +401,33 @@ export class PostgresBucketBatch
380401
last_checkpoint_lsn,
381402
snapshot_done,
382403
no_checkpoint_before,
383-
can_checkpoint
404+
can_checkpoint,
405+
keepalive_op,
406+
new_last_checkpoint,
407+
TRUE AS created_checkpoint
384408
FROM
385409
updated
410+
UNION ALL
411+
SELECT
412+
id,
413+
state,
414+
new_last_checkpoint AS last_checkpoint,
415+
last_checkpoint_lsn,
416+
snapshot_done,
417+
no_checkpoint_before,
418+
can_checkpoint,
419+
keepalive_op,
420+
new_last_checkpoint,
421+
FALSE AS created_checkpoint
422+
FROM
423+
computed
424+
WHERE
425+
NOT EXISTS (
426+
SELECT
427+
1
428+
FROM
429+
updated
430+
)
386431
`
387432
.decoded(CheckpointWithStatus)
388433
.first();
@@ -407,7 +452,15 @@ export class PostgresBucketBatch
407452
return true;
408453
}
409454

410-
this.logger.info(`Created checkpoint at ${lsn}. Persisted op: ${this.persisted_op}`);
455+
if (result.created_checkpoint) {
456+
this.logger.info(
457+
`Created checkpoint at ${lsn}. Persisted op: ${result.last_checkpoint} (${this.persisted_op}). keepalive: ${result.keepalive_op}`
458+
);
459+
} else {
460+
this.logger.info(
461+
`Skipped empty checkpoint at ${lsn}. Persisted op: ${result.last_checkpoint}. keepalive: ${result.keepalive_op}`
462+
);
463+
}
411464
await this.autoActivate(lsn);
412465
await notifySyncRulesUpdate(this.db, {
413466
id: result.id,
@@ -422,7 +475,7 @@ export class PostgresBucketBatch
422475
}
423476

424477
async keepalive(lsn: string): Promise<boolean> {
425-
return await this.commit(lsn);
478+
return await this.commit(lsn, { createEmptyCheckpoints: true });
426479
}
427480

428481
async setResumeLsn(lsn: string): Promise<void> {

0 commit comments

Comments
 (0)