From 1dff1c188553f4fefa7da0b8a5bda06773018b7a Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:13:19 -0600 Subject: [PATCH 01/11] feat: cursor-aware effect replay with startAfter gating Add CollectionCursor type and startAfter option to createEffect, enabling effects to suppress callbacks during replay of historical changes while still hydrating internal query state. Sync writes can now carry an opaque sortable cursor that propagates through ChangeMessage and DeltaEvent. Includes fixes for cursor type mismatch crashes (routed through onSourceError), cursor-less change handling after replay, and cursor erasure within same-transaction writes. Co-Authored-By: Claude Opus 4.6 --- packages/db/src/collection/state.ts | 21 ++ packages/db/src/collection/sync.ts | 9 + packages/db/src/query/effect.ts | 185 +++++++++- packages/db/src/query/live/utils.ts | 4 +- packages/db/src/types.ts | 4 + packages/db/tests/collection.test.ts | 51 +++ packages/db/tests/effect.test.ts | 367 ++++++++++++++++++++ packages/react-db/src/useLiveQueryEffect.ts | 1 + 8 files changed, 636 insertions(+), 6 deletions(-) diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index af65cb801..2b6b066a6 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -12,6 +12,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec' import type { ChangeMessage, CollectionConfig, + CollectionCursor, OptimisticChangeMessage, } from '../types' import type { CollectionImpl } from './index.js' @@ -28,6 +29,7 @@ interface PendingSyncedTransaction< operations: Array> truncate?: boolean deletedKeys: Set + rowCursorWrites: Map rowMetadataWrites: Map collectionMetadataWrites: Map optimisticSnapshot?: { @@ -42,6 +44,10 @@ interface PendingSyncedTransaction< immediate?: boolean } +type PendingCursorWrite = + | { type: `set`; value: CollectionCursor } + | { type: `delete` } + type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` } type InternalChangeMessage< @@ -313,6 +319,7 @@ export class CollectionStateManager< type: change.type, value: enrichedValue, previousValue: enrichedPreviousValue, + cursor: change.cursor, metadata: change.metadata, } as ChangeMessage, TKey> } @@ -895,6 +902,7 @@ export class CollectionStateManager< } const events: Array> = [] + const eventCursors = new Map() const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` const completedOptimisticOps = new Map< TKey, @@ -1034,6 +1042,14 @@ export class CollectionStateManager< this.syncedMetadata.set(key, metadataWrite.value) } + for (const [key, cursorWrite] of transaction.rowCursorWrites) { + if (cursorWrite.type === `delete`) { + eventCursors.delete(key) + continue + } + eventCursors.set(key, cursorWrite.value) + } + for (const [ key, metadataWrite, @@ -1162,6 +1178,7 @@ export class CollectionStateManager< // Now check what actually changed in the final visible state for (const key of changedKeys) { + const cursor = eventCursors.get(key) const previousVisibleValue = currentVisibleState.get(key) const newVisibleValue = this.get(key) // This returns the new derived state const previousVirtualProps = this.getVirtualPropsSnapshotForState(key, { @@ -1235,12 +1252,14 @@ export class CollectionStateManager< key, value: newVisibleValue, previousValue: previousValueWithVirtualFromCompleted, + cursor, }) } else { events.push({ type: `insert`, key, value: newVisibleValue, + cursor, }) } } else if ( @@ -1251,6 +1270,7 @@ export class CollectionStateManager< type: `delete`, key, value: previousValueWithVirtual ?? previousVisibleValue, + cursor, }) } else if ( previousVisibleValue !== undefined && @@ -1263,6 +1283,7 @@ export class CollectionStateManager< key, value: newVisibleValue, previousValue: previousValueWithVirtual ?? previousVisibleValue, + cursor, }) } } diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 82fffb772..0cede371c 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -94,6 +94,7 @@ export class CollectionSyncManager< committed: false, operations: [], deletedKeys: new Set(), + rowCursorWrites: new Map(), rowMetadataWrites: new Map(), collectionMetadataWrites: new Map(), immediate: options?.immediate, @@ -190,6 +191,13 @@ export class CollectionSyncManager< value: message.metadata, }) } + + if (message.cursor !== undefined) { + pendingTransaction.rowCursorWrites.set(key, { + type: `set`, + value: message.cursor, + }) + } }, commit: () => { const pendingTransaction = @@ -225,6 +233,7 @@ export class CollectionSyncManager< // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() + pendingTransaction.rowCursorWrites.clear() pendingTransaction.rowMetadataWrites.clear() // Intentionally preserve collectionMetadataWrites across truncate. // Collection-scoped metadata (for example persisted resume/reset diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 10857d06e..8fe96e2d4 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -25,7 +25,12 @@ import type { InitialQueryBuilder, QueryBuilder } from './builder/index.js' import type { Context } from './builder/types.js' import type { BasicExpression, QueryIR } from './ir.js' import type { OrderByOptimizationInfo } from './compiler/order-by.js' -import type { ChangeMessage, KeyedStream, ResultStream } from '../types.js' +import type { + ChangeMessage, + CollectionCursor, + KeyedStream, + ResultStream, +} from '../types.js' // --------------------------------------------------------------------------- // Public Types @@ -44,6 +49,7 @@ export type DeltaEvent< key: TKey /** Current value for the entering row */ value: TRow + cursor?: CollectionCursor metadata?: Record } | { @@ -51,6 +57,7 @@ export type DeltaEvent< key: TKey /** Current value for the exiting row */ value: TRow + cursor?: CollectionCursor metadata?: Record } | { @@ -60,6 +67,7 @@ export type DeltaEvent< value: TRow /** Previous value before the batch */ previousValue: TRow + cursor?: CollectionCursor metadata?: Record } @@ -128,6 +136,12 @@ export interface EffectConfig< * Set to true for effects that should only process new changes. */ skipInitial?: boolean + + /** + * Suppress callbacks until the source replay advances past this cursor. + * Historical changes at or before the cursor still update internal query state. + */ + startAfter?: CollectionCursor } /** Handle returned by createEffect */ @@ -262,6 +276,7 @@ export function createEffect< const runner = new EffectPipelineRunner({ query: config.query, skipInitial: config.skipInitial ?? false, + startAfter: config.startAfter, onBatchProcessed, onSourceError: (error: Error) => { if (disposed) return @@ -303,6 +318,7 @@ interface EffectPipelineRunnerConfig< > { query: EffectQueryInput skipInitial: boolean + startAfter?: CollectionCursor onBatchProcessed: (events: Array>) => void /** Called when a source collection enters error or cleaned-up state */ onSourceError: (error: Error) => void @@ -357,6 +373,9 @@ class EffectPipelineRunner { // skipInitial state private readonly skipInitial: boolean private initialLoadComplete = false + private readonly startAfter: CollectionCursor | undefined + private cursorGateOpen: boolean + private pendingBatchCursor: CollectionCursor | undefined // Scheduler integration private subscribedToAllCollections = false @@ -376,6 +395,8 @@ class EffectPipelineRunner { constructor(config: EffectPipelineRunnerConfig) { this.skipInitial = config.skipInitial + this.startAfter = config.startAfter + this.cursorGateOpen = config.startAfter === undefined this.onBatchProcessed = config.onBatchProcessed this.onSourceError = config.onSourceError @@ -613,8 +634,10 @@ class EffectPipelineRunner { if (orderByInfo) { this.trackSentValues(alias, changes, orderByInfo.comparator) const split = [...splitUpdates(changes)] + this.recordPendingBatchCursor(split) this.sendChangesToD2(alias, split) } else { + this.recordPendingBatchCursor(changes) this.sendChangesToD2(alias, changes) } } @@ -638,10 +661,88 @@ class EffectPipelineRunner { alias: string, changes: Array>, ): void { + if (changes.length === 0) { + return + } + + if (!this.cursorGateOpen) { + this.handleSourceChangesBeforeCursorGate(alias, changes) + return + } + + this.recordPendingBatchCursor(changes) this.sendChangesToD2(alias, changes) this.scheduleGraphRun(alias) } + /** + * Replay before startAfter must still hydrate query state, but callbacks stay + * muted until we observe a cursor greater than the configured boundary. + */ + private handleSourceChangesBeforeCursorGate( + alias: string, + changes: Array>, + ): void { + let firstLiveIndex: number + try { + firstLiveIndex = findFirstChangeAfterCursor( + changes, + this.startAfter!, + ) + } catch (error) { + this.onSourceError( + error instanceof Error ? error : new Error(String(error)), + ) + return + } + + if (firstLiveIndex === -1) { + this.sendChangesToD2(alias, changes) + this.scheduleGraphRun(alias) + return + } + + if (firstLiveIndex > 0) { + const replayChanges = changes.slice(0, firstLiveIndex) + this.sendChangesToD2(alias, replayChanges) + this.scheduleGraphRun(alias) + } else if (!getActiveTransaction() && this.graph?.pendingWork()) { + // Best-effort boundary preservation outside transaction-scoped flushes. + this.runGraph() + } + + this.cursorGateOpen = true + + const liveChanges = changes.slice(firstLiveIndex) + this.recordPendingBatchCursor(liveChanges) + this.sendChangesToD2(alias, liveChanges) + this.scheduleGraphRun(alias) + } + + private recordPendingBatchCursor( + changes: Array>, + ): void { + for (const change of changes) { + const { cursor } = change + if (cursor === undefined) { + continue + } + try { + if ( + this.pendingBatchCursor === undefined || + compareCollectionCursors(cursor, this.pendingBatchCursor) > 0 + ) { + this.pendingBatchCursor = cursor + } + } catch (error) { + this.onSourceError( + error instanceof Error ? error : new Error(String(error)), + ) + return + } + } + } + /** * Schedule a graph run via the transaction-scoped scheduler. * @@ -762,11 +863,20 @@ class EffectPipelineRunner { /** Classify accumulated changes into DeltaEvents and invoke the callback */ private flushPendingChanges(): void { - if (this.pendingChanges.size === 0) return + if (this.pendingChanges.size === 0) { + this.pendingBatchCursor = undefined + return + } + + const batchCursor = this.pendingBatchCursor // If skipInitial and initial load isn't complete yet, discard - if (this.skipInitial && !this.initialLoadComplete) { + if ( + (this.skipInitial && !this.initialLoadComplete) || + !this.cursorGateOpen + ) { this.pendingChanges = new Map() + this.pendingBatchCursor = undefined return } @@ -775,11 +885,12 @@ class EffectPipelineRunner { for (const [key, changes] of this.pendingChanges) { const event = classifyDelta(key as TKey, changes) if (event) { - events.push(event) + events.push(attachCursorToEvent(event, batchCursor)) } } this.pendingChanges = new Map() + this.pendingBatchCursor = undefined if (events.length > 0) { this.onBatchProcessed(events) @@ -1087,6 +1198,72 @@ function classifyDelta( return undefined } +function attachCursorToEvent< + TRow extends object, + TKey extends string | number, +>( + event: DeltaEvent, + cursor: CollectionCursor | undefined, +): DeltaEvent { + if (cursor === undefined) { + return event + } + + return { + ...event, + cursor, + } +} + +function findFirstChangeAfterCursor( + changes: Array>, + startAfter: CollectionCursor, +): number { + let anyCursor = false + for (let index = 0; index < changes.length; index++) { + const cursor = changes[index]!.cursor + if (cursor === undefined) { + continue + } + anyCursor = true + if (compareCollectionCursors(cursor, startAfter) > 0) { + // Walk backwards to include any preceding uncursored changes — + // they have no position in cursor space and should not be + // assumed to be replay. + let liveStart = index + while (liveStart > 0 && changes[liveStart - 1]!.cursor === undefined) { + liveStart-- + } + return liveStart + } + } + + // If no change in this batch carries a cursor, treat the entire batch as + // live — the sync provider has moved past cursor-based replay. + if (!anyCursor) { + return 0 + } + + return -1 +} + +function compareCollectionCursors( + left: CollectionCursor, + right: CollectionCursor, +): number { + if (typeof left !== typeof right) { + throw new Error( + `Collection cursors must use a consistent primitive type. Received ${typeof left} and ${typeof right}.`, + ) + } + + if (left === right) { + return 0 + } + + return left > right ? 1 : -1 +} + /** Track a promise in the in-flight set, automatically removing on settlement */ function trackPromise( promise: Promise, diff --git a/packages/db/src/query/live/utils.ts b/packages/db/src/query/live/utils.ts index 6f37a0587..329bca6bf 100644 --- a/packages/db/src/query/live/utils.ts +++ b/packages/db/src/query/live/utils.ts @@ -252,8 +252,8 @@ export function* splitUpdates< ): Generator> { for (const change of changes) { if (change.type === `update`) { - yield { type: `delete`, key: change.key, value: change.previousValue! } - yield { type: `insert`, key: change.key, value: change.value } + yield { ...change, type: `delete`, value: change.previousValue! } + yield { ...change, type: `insert` } } else { yield change } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 6087e234e..d6016c1f0 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -324,6 +324,9 @@ export type SyncConfigRes = { loadSubset?: LoadSubsetFn unloadSubset?: UnloadSubsetFn } + +export type CollectionCursor = string | number + export interface SyncConfig< T extends object = Record, TKey extends string | number = string | number, @@ -386,6 +389,7 @@ export interface ChangeMessage< value: T previousValue?: T type: OperationType + cursor?: CollectionCursor metadata?: Record } diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index c5d09039e..32dc41ce0 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1586,6 +1586,57 @@ describe(`Collection`, () => { expect(collection._state.syncedMetadata.has(1)).toBe(false) }) + it(`should emit sync cursors through change subscriptions`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-cursor-subscription-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + + const observedChanges: Array> = + [] + const subscription = collection.subscribeChanges( + (changes) => { + observedChanges.push( + ...(changes as Array>), + ) + }, + { includeInitialState: false }, + ) + + const { begin, write, commit } = testSyncFunctions + begin() + write({ + type: `update`, + value: { id: 1, value: `updated` }, + cursor: `002`, + }) + commit() + + expect(observedChanges).toHaveLength(1) + expect(observedChanges[0]!.type).toBe(`update`) + expect(observedChanges[0]!.cursor).toBe(`002`) + + subscription.unsubscribe() + }) + it(`should treat row metadata as cleared after truncate within the same sync transaction`, async () => { let testSyncFunctions: any = null diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 5c557087c..08879dd14 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -134,6 +134,34 @@ describe(`createEffect`, () => { await effect.dispose() }) + it(`should expose the sync cursor on emitted effect events`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onEnter: collectEvents(events), + skipInitial: true, + }) + + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Diana`, active: true }, + cursor: `004`, + }) + users.utils.commit() + + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.cursor).toBe(`004`) + + await effect.dispose() + }) + it(`should fire 'exit' event when a row is deleted from source`, async () => { const users = createUsersCollection() const events: Array> = [] @@ -201,6 +229,345 @@ describe(`createEffect`, () => { await effect.dispose() }) + + it(`should suppress replay until startAfter and preserve earlier state`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursor-gated-effect-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + + await flushPromises() + expect(events).toHaveLength(0) + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 2, + }) + users.utils.commit() + + await flushPromises() + expect(events).toHaveLength(0) + + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 2, name: `Bob v2`, active: true }, + previousValue: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`update`) + expect(events[0]!.cursor).toBe(3) + if (events[0]!.type !== `update`) { + throw new Error(`Expected update event`) + } + expect(events[0]!.previousValue.name).toBe(`Bob`) + expect(events[0]!.value.name).toBe(`Bob v2`) + + await effect.dispose() + }) + + it(`should not crash sibling subscribers when cursor types are mixed`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursor-type-mismatch-users`, + getKey: (user) => user.id, + }), + ) + + const siblingEvents: Array> = [] + const errorEvents: Array = [] + + // Sibling effect — should keep working + const siblingEffect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(siblingEvents), + }) + + // Effect with string startAfter — will receive numeric cursors + const brokenEffect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: () => {}, + startAfter: `abc`, + onSourceError: (err) => errorEvents.push(err), + }) + + users.utils.markReady() + await flushPromises() + + // Send a change with a numeric cursor — mismatches the string startAfter + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + + await flushPromises() + + // The broken effect should have errored gracefully, not crashed the pipeline + expect(siblingEvents).toHaveLength(1) + expect(siblingEvents[0]!.type).toBe(`enter`) + + await siblingEffect.dispose() + await brokenEffect.dispose() + }) + + it(`should eventually open cursor gate when cursor-less changes follow cursored replay`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursorless-after-replay-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 1, + }) + + users.utils.markReady() + await flushPromises() + + // Replay change at cursor 1 — should be suppressed + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + await flushPromises() + expect(events).toHaveLength(0) + + // Now a live change arrives WITHOUT a cursor + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + }) + users.utils.commit() + await flushPromises() + + // Should the gate open for cursor-less changes? Currently it won't. + // This test documents whether cursor-less changes after replay are emitted. + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`enter`) + + await effect.dispose() + }) + + it(`should preserve cursor when a cursor-less update follows a cursored insert in same transaction`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `cursor-erasure-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + }) + commit() + markReady() + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + + const observedChanges: Array = [] + const subscription = collection.subscribeChanges( + (changes) => { + observedChanges.push(...changes) + }, + { includeInitialState: false }, + ) + + const { begin, write, commit } = testSyncFunctions + + // Insert with cursor, then update same key without cursor in same tx + begin() + write({ + type: `insert`, + value: { id: 2, value: `new` }, + cursor: `005`, + }) + write({ + type: `update`, + value: { id: 2, value: `updated` }, + }) + commit() + + // The cursor from the insert should not be erased by the cursor-less update + const cursoredChanges = observedChanges.filter( + (c: any) => c.cursor !== undefined, + ) + expect(cursoredChanges.length).toBeGreaterThan(0) + + subscription.unsubscribe() + }) + + it(`should emit uncursored changes in a batch that also contains a gate-opening cursor`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `mixed-cursor-batch-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + // Single transaction: uncursored insert first, then a cursored insert past the gate + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + // no cursor + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // Both rows should emit — the uncursored change has no position in cursor + // space, so once the gate opens it should not be suppressed + expect(events).toHaveLength(2) + + await effect.dispose() + }) + + it(`should stamp each DeltaEvent with the batch-level max cursor`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `batch-cursor-stamp-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + }) + + users.utils.markReady() + await flushPromises() + events.length = 0 + + // Single transaction with two inserts at different cursors + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `First`, active: true }, + cursor: 3, + }) + users.utils.write({ + type: `insert`, + value: { id: 11, name: `Second`, active: true }, + cursor: 4, + }) + users.utils.commit() + await flushPromises() + + // Batch cursor should be the max (4) on all events — this is by-design + expect(events).toHaveLength(2) + expect(events[0]!.cursor).toBe(4) + expect(events[1]!.cursor).toBe(4) + + await effect.dispose() + }) + + it(`should handle a batch that straddles the startAfter boundary`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `straddling-batch-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + // Single transaction with changes on both sides of the boundary + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 2, + }) + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // Only cursor 3 should produce an event; cursors 1 & 2 are replay + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Charlie`) + expect(events[0]!.cursor).toBe(3) + + await effect.dispose() + }) }) describe(`filtered queries`, () => { diff --git a/packages/react-db/src/useLiveQueryEffect.ts b/packages/react-db/src/useLiveQueryEffect.ts index ab39558e6..0e8a4523e 100644 --- a/packages/react-db/src/useLiveQueryEffect.ts +++ b/packages/react-db/src/useLiveQueryEffect.ts @@ -39,6 +39,7 @@ export function useLiveQueryEffect< id: config.id, query: config.query, skipInitial: config.skipInitial, + startAfter: config.startAfter, onEnter: (event, ctx) => configRef.current.onEnter?.(event, ctx), onUpdate: (event, ctx) => configRef.current.onUpdate?.(event, ctx), onExit: (event, ctx) => configRef.current.onExit?.(event, ctx), From 9143af1c4130d78cf897b22c42fc03f6f43c21e1 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:13:57 -0600 Subject: [PATCH 02/11] chore: add changeset for cursor-aware effect replay Co-Authored-By: Claude Opus 4.6 --- .changeset/cursor-aware-effect-replay.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/cursor-aware-effect-replay.md diff --git a/.changeset/cursor-aware-effect-replay.md b/.changeset/cursor-aware-effect-replay.md new file mode 100644 index 000000000..7df7a90c8 --- /dev/null +++ b/.changeset/cursor-aware-effect-replay.md @@ -0,0 +1,6 @@ +--- +'@tanstack/db': patch +'@tanstack/react-db': patch +--- + +Add cursor-aware effect replay via `startAfter` option on `createEffect` and `useLiveQueryEffect`. Sync writes can now carry an opaque sortable cursor that propagates through `ChangeMessage` and `DeltaEvent`, enabling effects to suppress callbacks during historical replay while still hydrating internal query state. From 89378c4e15461f21e97b4cc3028df58fe49055ba Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:14:46 -0600 Subject: [PATCH 03/11] docs: add JSDoc clarifying batch-level cursor semantics on DeltaEvent Co-Authored-By: Claude Opus 4.6 --- packages/db/src/query/effect.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 8fe96e2d4..e9a542197 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -49,6 +49,7 @@ export type DeltaEvent< key: TKey /** Current value for the entering row */ value: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor metadata?: Record } @@ -57,6 +58,7 @@ export type DeltaEvent< key: TKey /** Current value for the exiting row */ value: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor metadata?: Record } @@ -67,6 +69,7 @@ export type DeltaEvent< value: TRow /** Previous value before the batch */ previousValue: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor metadata?: Record } From 653ecebe962e13ff33cc09321b8210519bbbba83 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:16:28 +0000 Subject: [PATCH 04/11] ci: apply automated fixes --- packages/db/src/query/effect.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index e9a542197..3dc7eb247 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -688,10 +688,7 @@ class EffectPipelineRunner { ): void { let firstLiveIndex: number try { - firstLiveIndex = findFirstChangeAfterCursor( - changes, - this.startAfter!, - ) + firstLiveIndex = findFirstChangeAfterCursor(changes, this.startAfter!) } catch (error) { this.onSourceError( error instanceof Error ? error : new Error(String(error)), @@ -1201,10 +1198,7 @@ function classifyDelta( return undefined } -function attachCursorToEvent< - TRow extends object, - TKey extends string | number, ->( +function attachCursorToEvent( event: DeltaEvent, cursor: CollectionCursor | undefined, ): DeltaEvent { From d651fdfb70a91924d990c85b093e45f26e0935c6 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:25:09 -0600 Subject: [PATCH 05/11] fix: reject startAfter on multi-source (join) effects Cursor gate state is global to the effect, so a cursor from one source collection could incorrectly open the gate for another. Validate at construction time that startAfter is only used with single-source effects until per-source cursor tracking is implemented. Co-Authored-By: Claude Opus 4.6 --- packages/db/src/query/effect.ts | 10 ++++++++++ packages/db/tests/effect.test.ts | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 3dc7eb247..5df998155 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -410,6 +410,16 @@ class EffectPipelineRunner { this.collections = extractCollectionsFromQuery(this.query) const aliasesById = extractCollectionAliases(this.query) + if ( + config.startAfter !== undefined && + Object.keys(this.collections).length > 1 + ) { + throw new Error( + `startAfter is only supported for single-source effects. ` + + `This effect queries ${Object.keys(this.collections).length} collections.`, + ) + } + // Build alias → collection map this.collectionByAlias = {} for (const [collectionId, aliases] of aliasesById.entries()) { diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 08879dd14..18f8317f4 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -568,6 +568,24 @@ describe(`createEffect`, () => { await effect.dispose() }) + + it(`should throw when startAfter is used with multi-source effects`, () => { + const users = createUsersCollection() + const issues = createIssuesCollection() + + expect(() => { + createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: () => {}, + startAfter: 1, + }) + }).toThrow(/single-source/) + }) }) describe(`filtered queries`, () => { From 3fc7e6b55b575e6aa307daa7937becb9e87fcf35 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:37:28 -0600 Subject: [PATCH 06/11] fix: validate monotonic cursor ordering and document same-key limitation Reject non-monotonic cursor sequences within a sync batch via onSourceError instead of silently leaking replay changes. Document the known limitation that same-key replay+live changes within one transaction produce an enter event instead of update due to transaction-scoped graph coalescing. Co-Authored-By: Claude Opus 4.6 --- packages/db/src/query/effect.ts | 24 +++++++-- packages/db/tests/effect.test.ts | 93 ++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 5df998155..d49fb396a 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -738,11 +738,19 @@ class EffectPipelineRunner { continue } try { - if ( - this.pendingBatchCursor === undefined || - compareCollectionCursors(cursor, this.pendingBatchCursor) > 0 - ) { + if (this.pendingBatchCursor === undefined) { this.pendingBatchCursor = cursor + } else { + const cmp = compareCollectionCursors(cursor, this.pendingBatchCursor) + if (cmp < 0) { + throw new Error( + `Cursors within a sync batch must be monotonically ordered. ` + + `Saw ${String(cursor)} after ${String(this.pendingBatchCursor)}.`, + ) + } + if (cmp > 0) { + this.pendingBatchCursor = cursor + } } } catch (error) { this.onSourceError( @@ -1227,11 +1235,19 @@ function findFirstChangeAfterCursor( startAfter: CollectionCursor, ): number { let anyCursor = false + let lastCursor: CollectionCursor | undefined for (let index = 0; index < changes.length; index++) { const cursor = changes[index]!.cursor if (cursor === undefined) { continue } + if (lastCursor !== undefined && compareCollectionCursors(cursor, lastCursor) < 0) { + throw new Error( + `Cursors within a sync batch must be monotonically ordered. ` + + `Saw ${String(cursor)} after ${String(lastCursor)}.`, + ) + } + lastCursor = cursor anyCursor = true if (compareCollectionCursors(cursor, startAfter) > 0) { // Walk backwards to include any preceding uncursored changes — diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 18f8317f4..477a6db6c 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -569,6 +569,99 @@ describe(`createEffect`, () => { await effect.dispose() }) + it(`should emit enter when same key crosses startAfter within one transaction (known limitation)`, async () => { + // Known limitation: when the same key has a replay insert and a live + // update within one transaction, the transaction-scoped scheduler + // coalesces both into a single graph run. D2 sees the net result as + // a new row, so the event type is `enter` instead of `update`. + // Correct behavior would be `update` with previousValue from the + // replay insert, but fixing this requires per-change D2 fencing + // within transactions, which is too invasive for the initial cut. + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `same-key-straddle-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 2, + }) + users.utils.write({ + type: `update`, + value: { id: 1, name: `Alice v2`, active: true }, + previousValue: { id: 1, name: `Alice`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + expect(events).toHaveLength(1) + // Ideally this would be `update`, but coalescing produces `enter` + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Alice v2`) + + await effect.dispose() + }) + + it(`should reject non-monotonic cursor sequences within a batch`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `non-monotonic-cursor-users`, + getKey: (user) => user.id, + }), + ) + const errors: Array = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: () => {}, + startAfter: 2, + onSourceError: (err) => errors.push(err), + }) + + users.utils.markReady() + await flushPromises() + + // Non-monotonic batch: [cursor 1, cursor 3, cursor 2] + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + cursor: 2, + }) + users.utils.commit() + await flushPromises() + + expect(errors).toHaveLength(1) + expect(errors[0]!.message).toMatch(/monotonically/) + + await effect.dispose() + }) + it(`should throw when startAfter is used with multi-source effects`, () => { const users = createUsersCollection() const issues = createIssuesCollection() From d5f71404bd46b4ba94a623264585e48c864e9164 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:38:44 +0000 Subject: [PATCH 07/11] ci: apply automated fixes --- packages/db/src/query/effect.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index d49fb396a..7b7438247 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -1241,7 +1241,10 @@ function findFirstChangeAfterCursor( if (cursor === undefined) { continue } - if (lastCursor !== undefined && compareCollectionCursors(cursor, lastCursor) < 0) { + if ( + lastCursor !== undefined && + compareCollectionCursors(cursor, lastCursor) < 0 + ) { throw new Error( `Cursors within a sync batch must be monotonically ordered. ` + `Saw ${String(cursor)} after ${String(lastCursor)}.`, From fe0ee4270dd655d3e598b79461ae9af2c309fea1 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 11:41:13 -0600 Subject: [PATCH 08/11] feat: pass Electric stream offset as cursor on sync writes Thread stream.lastOffset as the cursor field on each write in processChangeMessage, enabling effects with startAfter to resume from a persisted Electric offset. Co-Authored-By: Claude Opus 4.6 --- packages/electric-db-collection/src/electric.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index ae398ed17..4c636b076 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1459,6 +1459,7 @@ function createElectricSync>( write({ type: isDuplicateInsert ? `update` : operation, value: changeMessage.value, + cursor: stream.lastOffset, // Include the primary key and relation info in the metadata metadata: { ...changeMessage.headers, From 37d38ba159b7db4f3ef26cd78ed64c7b93b8766a Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 12:01:03 -0600 Subject: [PATCH 09/11] docs: document startAfter requirements in JSDoc Clarify that startAfter requires monotonic cursors from the sync source and is only supported for single-source (non-join) effects. Co-Authored-By: Claude Opus 4.6 --- packages/db/src/query/effect.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 7b7438247..d365be9ce 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -143,6 +143,9 @@ export interface EffectConfig< /** * Suppress callbacks until the source replay advances past this cursor. * Historical changes at or before the cursor still update internal query state. + * + * Requires the sync source to provide a monotonic cursor on every + * `sync.write()` call. Only supported for single-source (non-join) effects. */ startAfter?: CollectionCursor } From a767f8a74b6cfc44ab140394321715dd403b3455 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 12:02:46 -0600 Subject: [PATCH 10/11] =?UTF-8?q?fix:=20revert=20same-key=20straddle=20wor?= =?UTF-8?q?karound=20=E2=80=94=20scenario=20is=20impossible?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The same-key boundary-crossing case (insert at cursor N, update at cursor N+1 in one transaction) cannot happen in practice because startAfter is always set from a completed transaction boundary. Within a single transaction, all changes share the same cursor space. Removes the test documenting this as a limitation and reverts the synchronous graph flush that was added to address it. Co-Authored-By: Claude Opus 4.6 --- packages/db/tests/effect.test.ts | 48 -------------------------------- 1 file changed, 48 deletions(-) diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 477a6db6c..95cc42b50 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -569,54 +569,6 @@ describe(`createEffect`, () => { await effect.dispose() }) - it(`should emit enter when same key crosses startAfter within one transaction (known limitation)`, async () => { - // Known limitation: when the same key has a replay insert and a live - // update within one transaction, the transaction-scoped scheduler - // coalesces both into a single graph run. D2 sees the net result as - // a new row, so the event type is `enter` instead of `update`. - // Correct behavior would be `update` with previousValue from the - // replay insert, but fixing this requires per-change D2 fencing - // within transactions, which is too invasive for the initial cut. - const users = createCollection( - mockSyncCollectionOptionsNoInitialState({ - id: `same-key-straddle-users`, - getKey: (user) => user.id, - }), - ) - const events: Array> = [] - - const effect = createEffect({ - query: (q) => q.from({ user: users }), - onBatch: collectBatchEvents(events), - startAfter: 2, - }) - - users.utils.markReady() - await flushPromises() - - users.utils.begin() - users.utils.write({ - type: `insert`, - value: { id: 1, name: `Alice`, active: true }, - cursor: 2, - }) - users.utils.write({ - type: `update`, - value: { id: 1, name: `Alice v2`, active: true }, - previousValue: { id: 1, name: `Alice`, active: true }, - cursor: 3, - }) - users.utils.commit() - await flushPromises() - - expect(events).toHaveLength(1) - // Ideally this would be `update`, but coalescing produces `enter` - expect(events[0]!.type).toBe(`enter`) - expect(events[0]!.value.name).toBe(`Alice v2`) - - await effect.dispose() - }) - it(`should reject non-monotonic cursor sequences within a batch`, async () => { const users = createCollection( mockSyncCollectionOptionsNoInitialState({ From 14e314afcf8999cd870741663282d4884cc314eb Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 12:44:05 -0600 Subject: [PATCH 11/11] feat: support per-source cursor gating for join queries startAfter now accepts Record for per-source gating in join effects. Each source gets an independent gate. DeltaEvent gains triggeringSource and cursors fields for gated effects. Join tap callbacks during graph runs are isolated from cursor gate logic to prevent false gate opens. Co-Authored-By: Claude Opus 4.6 --- .changeset/cursor-aware-effect-replay.md | 2 +- packages/db/src/query/effect.ts | 184 ++++++++++++++------ packages/db/tests/effect.test.ts | 203 ++++++++++++++++++++++- 3 files changed, 335 insertions(+), 54 deletions(-) diff --git a/.changeset/cursor-aware-effect-replay.md b/.changeset/cursor-aware-effect-replay.md index 7df7a90c8..add735902 100644 --- a/.changeset/cursor-aware-effect-replay.md +++ b/.changeset/cursor-aware-effect-replay.md @@ -3,4 +3,4 @@ '@tanstack/react-db': patch --- -Add cursor-aware effect replay via `startAfter` option on `createEffect` and `useLiveQueryEffect`. Sync writes can now carry an opaque sortable cursor that propagates through `ChangeMessage` and `DeltaEvent`, enabling effects to suppress callbacks during historical replay while still hydrating internal query state. +Add cursor-aware effect replay via `startAfter` option on `createEffect` and `useLiveQueryEffect`. Sync writes can now carry an opaque sortable cursor that propagates through `ChangeMessage` and `DeltaEvent`, enabling effects to suppress callbacks during historical replay while still hydrating internal query state. `startAfter` accepts either a scalar cursor (single-source) or a `Record` for per-source gating in join queries. `DeltaEvent` now includes `triggeringSource` and `cursors` fields for gated effects. diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index d365be9ce..64ca1efa0 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -51,6 +51,10 @@ export type DeltaEvent< value: TRow /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } | { @@ -60,6 +64,10 @@ export type DeltaEvent< value: TRow /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } | { @@ -71,6 +79,10 @@ export type DeltaEvent< previousValue: TRow /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } @@ -145,9 +157,14 @@ export interface EffectConfig< * Historical changes at or before the cursor still update internal query state. * * Requires the sync source to provide a monotonic cursor on every - * `sync.write()` call. Only supported for single-source (non-join) effects. + * `sync.write()` call. + * + * - **Scalar value**: applies to the single source alias. Throws for + * multi-source (join) effects. + * - **Record**: maps source aliases to their respective cursors, enabling + * independent per-source gating for join queries. */ - startAfter?: CollectionCursor + startAfter?: CollectionCursor | Record } /** Handle returned by createEffect */ @@ -324,7 +341,7 @@ interface EffectPipelineRunnerConfig< > { query: EffectQueryInput skipInitial: boolean - startAfter?: CollectionCursor + startAfter?: CollectionCursor | Record onBatchProcessed: (events: Array>) => void /** Called when a source collection enters error or cleaned-up state */ onSourceError: (error: Error) => void @@ -379,9 +396,13 @@ class EffectPipelineRunner { // skipInitial state private readonly skipInitial: boolean private initialLoadComplete = false - private readonly startAfter: CollectionCursor | undefined - private cursorGateOpen: boolean - private pendingBatchCursor: CollectionCursor | undefined + + // Per-alias cursor gating + private readonly startAfterByAlias: Map = new Map() + private readonly cursorGateOpenByAlias: Map = new Map() + private readonly pendingBatchCursorByAlias: Map = + new Map() + private readonly liveAliasesInBatch: Set = new Set() // Scheduler integration private subscribedToAllCollections = false @@ -401,8 +422,6 @@ class EffectPipelineRunner { constructor(config: EffectPipelineRunnerConfig) { this.skipInitial = config.skipInitial - this.startAfter = config.startAfter - this.cursorGateOpen = config.startAfter === undefined this.onBatchProcessed = config.onBatchProcessed this.onSourceError = config.onSourceError @@ -413,16 +432,6 @@ class EffectPipelineRunner { this.collections = extractCollectionsFromQuery(this.query) const aliasesById = extractCollectionAliases(this.query) - if ( - config.startAfter !== undefined && - Object.keys(this.collections).length > 1 - ) { - throw new Error( - `startAfter is only supported for single-source effects. ` + - `This effect queries ${Object.keys(this.collections).length} collections.`, - ) - } - // Build alias → collection map this.collectionByAlias = {} for (const [collectionId, aliases] of aliasesById.entries()) { @@ -433,6 +442,29 @@ class EffectPipelineRunner { } } + // Normalize startAfter into per-alias map + if (config.startAfter !== undefined) { + if (typeof config.startAfter === `object`) { + // Record — per-alias cursors + for (const [alias, cursor] of Object.entries(config.startAfter)) { + this.startAfterByAlias.set(alias, cursor) + this.cursorGateOpenByAlias.set(alias, false) + } + } else { + // Scalar — single-source shorthand + const aliases = Object.keys(this.collectionByAlias) + if (aliases.length !== 1) { + throw new Error( + `A scalar startAfter value is only supported for single-source effects. ` + + `Use a Record to map cursors to source aliases. ` + + `This effect queries ${aliases.length} collections.`, + ) + } + this.startAfterByAlias.set(aliases[0]!, config.startAfter) + this.cursorGateOpenByAlias.set(aliases[0]!, false) + } + } + // Compile the pipeline this.compilePipeline() } @@ -650,10 +682,10 @@ class EffectPipelineRunner { if (orderByInfo) { this.trackSentValues(alias, changes, orderByInfo.comparator) const split = [...splitUpdates(changes)] - this.recordPendingBatchCursor(split) + this.recordPendingBatchCursor(alias, split) this.sendChangesToD2(alias, split) } else { - this.recordPendingBatchCursor(changes) + this.recordPendingBatchCursor(alias, changes) this.sendChangesToD2(alias, changes) } } @@ -681,12 +713,28 @@ class EffectPipelineRunner { return } - if (!this.cursorGateOpen) { + // During a graph run, changes arrive from join tap operators (lazy source + // loads via requestSnapshot). These inherit the root batch's live status + // and must not affect cursor gates or live-source tracking. + if (this.isGraphRunning) { + this.sendChangesToD2(alias, changes) + return + } + + // Check per-alias cursor gate + if ( + this.startAfterByAlias.has(alias) && + this.cursorGateOpenByAlias.get(alias) === false + ) { this.handleSourceChangesBeforeCursorGate(alias, changes) return } - this.recordPendingBatchCursor(changes) + // Gate is open or alias has no gate — mark as live + if (this.startAfterByAlias.size > 0) { + this.liveAliasesInBatch.add(alias) + } + this.recordPendingBatchCursor(alias, changes) this.sendChangesToD2(alias, changes) this.scheduleGraphRun(alias) } @@ -699,9 +747,10 @@ class EffectPipelineRunner { alias: string, changes: Array>, ): void { + const startAfter = this.startAfterByAlias.get(alias)! let firstLiveIndex: number try { - firstLiveIndex = findFirstChangeAfterCursor(changes, this.startAfter!) + firstLiveIndex = findFirstChangeAfterCursor(changes, startAfter) } catch (error) { this.onSourceError( error instanceof Error ? error : new Error(String(error)), @@ -724,15 +773,17 @@ class EffectPipelineRunner { this.runGraph() } - this.cursorGateOpen = true + this.cursorGateOpenByAlias.set(alias, true) + this.liveAliasesInBatch.add(alias) const liveChanges = changes.slice(firstLiveIndex) - this.recordPendingBatchCursor(liveChanges) + this.recordPendingBatchCursor(alias, liveChanges) this.sendChangesToD2(alias, liveChanges) this.scheduleGraphRun(alias) } private recordPendingBatchCursor( + alias: string, changes: Array>, ): void { for (const change of changes) { @@ -741,18 +792,19 @@ class EffectPipelineRunner { continue } try { - if (this.pendingBatchCursor === undefined) { - this.pendingBatchCursor = cursor + const current = this.pendingBatchCursorByAlias.get(alias) + if (current === undefined) { + this.pendingBatchCursorByAlias.set(alias, cursor) } else { - const cmp = compareCollectionCursors(cursor, this.pendingBatchCursor) + const cmp = compareCollectionCursors(cursor, current) if (cmp < 0) { throw new Error( `Cursors within a sync batch must be monotonically ordered. ` + - `Saw ${String(cursor)} after ${String(this.pendingBatchCursor)}.`, + `Saw ${String(cursor)} after ${String(current)}.`, ) } if (cmp > 0) { - this.pendingBatchCursor = cursor + this.pendingBatchCursorByAlias.set(alias, cursor) } } } catch (error) { @@ -885,33 +937,75 @@ class EffectPipelineRunner { /** Classify accumulated changes into DeltaEvents and invoke the callback */ private flushPendingChanges(): void { if (this.pendingChanges.size === 0) { - this.pendingBatchCursor = undefined + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() return } - const batchCursor = this.pendingBatchCursor + const hasCursorGating = this.startAfterByAlias.size > 0 - // If skipInitial and initial load isn't complete yet, discard + // Discard if skipInitial isn't satisfied or no live sources contributed if ( (this.skipInitial && !this.initialLoadComplete) || - !this.cursorGateOpen + (hasCursorGating && this.liveAliasesInBatch.size === 0) ) { this.pendingChanges = new Map() - this.pendingBatchCursor = undefined + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() return } + // Compute batch-level cursor (max across all per-alias cursors) + let batchCursor: CollectionCursor | undefined + for (const cursor of this.pendingBatchCursorByAlias.values()) { + if (batchCursor === undefined) { + batchCursor = cursor + } else { + try { + if (compareCollectionCursors(cursor, batchCursor) > 0) { + batchCursor = cursor + } + } catch { + // Type mismatch across sources — keep the first + break + } + } + } + + // Build per-source cursor map and triggering source (only for gated effects) + let cursors: Record | undefined + let triggeringSource: string | undefined + if (hasCursorGating) { + if (this.pendingBatchCursorByAlias.size > 0) { + cursors = Object.fromEntries(this.pendingBatchCursorByAlias) + } + if (this.liveAliasesInBatch.size > 0) { + triggeringSource = this.liveAliasesInBatch.values().next().value + } + } + const events: Array> = [] for (const [key, changes] of this.pendingChanges) { const event = classifyDelta(key as TKey, changes) if (event) { - events.push(attachCursorToEvent(event, batchCursor)) + let enriched: DeltaEvent = event + if (batchCursor !== undefined) { + enriched = { ...enriched, cursor: batchCursor } + } + if (triggeringSource !== undefined) { + enriched = { ...enriched, triggeringSource } + } + if (cursors !== undefined) { + enriched = { ...enriched, cursors } + } + events.push(enriched) } } this.pendingChanges = new Map() - this.pendingBatchCursor = undefined + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() if (events.length > 0) { this.onBatchProcessed(events) @@ -1101,6 +1195,8 @@ class EffectPipelineRunner { this.unsubscribeCallbacks.clear() this.sentToD2KeysByAlias.clear() this.pendingChanges.clear() + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() this.lazySources.clear() this.builderDependencies.clear() this.biggestSentValue.clear() @@ -1219,20 +1315,6 @@ function classifyDelta( return undefined } -function attachCursorToEvent( - event: DeltaEvent, - cursor: CollectionCursor | undefined, -): DeltaEvent { - if (cursor === undefined) { - return event - } - - return { - ...event, - cursor, - } -} - function findFirstChangeAfterCursor( changes: Array>, startAfter: CollectionCursor, diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 95cc42b50..0cdcc4118 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -614,7 +614,7 @@ describe(`createEffect`, () => { await effect.dispose() }) - it(`should throw when startAfter is used with multi-source effects`, () => { + it(`should throw when a scalar startAfter is used with multi-source effects`, () => { const users = createUsersCollection() const issues = createIssuesCollection() @@ -629,7 +629,206 @@ describe(`createEffect`, () => { onBatch: () => {}, startAfter: 1, }) - }).toThrow(/single-source/) + }).toThrow(/scalar startAfter/) + }) + + it(`should support per-source startAfter with Record for join queries`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `join-cursor-users`, + getKey: (user) => user.id, + }), + ) + const issues = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `join-cursor-issues`, + getKey: (issue) => issue.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: collectBatchEvents(events), + startAfter: { issue: 2, user: 3 }, + }) + + users.utils.markReady() + issues.utils.markReady() + await flushPromises() + + // Replay: both sources send data at or before their respective cursors + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 1, title: `Bug report`, userId: 1 }, + cursor: 1, + }) + issues.utils.write({ + type: `insert`, + value: { id: 2, title: `Feature request`, userId: 2 }, + cursor: 2, + }) + issues.utils.commit() + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 2, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // No events yet — both gates are still closed + expect(events).toHaveLength(0) + + // Now issue cursor 3 arrives (past issue gate boundary of 2) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 3, title: `New issue`, userId: 1 }, + cursor: 3, + }) + issues.utils.commit() + await flushPromises() + + // Issue gate opened — events should fire (Alice is already hydrated via join) + expect(events.length).toBeGreaterThan(0) + const issueEvent = events.find( + (e) => e.type === `enter` && e.value?.issue?.title === `New issue`, + ) + expect(issueEvent).toBeDefined() + expect(issueEvent!.triggeringSource).toBe(`issue`) + expect(issueEvent!.cursors).toBeDefined() + expect(issueEvent!.cursors!.issue).toBe(3) + + await effect.dispose() + }) + + it(`should emit events from ungated source while gated source replays`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `partial-gate-users`, + getKey: (user) => user.id, + }), + ) + const issues = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `partial-gate-issues`, + getKey: (issue) => issue.id, + }), + ) + const events: Array> = [] + + // Only gate the issue source; user source has no gate + const effect = createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: collectBatchEvents(events), + startAfter: { issue: 5 }, + }) + + users.utils.markReady() + issues.utils.markReady() + await flushPromises() + + // Hydrate user data (no gate on users — always live) + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + await flushPromises() + + // User has no gate, so changes from user source are live + // But there are no issues yet to join with, so no join results + events.length = 0 + + // Replay issues (gate closed — cursor <= 5) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 1, title: `Old issue`, userId: 1 }, + cursor: 4, + }) + issues.utils.commit() + await flushPromises() + + // Issue gate is still closed — no events + expect(events).toHaveLength(0) + + // Live issue arrives (cursor > 5) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 2, title: `New issue`, userId: 1 }, + cursor: 6, + }) + issues.utils.commit() + await flushPromises() + + // Gate opens — should see the new issue joined with Alice + const enterEvents = events.filter((e) => e.type === `enter`) + expect(enterEvents.length).toBeGreaterThan(0) + const newIssueEvent = enterEvents.find( + (e) => e.value?.issue?.title === `New issue`, + ) + expect(newIssueEvent).toBeDefined() + + await effect.dispose() + }) + + it(`should include triggeringSource and cursors on DeltaEvents for gated effects`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `triggering-source-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 0, + }) + + users.utils.markReady() + await flushPromises() + + // Send a change past the gate + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.triggeringSource).toBe(`user`) + expect(events[0]!.cursors).toEqual({ user: 1 }) + expect(events[0]!.cursor).toBe(1) + + await effect.dispose() }) })