Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cursor-aware-effect-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/react-db': patch
---

Add cursor-aware effect replay via `startAfter` option on `createEffect` and `useLiveQueryEffect`. Sync writes can now carry an opaque sortable cursor that propagates through `ChangeMessage` and `DeltaEvent`, enabling effects to suppress callbacks during historical replay while still hydrating internal query state. `startAfter` accepts either a scalar cursor (single-source) or a `Record<string, CollectionCursor>` for per-source gating in join queries. `DeltaEvent` now includes `triggeringSource` and `cursors` fields for gated effects.
21 changes: 21 additions & 0 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec'
import type {
ChangeMessage,
CollectionConfig,
CollectionCursor,
OptimisticChangeMessage,
} from '../types'
import type { CollectionImpl } from './index.js'
Expand All @@ -28,6 +29,7 @@ interface PendingSyncedTransaction<
operations: Array<OptimisticChangeMessage<T>>
truncate?: boolean
deletedKeys: Set<string | number>
rowCursorWrites: Map<TKey, PendingCursorWrite>
rowMetadataWrites: Map<TKey, PendingMetadataWrite>
collectionMetadataWrites: Map<string, PendingMetadataWrite>
optimisticSnapshot?: {
Expand All @@ -42,6 +44,10 @@ interface PendingSyncedTransaction<
immediate?: boolean
}

type PendingCursorWrite =
| { type: `set`; value: CollectionCursor }
| { type: `delete` }

type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` }

type InternalChangeMessage<
Expand Down Expand Up @@ -313,6 +319,7 @@ export class CollectionStateManager<
type: change.type,
value: enrichedValue,
previousValue: enrichedPreviousValue,
cursor: change.cursor,
metadata: change.metadata,
} as ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>
}
Expand Down Expand Up @@ -895,6 +902,7 @@ export class CollectionStateManager<
}

const events: Array<ChangeMessage<TOutput, TKey>> = []
const eventCursors = new Map<TKey, CollectionCursor>()
const rowUpdateMode = this.config.sync.rowUpdateMode || `partial`
const completedOptimisticOps = new Map<
TKey,
Expand Down Expand Up @@ -1034,6 +1042,14 @@ export class CollectionStateManager<
this.syncedMetadata.set(key, metadataWrite.value)
}

for (const [key, cursorWrite] of transaction.rowCursorWrites) {
if (cursorWrite.type === `delete`) {
eventCursors.delete(key)
continue
}
eventCursors.set(key, cursorWrite.value)
}

for (const [
key,
metadataWrite,
Expand Down Expand Up @@ -1162,6 +1178,7 @@ export class CollectionStateManager<

// Now check what actually changed in the final visible state
for (const key of changedKeys) {
const cursor = eventCursors.get(key)
const previousVisibleValue = currentVisibleState.get(key)
const newVisibleValue = this.get(key) // This returns the new derived state
const previousVirtualProps = this.getVirtualPropsSnapshotForState(key, {
Expand Down Expand Up @@ -1235,12 +1252,14 @@ export class CollectionStateManager<
key,
value: newVisibleValue,
previousValue: previousValueWithVirtualFromCompleted,
cursor,
})
} else {
events.push({
type: `insert`,
key,
value: newVisibleValue,
cursor,
})
}
} else if (
Expand All @@ -1251,6 +1270,7 @@ export class CollectionStateManager<
type: `delete`,
key,
value: previousValueWithVirtual ?? previousVisibleValue,
cursor,
})
} else if (
previousVisibleValue !== undefined &&
Expand All @@ -1263,6 +1283,7 @@ export class CollectionStateManager<
key,
value: newVisibleValue,
previousValue: previousValueWithVirtual ?? previousVisibleValue,
cursor,
})
}
}
Expand Down
9 changes: 9 additions & 0 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class CollectionSyncManager<
committed: false,
operations: [],
deletedKeys: new Set(),
rowCursorWrites: new Map(),
rowMetadataWrites: new Map(),
collectionMetadataWrites: new Map(),
immediate: options?.immediate,
Expand Down Expand Up @@ -190,6 +191,13 @@ export class CollectionSyncManager<
value: message.metadata,
})
}

if (message.cursor !== undefined) {
pendingTransaction.rowCursorWrites.set(key, {
type: `set`,
value: message.cursor,
})
}
},
commit: () => {
const pendingTransaction =
Expand Down Expand Up @@ -225,6 +233,7 @@ export class CollectionSyncManager<
// Clear all operations from the current transaction
pendingTransaction.operations = []
pendingTransaction.deletedKeys.clear()
pendingTransaction.rowCursorWrites.clear()
pendingTransaction.rowMetadataWrites.clear()
// Intentionally preserve collectionMetadataWrites across truncate.
// Collection-scoped metadata (for example persisted resume/reset
Expand Down
Loading
Loading