diff --git a/.changeset/cursor-aware-effect-replay.md b/.changeset/cursor-aware-effect-replay.md new file mode 100644 index 000000000..add735902 --- /dev/null +++ b/.changeset/cursor-aware-effect-replay.md @@ -0,0 +1,6 @@ +--- +'@tanstack/db': patch +'@tanstack/react-db': patch +--- + +Add cursor-aware effect replay via `startAfter` option on `createEffect` and `useLiveQueryEffect`. Sync writes can now carry an opaque sortable cursor that propagates through `ChangeMessage` and `DeltaEvent`, enabling effects to suppress callbacks during historical replay while still hydrating internal query state. `startAfter` accepts either a scalar cursor (single-source) or a `Record` for per-source gating in join queries. `DeltaEvent` now includes `triggeringSource` and `cursors` fields for gated effects. diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index af65cb801..2b6b066a6 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -12,6 +12,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec' import type { ChangeMessage, CollectionConfig, + CollectionCursor, OptimisticChangeMessage, } from '../types' import type { CollectionImpl } from './index.js' @@ -28,6 +29,7 @@ interface PendingSyncedTransaction< operations: Array> truncate?: boolean deletedKeys: Set + rowCursorWrites: Map rowMetadataWrites: Map collectionMetadataWrites: Map optimisticSnapshot?: { @@ -42,6 +44,10 @@ interface PendingSyncedTransaction< immediate?: boolean } +type PendingCursorWrite = + | { type: `set`; value: CollectionCursor } + | { type: `delete` } + type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` } type InternalChangeMessage< @@ -313,6 +319,7 @@ export class CollectionStateManager< type: change.type, value: enrichedValue, previousValue: enrichedPreviousValue, + cursor: change.cursor, metadata: change.metadata, } as ChangeMessage, TKey> } @@ -895,6 +902,7 @@ export class CollectionStateManager< } const events: Array> = [] + const eventCursors = new Map() const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` const completedOptimisticOps = new Map< TKey, @@ -1034,6 +1042,14 @@ export class CollectionStateManager< this.syncedMetadata.set(key, metadataWrite.value) } + for (const [key, cursorWrite] of transaction.rowCursorWrites) { + if (cursorWrite.type === `delete`) { + eventCursors.delete(key) + continue + } + eventCursors.set(key, cursorWrite.value) + } + for (const [ key, metadataWrite, @@ -1162,6 +1178,7 @@ export class CollectionStateManager< // Now check what actually changed in the final visible state for (const key of changedKeys) { + const cursor = eventCursors.get(key) const previousVisibleValue = currentVisibleState.get(key) const newVisibleValue = this.get(key) // This returns the new derived state const previousVirtualProps = this.getVirtualPropsSnapshotForState(key, { @@ -1235,12 +1252,14 @@ export class CollectionStateManager< key, value: newVisibleValue, previousValue: previousValueWithVirtualFromCompleted, + cursor, }) } else { events.push({ type: `insert`, key, value: newVisibleValue, + cursor, }) } } else if ( @@ -1251,6 +1270,7 @@ export class CollectionStateManager< type: `delete`, key, value: previousValueWithVirtual ?? previousVisibleValue, + cursor, }) } else if ( previousVisibleValue !== undefined && @@ -1263,6 +1283,7 @@ export class CollectionStateManager< key, value: newVisibleValue, previousValue: previousValueWithVirtual ?? previousVisibleValue, + cursor, }) } } diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 82fffb772..0cede371c 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -94,6 +94,7 @@ export class CollectionSyncManager< committed: false, operations: [], deletedKeys: new Set(), + rowCursorWrites: new Map(), rowMetadataWrites: new Map(), collectionMetadataWrites: new Map(), immediate: options?.immediate, @@ -190,6 +191,13 @@ export class CollectionSyncManager< value: message.metadata, }) } + + if (message.cursor !== undefined) { + pendingTransaction.rowCursorWrites.set(key, { + type: `set`, + value: message.cursor, + }) + } }, commit: () => { const pendingTransaction = @@ -225,6 +233,7 @@ export class CollectionSyncManager< // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() + pendingTransaction.rowCursorWrites.clear() pendingTransaction.rowMetadataWrites.clear() // Intentionally preserve collectionMetadataWrites across truncate. // Collection-scoped metadata (for example persisted resume/reset diff --git a/packages/db/src/query/effect.ts b/packages/db/src/query/effect.ts index 10857d06e..64ca1efa0 100644 --- a/packages/db/src/query/effect.ts +++ b/packages/db/src/query/effect.ts @@ -25,7 +25,12 @@ import type { InitialQueryBuilder, QueryBuilder } from './builder/index.js' import type { Context } from './builder/types.js' import type { BasicExpression, QueryIR } from './ir.js' import type { OrderByOptimizationInfo } from './compiler/order-by.js' -import type { ChangeMessage, KeyedStream, ResultStream } from '../types.js' +import type { + ChangeMessage, + CollectionCursor, + KeyedStream, + ResultStream, +} from '../types.js' // --------------------------------------------------------------------------- // Public Types @@ -44,6 +49,12 @@ export type DeltaEvent< key: TKey /** Current value for the entering row */ value: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ + cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } | { @@ -51,6 +62,12 @@ export type DeltaEvent< key: TKey /** Current value for the exiting row */ value: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ + cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } | { @@ -60,6 +77,12 @@ export type DeltaEvent< value: TRow /** Previous value before the batch */ previousValue: TRow + /** Batch-level high-water cursor — the maximum cursor observed across all source changes in this batch. Use for checkpointing via startAfter. */ + cursor?: CollectionCursor + /** Per-source cursor map. Present when cursor gating (startAfter) is active. */ + cursors?: Record + /** Source alias whose changes triggered this batch. Present when cursor gating is active. */ + triggeringSource?: string metadata?: Record } @@ -128,6 +151,20 @@ export interface EffectConfig< * Set to true for effects that should only process new changes. */ skipInitial?: boolean + + /** + * Suppress callbacks until the source replay advances past this cursor. + * Historical changes at or before the cursor still update internal query state. + * + * Requires the sync source to provide a monotonic cursor on every + * `sync.write()` call. + * + * - **Scalar value**: applies to the single source alias. Throws for + * multi-source (join) effects. + * - **Record**: maps source aliases to their respective cursors, enabling + * independent per-source gating for join queries. + */ + startAfter?: CollectionCursor | Record } /** Handle returned by createEffect */ @@ -262,6 +299,7 @@ export function createEffect< const runner = new EffectPipelineRunner({ query: config.query, skipInitial: config.skipInitial ?? false, + startAfter: config.startAfter, onBatchProcessed, onSourceError: (error: Error) => { if (disposed) return @@ -303,6 +341,7 @@ interface EffectPipelineRunnerConfig< > { query: EffectQueryInput skipInitial: boolean + startAfter?: CollectionCursor | Record onBatchProcessed: (events: Array>) => void /** Called when a source collection enters error or cleaned-up state */ onSourceError: (error: Error) => void @@ -358,6 +397,13 @@ class EffectPipelineRunner { private readonly skipInitial: boolean private initialLoadComplete = false + // Per-alias cursor gating + private readonly startAfterByAlias: Map = new Map() + private readonly cursorGateOpenByAlias: Map = new Map() + private readonly pendingBatchCursorByAlias: Map = + new Map() + private readonly liveAliasesInBatch: Set = new Set() + // Scheduler integration private subscribedToAllCollections = false private readonly builderDependencies = new Set() @@ -396,6 +442,29 @@ class EffectPipelineRunner { } } + // Normalize startAfter into per-alias map + if (config.startAfter !== undefined) { + if (typeof config.startAfter === `object`) { + // Record — per-alias cursors + for (const [alias, cursor] of Object.entries(config.startAfter)) { + this.startAfterByAlias.set(alias, cursor) + this.cursorGateOpenByAlias.set(alias, false) + } + } else { + // Scalar — single-source shorthand + const aliases = Object.keys(this.collectionByAlias) + if (aliases.length !== 1) { + throw new Error( + `A scalar startAfter value is only supported for single-source effects. ` + + `Use a Record to map cursors to source aliases. ` + + `This effect queries ${aliases.length} collections.`, + ) + } + this.startAfterByAlias.set(aliases[0]!, config.startAfter) + this.cursorGateOpenByAlias.set(aliases[0]!, false) + } + } + // Compile the pipeline this.compilePipeline() } @@ -613,8 +682,10 @@ class EffectPipelineRunner { if (orderByInfo) { this.trackSentValues(alias, changes, orderByInfo.comparator) const split = [...splitUpdates(changes)] + this.recordPendingBatchCursor(alias, split) this.sendChangesToD2(alias, split) } else { + this.recordPendingBatchCursor(alias, changes) this.sendChangesToD2(alias, changes) } } @@ -638,10 +709,113 @@ class EffectPipelineRunner { alias: string, changes: Array>, ): void { + if (changes.length === 0) { + return + } + + // During a graph run, changes arrive from join tap operators (lazy source + // loads via requestSnapshot). These inherit the root batch's live status + // and must not affect cursor gates or live-source tracking. + if (this.isGraphRunning) { + this.sendChangesToD2(alias, changes) + return + } + + // Check per-alias cursor gate + if ( + this.startAfterByAlias.has(alias) && + this.cursorGateOpenByAlias.get(alias) === false + ) { + this.handleSourceChangesBeforeCursorGate(alias, changes) + return + } + + // Gate is open or alias has no gate — mark as live + if (this.startAfterByAlias.size > 0) { + this.liveAliasesInBatch.add(alias) + } + this.recordPendingBatchCursor(alias, changes) this.sendChangesToD2(alias, changes) this.scheduleGraphRun(alias) } + /** + * Replay before startAfter must still hydrate query state, but callbacks stay + * muted until we observe a cursor greater than the configured boundary. + */ + private handleSourceChangesBeforeCursorGate( + alias: string, + changes: Array>, + ): void { + const startAfter = this.startAfterByAlias.get(alias)! + let firstLiveIndex: number + try { + firstLiveIndex = findFirstChangeAfterCursor(changes, startAfter) + } catch (error) { + this.onSourceError( + error instanceof Error ? error : new Error(String(error)), + ) + return + } + + if (firstLiveIndex === -1) { + this.sendChangesToD2(alias, changes) + this.scheduleGraphRun(alias) + return + } + + if (firstLiveIndex > 0) { + const replayChanges = changes.slice(0, firstLiveIndex) + this.sendChangesToD2(alias, replayChanges) + this.scheduleGraphRun(alias) + } else if (!getActiveTransaction() && this.graph?.pendingWork()) { + // Best-effort boundary preservation outside transaction-scoped flushes. + this.runGraph() + } + + this.cursorGateOpenByAlias.set(alias, true) + this.liveAliasesInBatch.add(alias) + + const liveChanges = changes.slice(firstLiveIndex) + this.recordPendingBatchCursor(alias, liveChanges) + this.sendChangesToD2(alias, liveChanges) + this.scheduleGraphRun(alias) + } + + private recordPendingBatchCursor( + alias: string, + changes: Array>, + ): void { + for (const change of changes) { + const { cursor } = change + if (cursor === undefined) { + continue + } + try { + const current = this.pendingBatchCursorByAlias.get(alias) + if (current === undefined) { + this.pendingBatchCursorByAlias.set(alias, cursor) + } else { + const cmp = compareCollectionCursors(cursor, current) + if (cmp < 0) { + throw new Error( + `Cursors within a sync batch must be monotonically ordered. ` + + `Saw ${String(cursor)} after ${String(current)}.`, + ) + } + if (cmp > 0) { + this.pendingBatchCursorByAlias.set(alias, cursor) + } + } + } catch (error) { + this.onSourceError( + error instanceof Error ? error : new Error(String(error)), + ) + return + } + } + } + /** * Schedule a graph run via the transaction-scoped scheduler. * @@ -762,24 +936,76 @@ class EffectPipelineRunner { /** Classify accumulated changes into DeltaEvents and invoke the callback */ private flushPendingChanges(): void { - if (this.pendingChanges.size === 0) return + if (this.pendingChanges.size === 0) { + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() + return + } - // If skipInitial and initial load isn't complete yet, discard - if (this.skipInitial && !this.initialLoadComplete) { + const hasCursorGating = this.startAfterByAlias.size > 0 + + // Discard if skipInitial isn't satisfied or no live sources contributed + if ( + (this.skipInitial && !this.initialLoadComplete) || + (hasCursorGating && this.liveAliasesInBatch.size === 0) + ) { this.pendingChanges = new Map() + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() return } + // Compute batch-level cursor (max across all per-alias cursors) + let batchCursor: CollectionCursor | undefined + for (const cursor of this.pendingBatchCursorByAlias.values()) { + if (batchCursor === undefined) { + batchCursor = cursor + } else { + try { + if (compareCollectionCursors(cursor, batchCursor) > 0) { + batchCursor = cursor + } + } catch { + // Type mismatch across sources — keep the first + break + } + } + } + + // Build per-source cursor map and triggering source (only for gated effects) + let cursors: Record | undefined + let triggeringSource: string | undefined + if (hasCursorGating) { + if (this.pendingBatchCursorByAlias.size > 0) { + cursors = Object.fromEntries(this.pendingBatchCursorByAlias) + } + if (this.liveAliasesInBatch.size > 0) { + triggeringSource = this.liveAliasesInBatch.values().next().value + } + } + const events: Array> = [] for (const [key, changes] of this.pendingChanges) { const event = classifyDelta(key as TKey, changes) if (event) { - events.push(event) + let enriched: DeltaEvent = event + if (batchCursor !== undefined) { + enriched = { ...enriched, cursor: batchCursor } + } + if (triggeringSource !== undefined) { + enriched = { ...enriched, triggeringSource } + } + if (cursors !== undefined) { + enriched = { ...enriched, cursors } + } + events.push(enriched) } } this.pendingChanges = new Map() + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() if (events.length > 0) { this.onBatchProcessed(events) @@ -969,6 +1195,8 @@ class EffectPipelineRunner { this.unsubscribeCallbacks.clear() this.sentToD2KeysByAlias.clear() this.pendingChanges.clear() + this.pendingBatchCursorByAlias.clear() + this.liveAliasesInBatch.clear() this.lazySources.clear() this.builderDependencies.clear() this.biggestSentValue.clear() @@ -1087,6 +1315,66 @@ function classifyDelta( return undefined } +function findFirstChangeAfterCursor( + changes: Array>, + startAfter: CollectionCursor, +): number { + let anyCursor = false + let lastCursor: CollectionCursor | undefined + for (let index = 0; index < changes.length; index++) { + const cursor = changes[index]!.cursor + if (cursor === undefined) { + continue + } + if ( + lastCursor !== undefined && + compareCollectionCursors(cursor, lastCursor) < 0 + ) { + throw new Error( + `Cursors within a sync batch must be monotonically ordered. ` + + `Saw ${String(cursor)} after ${String(lastCursor)}.`, + ) + } + lastCursor = cursor + anyCursor = true + if (compareCollectionCursors(cursor, startAfter) > 0) { + // Walk backwards to include any preceding uncursored changes — + // they have no position in cursor space and should not be + // assumed to be replay. + let liveStart = index + while (liveStart > 0 && changes[liveStart - 1]!.cursor === undefined) { + liveStart-- + } + return liveStart + } + } + + // If no change in this batch carries a cursor, treat the entire batch as + // live — the sync provider has moved past cursor-based replay. + if (!anyCursor) { + return 0 + } + + return -1 +} + +function compareCollectionCursors( + left: CollectionCursor, + right: CollectionCursor, +): number { + if (typeof left !== typeof right) { + throw new Error( + `Collection cursors must use a consistent primitive type. Received ${typeof left} and ${typeof right}.`, + ) + } + + if (left === right) { + return 0 + } + + return left > right ? 1 : -1 +} + /** Track a promise in the in-flight set, automatically removing on settlement */ function trackPromise( promise: Promise, diff --git a/packages/db/src/query/live/utils.ts b/packages/db/src/query/live/utils.ts index 6f37a0587..329bca6bf 100644 --- a/packages/db/src/query/live/utils.ts +++ b/packages/db/src/query/live/utils.ts @@ -252,8 +252,8 @@ export function* splitUpdates< ): Generator> { for (const change of changes) { if (change.type === `update`) { - yield { type: `delete`, key: change.key, value: change.previousValue! } - yield { type: `insert`, key: change.key, value: change.value } + yield { ...change, type: `delete`, value: change.previousValue! } + yield { ...change, type: `insert` } } else { yield change } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 6087e234e..d6016c1f0 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -324,6 +324,9 @@ export type SyncConfigRes = { loadSubset?: LoadSubsetFn unloadSubset?: UnloadSubsetFn } + +export type CollectionCursor = string | number + export interface SyncConfig< T extends object = Record, TKey extends string | number = string | number, @@ -386,6 +389,7 @@ export interface ChangeMessage< value: T previousValue?: T type: OperationType + cursor?: CollectionCursor metadata?: Record } diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index c5d09039e..32dc41ce0 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1586,6 +1586,57 @@ describe(`Collection`, () => { expect(collection._state.syncedMetadata.has(1)).toBe(false) }) + it(`should emit sync cursors through change subscriptions`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-cursor-subscription-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + + const observedChanges: Array> = + [] + const subscription = collection.subscribeChanges( + (changes) => { + observedChanges.push( + ...(changes as Array>), + ) + }, + { includeInitialState: false }, + ) + + const { begin, write, commit } = testSyncFunctions + begin() + write({ + type: `update`, + value: { id: 1, value: `updated` }, + cursor: `002`, + }) + commit() + + expect(observedChanges).toHaveLength(1) + expect(observedChanges[0]!.type).toBe(`update`) + expect(observedChanges[0]!.cursor).toBe(`002`) + + subscription.unsubscribe() + }) + it(`should treat row metadata as cleared after truncate within the same sync transaction`, async () => { let testSyncFunctions: any = null diff --git a/packages/db/tests/effect.test.ts b/packages/db/tests/effect.test.ts index 5c557087c..0cdcc4118 100644 --- a/packages/db/tests/effect.test.ts +++ b/packages/db/tests/effect.test.ts @@ -134,6 +134,34 @@ describe(`createEffect`, () => { await effect.dispose() }) + it(`should expose the sync cursor on emitted effect events`, async () => { + const users = createUsersCollection() + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onEnter: collectEvents(events), + skipInitial: true, + }) + + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 4, name: `Diana`, active: true }, + cursor: `004`, + }) + users.utils.commit() + + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.cursor).toBe(`004`) + + await effect.dispose() + }) + it(`should fire 'exit' event when a row is deleted from source`, async () => { const users = createUsersCollection() const events: Array> = [] @@ -201,6 +229,607 @@ describe(`createEffect`, () => { await effect.dispose() }) + + it(`should suppress replay until startAfter and preserve earlier state`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursor-gated-effect-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + + await flushPromises() + expect(events).toHaveLength(0) + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 2, + }) + users.utils.commit() + + await flushPromises() + expect(events).toHaveLength(0) + + users.utils.begin() + users.utils.write({ + type: `update`, + value: { id: 2, name: `Bob v2`, active: true }, + previousValue: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`update`) + expect(events[0]!.cursor).toBe(3) + if (events[0]!.type !== `update`) { + throw new Error(`Expected update event`) + } + expect(events[0]!.previousValue.name).toBe(`Bob`) + expect(events[0]!.value.name).toBe(`Bob v2`) + + await effect.dispose() + }) + + it(`should not crash sibling subscribers when cursor types are mixed`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursor-type-mismatch-users`, + getKey: (user) => user.id, + }), + ) + + const siblingEvents: Array> = [] + const errorEvents: Array = [] + + // Sibling effect — should keep working + const siblingEffect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(siblingEvents), + }) + + // Effect with string startAfter — will receive numeric cursors + const brokenEffect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: () => {}, + startAfter: `abc`, + onSourceError: (err) => errorEvents.push(err), + }) + + users.utils.markReady() + await flushPromises() + + // Send a change with a numeric cursor — mismatches the string startAfter + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + + await flushPromises() + + // The broken effect should have errored gracefully, not crashed the pipeline + expect(siblingEvents).toHaveLength(1) + expect(siblingEvents[0]!.type).toBe(`enter`) + + await siblingEffect.dispose() + await brokenEffect.dispose() + }) + + it(`should eventually open cursor gate when cursor-less changes follow cursored replay`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `cursorless-after-replay-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 1, + }) + + users.utils.markReady() + await flushPromises() + + // Replay change at cursor 1 — should be suppressed + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + await flushPromises() + expect(events).toHaveLength(0) + + // Now a live change arrives WITHOUT a cursor + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + }) + users.utils.commit() + await flushPromises() + + // Should the gate open for cursor-less changes? Currently it won't. + // This test documents whether cursor-less changes after replay are emitted. + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`enter`) + + await effect.dispose() + }) + + it(`should preserve cursor when a cursor-less update follows a cursored insert in same transaction`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `cursor-erasure-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + }) + commit() + markReady() + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + + const observedChanges: Array = [] + const subscription = collection.subscribeChanges( + (changes) => { + observedChanges.push(...changes) + }, + { includeInitialState: false }, + ) + + const { begin, write, commit } = testSyncFunctions + + // Insert with cursor, then update same key without cursor in same tx + begin() + write({ + type: `insert`, + value: { id: 2, value: `new` }, + cursor: `005`, + }) + write({ + type: `update`, + value: { id: 2, value: `updated` }, + }) + commit() + + // The cursor from the insert should not be erased by the cursor-less update + const cursoredChanges = observedChanges.filter( + (c: any) => c.cursor !== undefined, + ) + expect(cursoredChanges.length).toBeGreaterThan(0) + + subscription.unsubscribe() + }) + + it(`should emit uncursored changes in a batch that also contains a gate-opening cursor`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `mixed-cursor-batch-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + // Single transaction: uncursored insert first, then a cursored insert past the gate + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + // no cursor + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // Both rows should emit — the uncursored change has no position in cursor + // space, so once the gate opens it should not be suppressed + expect(events).toHaveLength(2) + + await effect.dispose() + }) + + it(`should stamp each DeltaEvent with the batch-level max cursor`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `batch-cursor-stamp-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + }) + + users.utils.markReady() + await flushPromises() + events.length = 0 + + // Single transaction with two inserts at different cursors + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 10, name: `First`, active: true }, + cursor: 3, + }) + users.utils.write({ + type: `insert`, + value: { id: 11, name: `Second`, active: true }, + cursor: 4, + }) + users.utils.commit() + await flushPromises() + + // Batch cursor should be the max (4) on all events — this is by-design + expect(events).toHaveLength(2) + expect(events[0]!.cursor).toBe(4) + expect(events[1]!.cursor).toBe(4) + + await effect.dispose() + }) + + it(`should handle a batch that straddles the startAfter boundary`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `straddling-batch-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 2, + }) + + users.utils.markReady() + await flushPromises() + + // Single transaction with changes on both sides of the boundary + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 2, + }) + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // Only cursor 3 should produce an event; cursors 1 & 2 are replay + expect(events).toHaveLength(1) + expect(events[0]!.type).toBe(`enter`) + expect(events[0]!.value.name).toBe(`Charlie`) + expect(events[0]!.cursor).toBe(3) + + await effect.dispose() + }) + + it(`should reject non-monotonic cursor sequences within a batch`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `non-monotonic-cursor-users`, + getKey: (user) => user.id, + }), + ) + const errors: Array = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: () => {}, + startAfter: 2, + onSourceError: (err) => errors.push(err), + }) + + users.utils.markReady() + await flushPromises() + + // Non-monotonic batch: [cursor 1, cursor 3, cursor 2] + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.write({ + type: `insert`, + value: { id: 3, name: `Charlie`, active: true }, + cursor: 2, + }) + users.utils.commit() + await flushPromises() + + expect(errors).toHaveLength(1) + expect(errors[0]!.message).toMatch(/monotonically/) + + await effect.dispose() + }) + + it(`should throw when a scalar startAfter is used with multi-source effects`, () => { + const users = createUsersCollection() + const issues = createIssuesCollection() + + expect(() => { + createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: () => {}, + startAfter: 1, + }) + }).toThrow(/scalar startAfter/) + }) + + it(`should support per-source startAfter with Record for join queries`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `join-cursor-users`, + getKey: (user) => user.id, + }), + ) + const issues = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `join-cursor-issues`, + getKey: (issue) => issue.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: collectBatchEvents(events), + startAfter: { issue: 2, user: 3 }, + }) + + users.utils.markReady() + issues.utils.markReady() + await flushPromises() + + // Replay: both sources send data at or before their respective cursors + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 1, title: `Bug report`, userId: 1 }, + cursor: 1, + }) + issues.utils.write({ + type: `insert`, + value: { id: 2, title: `Feature request`, userId: 2 }, + cursor: 2, + }) + issues.utils.commit() + await flushPromises() + + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 2, + }) + users.utils.write({ + type: `insert`, + value: { id: 2, name: `Bob`, active: true }, + cursor: 3, + }) + users.utils.commit() + await flushPromises() + + // No events yet — both gates are still closed + expect(events).toHaveLength(0) + + // Now issue cursor 3 arrives (past issue gate boundary of 2) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 3, title: `New issue`, userId: 1 }, + cursor: 3, + }) + issues.utils.commit() + await flushPromises() + + // Issue gate opened — events should fire (Alice is already hydrated via join) + expect(events.length).toBeGreaterThan(0) + const issueEvent = events.find( + (e) => e.type === `enter` && e.value?.issue?.title === `New issue`, + ) + expect(issueEvent).toBeDefined() + expect(issueEvent!.triggeringSource).toBe(`issue`) + expect(issueEvent!.cursors).toBeDefined() + expect(issueEvent!.cursors!.issue).toBe(3) + + await effect.dispose() + }) + + it(`should emit events from ungated source while gated source replays`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `partial-gate-users`, + getKey: (user) => user.id, + }), + ) + const issues = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `partial-gate-issues`, + getKey: (issue) => issue.id, + }), + ) + const events: Array> = [] + + // Only gate the issue source; user source has no gate + const effect = createEffect({ + query: (q) => + q + .from({ issue: issues }) + .join({ user: users }, ({ issue, user }) => + eq(issue.userId, user.id), + ), + onBatch: collectBatchEvents(events), + startAfter: { issue: 5 }, + }) + + users.utils.markReady() + issues.utils.markReady() + await flushPromises() + + // Hydrate user data (no gate on users — always live) + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + }) + users.utils.commit() + await flushPromises() + + // User has no gate, so changes from user source are live + // But there are no issues yet to join with, so no join results + events.length = 0 + + // Replay issues (gate closed — cursor <= 5) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 1, title: `Old issue`, userId: 1 }, + cursor: 4, + }) + issues.utils.commit() + await flushPromises() + + // Issue gate is still closed — no events + expect(events).toHaveLength(0) + + // Live issue arrives (cursor > 5) + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 2, title: `New issue`, userId: 1 }, + cursor: 6, + }) + issues.utils.commit() + await flushPromises() + + // Gate opens — should see the new issue joined with Alice + const enterEvents = events.filter((e) => e.type === `enter`) + expect(enterEvents.length).toBeGreaterThan(0) + const newIssueEvent = enterEvents.find( + (e) => e.value?.issue?.title === `New issue`, + ) + expect(newIssueEvent).toBeDefined() + + await effect.dispose() + }) + + it(`should include triggeringSource and cursors on DeltaEvents for gated effects`, async () => { + const users = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `triggering-source-users`, + getKey: (user) => user.id, + }), + ) + const events: Array> = [] + + const effect = createEffect({ + query: (q) => q.from({ user: users }), + onBatch: collectBatchEvents(events), + startAfter: 0, + }) + + users.utils.markReady() + await flushPromises() + + // Send a change past the gate + users.utils.begin() + users.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice`, active: true }, + cursor: 1, + }) + users.utils.commit() + await flushPromises() + + expect(events).toHaveLength(1) + expect(events[0]!.triggeringSource).toBe(`user`) + expect(events[0]!.cursors).toEqual({ user: 1 }) + expect(events[0]!.cursor).toBe(1) + + await effect.dispose() + }) }) describe(`filtered queries`, () => { diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index ae398ed17..4c636b076 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1459,6 +1459,7 @@ function createElectricSync>( write({ type: isDuplicateInsert ? `update` : operation, value: changeMessage.value, + cursor: stream.lastOffset, // Include the primary key and relation info in the metadata metadata: { ...changeMessage.headers, diff --git a/packages/react-db/src/useLiveQueryEffect.ts b/packages/react-db/src/useLiveQueryEffect.ts index ab39558e6..0e8a4523e 100644 --- a/packages/react-db/src/useLiveQueryEffect.ts +++ b/packages/react-db/src/useLiveQueryEffect.ts @@ -39,6 +39,7 @@ export function useLiveQueryEffect< id: config.id, query: config.query, skipInitial: config.skipInitial, + startAfter: config.startAfter, onEnter: (event, ctx) => configRef.current.onEnter?.(event, ctx), onUpdate: (event, ctx) => configRef.current.onUpdate?.(event, ctx), onExit: (event, ctx) => configRef.current.onExit?.(event, ctx),