diff --git a/packages/db-sqlite-persisted-collection-core/src/persisted.ts b/packages/db-sqlite-persisted-collection-core/src/persisted.ts
index 1691b19cd..ba4c0cfde 100644
--- a/packages/db-sqlite-persisted-collection-core/src/persisted.ts
+++ b/packages/db-sqlite-persisted-collection-core/src/persisted.ts
@@ -19,6 +19,7 @@ import type {
   PendingMutation,
   SyncConfig,
   SyncConfigRes,
+  SyncMetadataApi,
   UpdateMutationFnParams,
   UtilsRecord,
 } from '@tanstack/db'
@@ -77,6 +78,10 @@ export type TxCommitted = {
         value: Record<string, unknown>
       }>
       deletedKeys: Array<string | number>
+      rowMetadataMutations?: Array<
+        PersistedRowMetadataMutation<string | number>
+      >
+      collectionMetadataMutations?: Array<PersistedCollectionMetadataMutation>
     }
 )
@@ -150,6 +155,9 @@ export type PullSinceResponse =
       requiresFullReload: false
       changedKeys: Array<string | number>
       deletedKeys: Array<string | number>
+      deltas?: Array<
+        ReplayableTxDelta<Record<string, unknown>, string | number>
+      >
     }
   | {
       type: `rpc:pullSince:res`
@@ -170,6 +178,39 @@ export interface PersistedIndexSpec {
   readonly metadata?: Readonly<Record<string, unknown>>
 }
 
+export type PersistedRowMetadataMutation<
+  TKey extends string | number = string | number,
+> = { type: `set`; key: TKey; value: unknown } | { type: `delete`; key: TKey }
+
+export type PersistedCollectionMetadataMutation =
+  | { type: `set`; key: string; value: unknown }
+  | { type: `delete`; key: string }
+
+export type ReplayableTxDelta<
+  T extends Record<string, unknown> = Record<string, unknown>,
+  TKey extends string | number = string | number,
+> = {
+  txId: string
+  latestRowVersion: number
+  changedRows: Array<{ key: TKey; value: T }>
+  deletedKeys: Array<TKey>
+  rowMetadataMutations: Array<PersistedRowMetadataMutation<TKey>>
+  collectionMetadataMutations: Array<PersistedCollectionMetadataMutation>
+}
+
+export type PersistedScannedRow<
+  T extends object,
+  TKey extends string | number = string | number,
+> = {
+  key: TKey
+  value: T
+  metadata?: unknown
+}
+
+export type PersistedRowScanOptions = {
+  metadataOnly?: boolean
+}
+
 export type PersistedTx<
   T extends object,
   TKey extends string | number = string | number,
@@ -178,11 +219,26 @@ export type PersistedTx<
   term: number
   seq: number
   rowVersion: number
+  truncate?: boolean
   mutations: Array<
-    | { type: `insert`; key: TKey; value: T }
-    | { type: `update`; key: TKey; value: T }
+    | {
+        type: `insert`
+        key: TKey
+        value: T
+        metadata?: unknown
+        metadataChanged?: boolean
+      }
+    | {
+        type: `update`
+        key: TKey
+        value: T
+        metadata?: unknown
+        metadataChanged?: boolean
+      }
     | { type: `delete`; key: TKey; value: T }
   >
+  rowMetadataMutations?: Array<PersistedRowMetadataMutation<TKey>>
+  collectionMetadataMutations?: Array<PersistedCollectionMetadataMutation>
 }
 
 export interface PersistenceAdapter<
@@ -193,11 +249,18 @@ export interface PersistenceAdapter<
     collectionId: string,
     options: LoadSubsetOptions,
     ctx?: { requiredIndexSignatures?: ReadonlyArray<string> },
-  ) => Promise<Array<{ key: TKey; value: T }>>
+  ) => Promise<Array<{ key: TKey; value: T; metadata?: unknown }>>
   applyCommittedTx: (
     collectionId: string,
     tx: PersistedTx<T, TKey>,
   ) => Promise<void>
+  loadCollectionMetadata?: (
+    collectionId: string,
+  ) => Promise<Array<{ key: string; value: unknown }>>
+  scanRows?: (
+    collectionId: string,
+    options?: PersistedRowScanOptions,
+  ) => Promise<Array<PersistedScannedRow<T, TKey>>>
   ensureIndex: (
     collectionId: string,
     signature: string,
@@ -366,13 +429,14 @@ type SyncControlFns = {
   write:
     | ((
         message:
-          | { type: `insert`; value: T }
-          | { type: `update`; value: T }
+          | { type: `insert`; value: T; metadata?: Record<string, unknown> }
+          | { type: `update`; value: T; metadata?: Record<string, unknown> }
           | { type: `delete`; key: TKey },
       ) => void)
     | null
   commit: (() => void) | null
   truncate: (() => void) | null
+  metadata: SyncMetadataApi<TKey> | null
 }
 
 /**
@@ -417,6 +481,7 @@ export class SingleProcessCoordinator implements PersistedCollectionCoordinator
         requiresFullReload: false,
         changedKeys: [],
         deletedKeys: [],
+        deltas: [],
       })
     }
   }
@@ -511,6 +576,7 @@ type NormalizedSyncOperation =
       type: `update`
       key: TKey
       value: T
+      metadata?: Record<string, unknown>
     }
   | {
       type: `delete`
@@ -520,6 +586,14 @@ type NormalizedSyncOperation =
 
 type BufferedSyncTransaction<T extends object, TKey extends string | number> = {
   operations: Array<NormalizedSyncOperation<T, TKey>>
+  rowMetadataWrites: Map<
+    TKey,
+    { type: `set`; value: unknown } | { type: `delete` }
+  >
+  collectionMetadataWrites: Map<
+    string,
+    { type: `set`; value: unknown } | { type: `delete` }
+  >
   truncate: boolean
   internal: boolean
 }
@@ -536,6 +610,7 @@ type SyncWriteNormalization = {
     | {
         type: `update`
         value: T
+        metadata?: Record<string, unknown>
       }
     | {
         type: `delete`
@@ -680,7 +755,12 @@ function isTxCommittedPayload(payload: unknown): payload is TxCommitted {
   }
 
   return (
-    Array.isArray(payload.changedRows) && Array.isArray(payload.deletedKeys)
+    Array.isArray(payload.changedRows) &&
+    Array.isArray(payload.deletedKeys) &&
+    (payload.rowMetadataMutations === undefined ||
+      Array.isArray(payload.rowMetadataMutations)) &&
+    (payload.collectionMetadataMutations === undefined ||
+      Array.isArray(payload.collectionMetadataMutations))
   )
 }
@@ -735,8 +815,10 @@ class PersistedCollectionRuntime<
     write: null,
     commit: null,
     truncate: null,
+    metadata: null,
   }
   private started = false
+  private startupMetadataPromise: Promise<void> | null = null
   private startPromise: Promise<void> | null = null
   private internalApplyDepth = 0
   private isHydrating = false
@@ -771,6 +853,7 @@ class PersistedCollectionRuntime<
       write: null,
      commit: null,
       truncate: null,
+      metadata: null,
     }
   }
 
@@ -809,6 +892,15 @@ class PersistedCollectionRuntime<
     return this.startPromise
   }
 
+  async ensureStartupMetadataLoaded(): Promise<void> {
+    if (this.startupMetadataPromise) {
+      return this.startupMetadataPromise
+    }
+
+    this.startupMetadataPromise = this.loadStartupMetadataInternal()
+    return this.startupMetadataPromise
+  }
+
   private async startInternal(): Promise<void> {
     if (this.started) {
       return
@@ -816,6 +908,21 @@ class PersistedCollectionRuntime<
 
     this.started = true
 
+    await this.ensureStartupMetadataLoaded()
+
+    const indexBootstrapSnapshot = this.collection?.getIndexMetadata() ?? []
+    this.attachIndexLifecycleListeners()
+    await this.bootstrapPersistedIndexes(indexBootstrapSnapshot)
+
+    if (this.syncMode !== `on-demand`) {
+      this.activeSubsets.set(this.getSubsetKey({}), {})
+      await this.applyMutex.run(() =>
+        this.hydrateSubsetUnsafe({}, { requestRemoteEnsure: false }),
+      )
+    }
+  }
+
+  private async loadStartupMetadataInternal(): Promise<void> {
     // Restore stream position from the database so that new mutations
     // don't collide with previously applied transactions.
     if (this.persistence.adapter.getStreamPosition) {
@@ -829,16 +936,57 @@ class PersistedCollectionRuntime<
       )
     }
 
-    const indexBootstrapSnapshot = this.collection?.getIndexMetadata() ?? []
-    this.attachIndexLifecycleListeners()
-    await this.bootstrapPersistedIndexes(indexBootstrapSnapshot)
+    await this.loadCollectionMetadataIntoCollection()
+  }
 
-    if (this.syncMode !== `on-demand`) {
-      this.activeSubsets.set(this.getSubsetKey({}), {})
-      await this.applyMutex.run(() =>
-        this.hydrateSubsetUnsafe({}, { requestRemoteEnsure: false }),
-      )
+  private async loadCollectionMetadataIntoCollection(): Promise<void> {
+    const collectionMetadata = await this.loadCollectionMetadataSnapshot()
+    this.replaceCollectionMetadataSnapshot(collectionMetadata)
+  }
+
+  private async loadCollectionMetadataSnapshot(): Promise<
+    Array<{ key: string; value: unknown }>
+  > {
+    if (!this.persistence.adapter.loadCollectionMetadata) {
+      return []
+    }
+
+    return this.persistence.adapter.loadCollectionMetadata(this.collectionId)
+  }
+
+  private replaceCollectionMetadataSnapshot(
+    collectionMetadata: Array<{ key: string; value: unknown }>,
+  ): void {
+    if (
+      !this.syncControls.begin ||
+      !this.syncControls.commit ||
+      !this.syncControls.metadata
+    ) {
+      return
     }
+
+    const nextMetadata = new Map(
+      collectionMetadata.map(({ key, value }) => [key, value]),
+    )
+    const currentKeys = this.syncControls.metadata.collection
+      .list()
+      .map(({ key }) => key)
+
+    this.withInternalApply(() => {
+      this.syncControls.begin?.({ immediate: true })
+
+      currentKeys.forEach((key) => {
+        if (!nextMetadata.has(key)) {
+          this.syncControls.metadata?.collection.delete(key)
+        }
+      })
+
+      nextMetadata.forEach((value, key) => {
+        this.syncControls.metadata?.collection.set(key, value)
+      })
+
+      this.syncControls.commit?.()
+    })
   }
 
   async loadSubset(
@@ -951,11 +1099,13 @@ class PersistedCollectionRuntime<
       forwardMessage: {
         type: `update`,
         value: message.value,
+        metadata: message.metadata,
       },
       operation: {
         type: `update`,
         key,
         value: message.value,
+        metadata: message.metadata,
       },
     }
   }
@@ -1042,12 +1192,28 @@ class PersistedCollectionRuntime<
 
   private loadSubsetRowsUnsafe(
     options: LoadSubsetOptions,
-  ): Promise<Array<{ key: TKey; value: T }>> {
+  ): Promise<Array<{ key: TKey; value: T; metadata?: unknown }>> {
     return this.persistence.adapter.loadSubset(this.collectionId, options, {
       requiredIndexSignatures: this.getRequiredIndexSignatures(),
     })
   }
 
+  private async scanPersistedRowsUnsafe(
+    options?: PersistedRowScanOptions,
+  ): Promise<Array<PersistedScannedRow<T, TKey>>> {
+    if (!this.persistence.adapter.scanRows) {
+      return []
+    }
+
+    return this.persistence.adapter.scanRows(this.collectionId, options)
+  }
+
+  async scanPersistedRows(
+    options?: PersistedRowScanOptions,
+  ): Promise<Array<PersistedScannedRow<T, TKey>>> {
+    return this.applyMutex.run(() => this.scanPersistedRowsUnsafe(options))
+  }
+
   private async hydrateSubsetUnsafe(
     options: LoadSubsetOptions,
     config: {
@@ -1071,7 +1237,9 @@ class PersistedCollectionRuntime<
     }
   }
 
-  private applyRowsToCollection(rows: Array<{ key: TKey; value: T }>): void {
+  private applyRowsToCollection(
+    rows: Array<{ key: TKey; value: T; metadata?: unknown }>,
+  ): void {
     if (
       !this.syncControls.begin ||
       !this.syncControls.write ||
@@ -1087,6 +1255,7 @@ class PersistedCollectionRuntime<
       this.syncControls.write?.({
         type: `update`,
         value: row.value,
+        metadata: row.metadata as Record<string, unknown> | undefined,
       })
     }
 
@@ -1094,15 +1263,26 @@ class PersistedCollectionRuntime<
     })
   }
 
-  private replaceCollectionRows(rows: Array<{ key: TKey; value: T }>): void {
+  private replaceCollectionSnapshot(
+    rows: Array<{ key: TKey; value: T; metadata?: unknown }>,
+    collectionMetadata: Array<{ key: string; value: unknown }>,
+  ): void {
     if (
       !this.syncControls.begin ||
       !this.syncControls.write ||
-      !this.syncControls.commit
+      !this.syncControls.commit ||
+      !this.syncControls.metadata
     ) {
       return
     }
 
+    const nextMetadata = new Map(
+      collectionMetadata.map(({ key, value }) => [key, value]),
+    )
+    const currentKeys = this.syncControls.metadata.collection
+      .list()
+      .map(({ key }) => key)
+
     this.withInternalApply(() => {
       this.syncControls.begin?.({ immediate: true })
       this.syncControls.truncate?.()
@@ -1111,9 +1291,20 @@ class PersistedCollectionRuntime<
         this.syncControls.write?.({
           type: `update`,
           value: row.value,
+          metadata: row.metadata as Record<string, unknown> | undefined,
         })
       }
 
+      currentKeys.forEach((key) => {
+        if (!nextMetadata.has(key)) {
+          this.syncControls.metadata?.collection.delete(key)
+        }
+      })
+
+      nextMetadata.forEach((value, key) => {
+        this.syncControls.metadata?.collection.set(key, value)
+      })
+
       this.syncControls.commit?.()
     })
   }
@@ -1156,10 +1347,27 @@ class PersistedCollectionRuntime<
         this.syncControls.write?.({
           type: `update`,
           value: operation.value,
+          metadata: operation.metadata,
         })
       }
     }
 
+    for (const [key, metadataWrite] of transaction.rowMetadataWrites) {
+      if (metadataWrite.type === `delete`) {
+        this.syncControls.metadata?.row.delete(key)
+      } else {
+        this.syncControls.metadata?.row.set(key, metadataWrite.value)
+      }
+    }
+
+    for (const [key, metadataWrite] of transaction.collectionMetadataWrites) {
+      if (metadataWrite.type === `delete`) {
+        this.syncControls.metadata?.collection.delete(key)
+      } else {
+        this.syncControls.metadata?.collection.set(key, metadataWrite.value)
+      }
+    }
+
     this.syncControls.commit?.()
   }
 
@@ -1181,7 +1389,12 @@ class PersistedCollectionRuntime<
 
     const streamPosition = this.nextLocalStreamPosition()
 
-    if (transaction.truncate || transaction.operations.length === 0) {
+    if (
+      !transaction.truncate &&
+      transaction.operations.length === 0 &&
+      transaction.rowMetadataWrites.size === 0 &&
+      transaction.collectionMetadataWrites.size === 0
+    ) {
       this.publishTxCommittedEvent(
         this.createTxCommittedPayload({
           term: streamPosition.term,
@@ -1196,10 +1409,7 @@ class PersistedCollectionRuntime<
       return
     }
 
-    const tx = this.createPersistedTxFromOperations(
-      transaction.operations,
-      streamPosition,
-    )
+    const tx = this.createPersistedTxFromOperations(transaction, streamPosition)
     await this.persistence.adapter.applyCommittedTx(this.collectionId, tx)
 
     this.publishTxCommittedEvent(
@@ -1208,18 +1418,21 @@ class PersistedCollectionRuntime<
         seq: tx.seq,
         txId: tx.txId,
         latestRowVersion: tx.rowVersion,
+        requiresFullReload: transaction.truncate,
         changedRows: transaction.operations
          .filter((operation) => operation.type === `update`)
          .map((operation) => ({ key: operation.key, value: operation.value })),
         deletedKeys: transaction.operations
           .filter((operation) => operation.type === `delete`)
           .map((operation) => operation.key),
+        rowMetadataMutations: tx.rowMetadataMutations,
+        collectionMetadataMutations: tx.collectionMetadataMutations,
       }),
     )
   }
 
   private createPersistedTxFromOperations(
-    operations: Array<NormalizedSyncOperation<T, TKey>>,
+    transaction: BufferedSyncTransaction<T, TKey>,
     streamPosition: { term: number; seq: number; rowVersion: number },
   ): PersistedTx<T, TKey> {
     return {
@@ -1227,7 +1440,8 @@ class PersistedCollectionRuntime<
       term: streamPosition.term,
       seq: streamPosition.seq,
       rowVersion: streamPosition.rowVersion,
-      mutations: operations.map((operation) =>
+      truncate: transaction.truncate,
+      mutations: transaction.operations.map((operation) =>
         operation.type === `update`
          ? {
              type: `update`,
@@ -1240,6 +1454,20 @@ class PersistedCollectionRuntime<
              value: operation.value,
            },
       ),
+      rowMetadataMutations: Array.from(
+        transaction.rowMetadataWrites.entries(),
+      ).map(([key, metadataWrite]) =>
+        metadataWrite.type === `delete`
+          ? { type: `delete`, key }
+          : { type: `set`, key, value: metadataWrite.value },
+      ),
+      collectionMetadataMutations: Array.from(
+        transaction.collectionMetadataWrites.entries(),
+      ).map(([key, metadataWrite]) =>
+        metadataWrite.type === `delete`
+          ? { type: `delete`, key }
+          : { type: `set`, key, value: metadataWrite.value },
+      ),
     }
   }
 
@@ -1396,6 +1624,8 @@ class PersistedCollectionRuntime<
       deletedKeys: mutations
         .filter((mutation) => mutation.type === `delete`)
         .map((mutation) => mutation.key as TKey),
+      rowMetadataMutations: tx.rowMetadataMutations,
+      collectionMetadataMutations: tx.collectionMetadataMutations,
     }),
   )
 
@@ -1409,11 +1639,19 @@ class PersistedCollectionRuntime<
     latestRowVersion: number
     changedRows: Array<{ key: TKey; value: T }>
     deletedKeys: Array<TKey>
+    rowMetadataMutations?: Array<PersistedRowMetadataMutation<TKey>>
+    collectionMetadataMutations?: Array<PersistedCollectionMetadataMutation>
+    hasMetadataChanges?: boolean
     requiresFullReload?: boolean
   }): TxCommitted {
+    const rowMetadataMutations = args.rowMetadataMutations ?? []
+    const collectionMetadataMutations = args.collectionMetadataMutations ?? []
     const requiresFullReload =
       args.requiresFullReload === true ||
-      args.changedRows.length + args.deletedKeys.length >
+      args.changedRows.length +
+        args.deletedKeys.length +
+        rowMetadataMutations.length +
+        collectionMetadataMutations.length >
         TARGETED_INVALIDATION_KEY_LIMIT
 
     if (requiresFullReload) {
@@ -1439,6 +1677,10 @@ class PersistedCollectionRuntime<
         value: Record<string, unknown>
       }>,
      deletedKeys: args.deletedKeys,
+      rowMetadataMutations: rowMetadataMutations as Array<
+        PersistedRowMetadataMutation<string | number>
+      >,
+      collectionMetadataMutations,
    }
  }
 
@@ -1667,6 +1909,13 @@ class PersistedCollectionRuntime<
 
     if (hasGap) {
       await this.recoverFromSeqGapUnsafe()
+      if (
+        txCommitted.term < this.latestTerm ||
+        (txCommitted.term === this.latestTerm &&
+          txCommitted.seq <= this.latestSeq)
+      ) {
+        return
+      }
     }
 
     this.observeStreamPosition(
@@ -1692,7 +1941,25 @@ class PersistedCollectionRuntime<
         pullResponse.latestSeq,
         pullResponse.latestRowVersion,
       )
-      await this.reloadActiveSubsetsUnsafe()
+      if (pullResponse.requiresFullReload || !pullResponse.deltas) {
+        await this.reloadActiveSubsetsUnsafe()
+        return
+      }
+
+      for (const delta of pullResponse.deltas) {
+        await this.invalidateFromCommittedTxUnsafe({
+          type: `tx:committed`,
+          term: pullResponse.latestTerm,
+          seq: pullResponse.latestSeq,
+          txId: delta.txId,
+          latestRowVersion: delta.latestRowVersion,
+          requiresFullReload: false,
+          changedRows: delta.changedRows,
+          deletedKeys: delta.deletedKeys,
+          rowMetadataMutations: delta.rowMetadataMutations,
+          collectionMetadataMutations: delta.collectionMetadataMutations,
+        })
+      }
       return
     }
   } catch (error) {
@@ -1740,7 +2007,7 @@ class PersistedCollectionRuntime<
       (opt) => opt.limit != null || opt.offset != null || opt.cursor != null,
     )
 
-    if (!hasPaginatedSubset) {
+    if (!hasPaginatedSubset || changedKeyCount === 0) {
       await this.applyTargetedInvalidationUnsafe(txCommitted)
       return
     }
@@ -1780,6 +2047,28 @@ class PersistedCollectionRuntime<
       this.syncControls.write?.({ type: `delete`, key: deletedKey as TKey })
     }
 
+    txCommitted.rowMetadataMutations?.forEach((mutation) => {
+      if (mutation.type === `delete`) {
+        this.syncControls.metadata?.row.delete(mutation.key as TKey)
+      } else {
+        this.syncControls.metadata?.row.set(
+          mutation.key as TKey,
+          mutation.value,
+        )
+      }
+    })
+
+    txCommitted.collectionMetadataMutations?.forEach((mutation) => {
+      if (mutation.type === `delete`) {
+        this.syncControls.metadata?.collection.delete(mutation.key)
+      } else {
+        this.syncControls.metadata?.collection.set(
+          mutation.key,
+          mutation.value,
+        )
+      }
+    })
+
     this.syncControls.commit?.()
   })
 }
@@ -1792,19 +2081,25 @@ class PersistedCollectionRuntime<
     this.isHydrating = true
 
     try {
-      const mergedRows = new Map<TKey, T>()
+      const mergedRows = new Map<TKey, { value: T; metadata?: unknown }>()
+      const collectionMetadata = await this.loadCollectionMetadataSnapshot()
       for (const options of activeSubsetOptions) {
         const subsetRows = await this.loadSubsetRowsUnsafe(options)
         for (const row of subsetRows) {
-          mergedRows.set(row.key, row.value)
+          mergedRows.set(row.key, {
+            value: row.value,
+            metadata: row.metadata,
+          })
         }
       }
 
-      this.replaceCollectionRows(
-        Array.from(mergedRows.entries()).map(([key, value]) => ({
+      this.replaceCollectionSnapshot(
+        Array.from(mergedRows.entries()).map(([key, row]) => ({
           key,
-          value,
+          value: row.value,
+          metadata: row.metadata,
         })),
+        collectionMetadata,
       )
     } finally {
      this.isHydrating = false
@@ -1920,11 +2215,16 @@ function createWrappedSyncConfig<
    ...sourceSyncConfig,
    sync: (params) => {
      const transactionStack: Array<OpenSyncTransaction<T, TKey>> = []
+      const getOpenTransaction = () =>
+        transactionStack[transactionStack.length - 1]
+      let fullStartPromise: Promise<void> | null = null
+      const cancelledLoads = new WeakSet<object>()
      runtime.setSyncControls({
        begin: params.begin,
        write: params.write as SyncControlFns<T, TKey>[`write`],
        commit: params.commit,
        truncate: params.truncate,
+        metadata: params.metadata ?? null,
      })
      runtime.setCollection(
        params.collection as Collection<T, TKey, UtilsRecord>,
      )
@@ -1933,22 +2233,15 @@
      const wrappedParams = {
        ...params,
        markReady: () => {
-          void runtime
-            .ensureStarted()
-            .then(() => {
-              params.markReady()
-            })
-            .catch((error) => {
-              console.warn(
-                `Failed persisted sync startup before markReady:`,
-                error,
-              )
-              params.markReady()
-            })
+          void (fullStartPromise ?? runtime.ensureStarted()).then(() => {
+            params.markReady()
+          })
        },
        begin: (options?: { immediate?: boolean }) => {
          const transaction: OpenSyncTransaction<T, TKey> = {
            operations: [],
+            rowMetadataWrites: new Map(),
+            collectionMetadataWrites: new Map(),
            truncate: false,
            internal: runtime.isApplyingInternally(),
            queuedBecauseHydrating:
@@ -1962,7 +2255,7 @@
        },
        write: (message: ChangeMessageOrDeleteKeyMessage<T, TKey>) => {
          const normalization = runtime.normalizeSyncWriteMessage(message)
-          const openTransaction = transactionStack[transactionStack.length - 1]
+          const openTransaction = getOpenTransaction()
 
          if (!openTransaction) {
            params.write(normalization.forwardMessage)
@@ -1970,17 +2263,156 @@
          }
 
          openTransaction.operations.push(normalization.operation)
+          if (normalization.operation.type === `delete`) {
+            openTransaction.rowMetadataWrites.set(normalization.operation.key, {
+              type: `delete`,
+            })
+          } else if (
+            message.type === `insert` &&
+            normalization.operation.metadata === undefined
+          ) {
+            openTransaction.rowMetadataWrites.set(normalization.operation.key, {
+              type: `delete`,
+            })
+          } else if (normalization.operation.metadata !== undefined) {
+            openTransaction.rowMetadataWrites.set(normalization.operation.key, {
+              type: `set`,
+              value: normalization.operation.metadata,
+            })
+          }
 
          if (!openTransaction.queuedBecauseHydrating) {
            params.write(normalization.forwardMessage)
          }
        },
+        metadata: params.metadata
+          ? {
+              row: {
+                get: (key: TKey) => {
+                  const openTransaction = getOpenTransaction()
+                  const pendingWrite =
+                    openTransaction?.rowMetadataWrites.get(key)
+                  if (pendingWrite) {
+                    return pendingWrite.type === `delete`
+                      ? undefined
+                      : pendingWrite.value
+                  }
+                  if (openTransaction?.truncate) {
+                    return undefined
+                  }
+                  return params.metadata!.row.get(key)
+                },
+                scanPersisted: (options?: PersistedRowScanOptions) =>
+                  runtime.scanPersistedRows(options),
+                set: (key: TKey, value: unknown) => {
+                  const openTransaction = getOpenTransaction()
+                  if (!openTransaction) {
+                    throw new InvalidPersistedCollectionConfigError(
+                      `metadata.row.set must be called within an open sync transaction`,
+                    )
+                  }
+                  openTransaction.rowMetadataWrites.set(key, {
+                    type: `set`,
+                    value,
+                  })
+                  if (!openTransaction.queuedBecauseHydrating) {
+                    params.metadata!.row.set(key, value)
+                  }
+                },
+                delete: (key: TKey) => {
+                  const openTransaction = getOpenTransaction()
+                  if (!openTransaction) {
+                    throw new InvalidPersistedCollectionConfigError(
+                      `metadata.row.delete must be called within an open sync transaction`,
+                    )
+                  }
+                  openTransaction.rowMetadataWrites.set(key, {
+                    type: `delete`,
+                  })
+                  if (!openTransaction.queuedBecauseHydrating) {
+                    params.metadata!.row.delete(key)
+                  }
+                },
+              },
+              collection: {
+                get: (key: string) => {
+                  const openTransaction = getOpenTransaction()
+                  const pendingWrite =
+                    openTransaction?.collectionMetadataWrites.get(key)
+                  if (pendingWrite) {
+                    return pendingWrite.type === `delete`
+                      ? undefined
+                      : pendingWrite.value
+                  }
+                  return params.metadata!.collection.get(key)
+                },
+                set: (key: string, value: unknown) => {
+                  const openTransaction = getOpenTransaction()
+                  if (!openTransaction) {
+                    throw new InvalidPersistedCollectionConfigError(
+                      `metadata.collection.set must be called within an open sync transaction`,
+                    )
+                  }
+                  openTransaction.collectionMetadataWrites.set(key, {
+                    type: `set`,
+                    value,
+                  })
+                  if (!openTransaction.queuedBecauseHydrating) {
+                    params.metadata!.collection.set(key, value)
+                  }
+                },
+                delete: (key: string) => {
+                  const openTransaction = getOpenTransaction()
+                  if (!openTransaction) {
+                    throw new InvalidPersistedCollectionConfigError(
+                      `metadata.collection.delete must be called within an open sync transaction`,
+                    )
+                  }
+                  openTransaction.collectionMetadataWrites.set(key, {
+                    type: `delete`,
+                  })
+                  if (!openTransaction.queuedBecauseHydrating) {
+                    params.metadata!.collection.delete(key)
+                  }
+                },
+                list: (prefix?: string) => {
+                  const merged = new Map(
+                    params
+                      .metadata!.collection.list()
+                      .map(({ key, value }) => [key, value]),
+                  )
+                  const openTransaction = getOpenTransaction()
+                  if (openTransaction) {
+                    for (const [
+                      key,
+                      metadataWrite,
+                    ] of openTransaction.collectionMetadataWrites) {
+                      if (metadataWrite.type === `delete`) {
+                        merged.delete(key)
+                      } else {
+                        merged.set(key, metadataWrite.value)
+                      }
+                    }
+                  }
+
+                  return Array.from(merged.entries())
+                    .filter(([key]) => (prefix ? key.startsWith(prefix) : true))
+                    .map(([key, value]) => ({
+                      key,
+                      value,
+                    }))
+                },
+              },
+            }
+          : undefined,
        truncate: () => {
-          const openTransaction = transactionStack[transactionStack.length - 1]
+          const openTransaction = getOpenTransaction()
 
          if (!openTransaction) {
            params.truncate()
            return
          }
 
+          openTransaction.operations = []
+          openTransaction.rowMetadataWrites.clear()
          openTransaction.truncate = true
 
          if (!openTransaction.queuedBecauseHydrating) {
            params.truncate()
          }
        },
@@ -1996,6 +2428,9 @@
          if (openTransaction.queuedBecauseHydrating) {
            runtime.queueHydrationBufferedTransaction({
              operations: openTransaction.operations,
+              rowMetadataWrites: openTransaction.rowMetadataWrites,
+              collectionMetadataWrites:
+                openTransaction.collectionMetadataWrites,
              truncate: openTransaction.truncate,
              internal: openTransaction.internal,
            })
@@ -2007,6 +2442,9 @@
          void runtime
            .persistAndBroadcastExternalSyncTransaction({
              operations: openTransaction.operations,
+              rowMetadataWrites: openTransaction.rowMetadataWrites,
+              collectionMetadataWrites:
+                openTransaction.collectionMetadataWrites,
              truncate: openTransaction.truncate,
              internal: false,
            })
@@ -2020,21 +2458,42 @@
        },
      }
 
-      const sourceResult = normalizeSyncFnResult(
-        sourceSyncConfig.sync(wrappedParams),
-      )
-      void runtime.ensureStarted()
+      let sourceResult: SyncConfigRes<T, TKey> = {}
+      const startupState = { cleanedUp: false }
+      fullStartPromise = runtime.ensureStarted()
+      const sourceResultPromise = (async () => {
+        await runtime.ensureStartupMetadataLoaded()
+
+        if (startupState.cleanedUp) {
+          return sourceResult
+        }
+
+        sourceResult = normalizeSyncFnResult(
+          sourceSyncConfig.sync(wrappedParams),
+        )
+        return sourceResult
+      })()
 
      return {
        cleanup: () => {
+          startupState.cleanedUp = true
          sourceResult.cleanup?.()
          runtime.cleanup()
          runtime.clearSyncControls()
        },
-        loadSubset: (options: LoadSubsetOptions) =>
-          runtime.loadSubset(options, sourceResult.loadSubset),
-        unloadSubset: (options: LoadSubsetOptions) =>
-          runtime.unloadSubset(options, sourceResult.unloadSubset),
+        loadSubset: async (options: LoadSubsetOptions) => {
+          cancelledLoads.delete(options as object)
+          await fullStartPromise
+          const resolvedSourceResult = await sourceResultPromise
+          if (startupState.cleanedUp || cancelledLoads.has(options as object)) {
+            return
+          }
+          await runtime.loadSubset(options, resolvedSourceResult.loadSubset)
+        },
+        unloadSubset: (options: LoadSubsetOptions) => {
+          cancelledLoads.add(options as object)
+          runtime.unloadSubset(options, sourceResult.unloadSubset)
+        },
      }
    },
  }
@@ -2051,6 +2510,7 @@ function createLoopbackSyncConfig<
      write: params.write as SyncControlFns<T, TKey>[`write`],
      commit: params.commit,
      truncate: params.truncate,
+      metadata: params.metadata ?? null,
    })
    runtime.setCollection(
      params.collection as Collection<T, TKey, UtilsRecord>,
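
> Note: the runtime changes above expose a transactional `metadata` API to sync implementations. A minimal sketch of how a sync function might drive it, assuming the `begin`/`write`/`commit`/`metadata` surface wrapped above; the `electric:resume` key and `source` field are illustrative only, not part of the API:

```ts
// Hypothetical sync function against the wrapped params shown in this diff.
const syncFn: SyncConfig<Todo>[`sync`] = ({ begin, write, commit, metadata, markReady }) => {
  begin()
  // Row writes may carry per-row metadata; it is staged with the transaction.
  write({
    type: `insert`,
    value: { id: `1`, title: `Tracked` },
    metadata: { source: `remote` },
  })
  // Collection-level metadata (e.g. a resume token) is staged the same way and
  // only reaches the adapter when the transaction commits; calling set outside
  // an open transaction throws, per the wrapper above.
  metadata?.collection.set(`electric:resume`, { offset: `10_0` })
  commit()
  markReady()
}
```
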
diff --git a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts
index 9f1662de1..78748e849 100644
--- a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts
+++ b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts
@@ -15,8 +15,11 @@ import {
 import type { LoadSubsetOptions } from '@tanstack/db'
 import type {
   PersistedIndexSpec,
+  PersistedRowScanOptions,
+  PersistedScannedRow,
   PersistedTx,
   PersistenceAdapter,
+  ReplayableTxDelta,
   SQLiteDriver,
 } from './persisted'
 
@@ -37,6 +40,7 @@ type CompiledSqlFragment = {
 type StoredSqliteRow = {
   key: string
   value: string
+  metadata: string | null
   row_version: number
 }
 
@@ -64,6 +68,7 @@ export type SQLitePullSinceResult<TKey extends string | number> =
       requiresFullReload: false
       changedKeys: Array<TKey>
       deletedKeys: Array<TKey>
+      deltas: Array<ReplayableTxDelta<Record<string, unknown>, TKey>>
     }
 
 const DEFAULT_SCHEMA_VERSION = 1
@@ -588,9 +593,32 @@ function sanitizeExpressionSqlFragment(fragment: string): string {
 type InMemoryRow<T extends object, TKey extends string | number> = {
   key: TKey
   value: T
+  metadata?: unknown
   rowVersion: number
 }
 
+function decodeStoredSqliteRows<T extends object, TKey extends string | number>(
+  storedRows: ReadonlyArray<StoredSqliteRow>,
+): Array<InMemoryRow<T, TKey>> {
+  return storedRows.map((row) => {
+    const key = decodePersistedStorageKey(row.key) as TKey
+    const value = deserializePersistedRowValue<T>(row.value)
+    return {
+      key,
+      value,
+      metadata:
+        row.metadata != null
+          ? deserializePersistedRowValue(row.metadata)
+          : undefined,
+      rowVersion: row.row_version,
+    }
+  })
+}
+
+function stableStringify(value: unknown): string {
+  return serializePersistedRowValue(value)
+}
+
 function compileSqlExpression(
   expression: IR.BasicExpression,
 ): CompiledSqlFragment {
@@ -972,6 +1000,10 @@ export class SQLiteCorePersistenceAdapter<
     string,
     CollectionTableMapping
   >()
+  private readonly collectionTableLoads = new Map<
+    string,
+    Promise<CollectionTableMapping>
+  >()
 
   constructor(options: SQLiteCoreAdapterOptions) {
     const schemaVersion = options.schemaVersion ?? DEFAULT_SCHEMA_VERSION
@@ -1035,7 +1067,7 @@ export class SQLiteCorePersistenceAdapter<
     collectionId: string,
     options: LoadSubsetOptions,
     ctx?: { requiredIndexSignatures?: ReadonlyArray<string> },
-  ): Promise<Array<{ key: TKey; value: T }>> {
+  ): Promise<Array<{ key: TKey; value: T; metadata?: unknown }>> {
     const tableMapping = await this.ensureCollectionReady(collectionId)
     await this.touchRequiredIndexes(collectionId, ctx?.requiredIndexSignatures)
@@ -1072,6 +1104,7 @@ export class SQLiteCorePersistenceAdapter<
     return orderedRows.map((row) => ({
       key: row.key,
       value: row.value,
+      metadata: row.metadata,
     }))
   }
 
@@ -1079,6 +1112,7 @@ export class SQLiteCorePersistenceAdapter<
     return rows.map((row) => ({
       key: row.key,
       value: row.value,
+      metadata: row.metadata,
     }))
   }
 
@@ -1114,6 +1148,31 @@ export class SQLiteCorePersistenceAdapter<
     )
     const currentRowVersion = versionRows[0]?.latest_row_version ?? 0
     const nextRowVersion = Math.max(currentRowVersion + 1, tx.rowVersion)
+    const replayDelta: ReplayableTxDelta<
+      Record<string, unknown>,
+      TKey
+    > | null = tx.truncate
+      ? null
+      : {
+          txId: tx.txId,
+          latestRowVersion: nextRowVersion,
+          changedRows: tx.mutations
+            .filter((mutation) => mutation.type !== `delete`)
+            .map((mutation) => ({
+              key: mutation.key,
+              value: mutation.value as Record<string, unknown>,
+            })),
+          deletedKeys: tx.mutations
+            .filter((mutation) => mutation.type === `delete`)
+            .map((mutation) => mutation.key),
+          rowMetadataMutations: tx.rowMetadataMutations ?? [],
+          collectionMetadataMutations: tx.collectionMetadataMutations ?? [],
+        }
+
+    if (tx.truncate) {
+      await transactionDriver.run(`DELETE FROM ${collectionTableSql}`)
+      await transactionDriver.run(`DELETE FROM ${tombstoneTableSql}`)
+    }
 
     for (const mutation of tx.mutations) {
       const encodedKey = encodePersistedStorageKey(mutation.key)
@@ -1140,8 +1199,11 @@ export class SQLiteCorePersistenceAdapter<
         continue
       }
 
-      const existingRows = await transactionDriver.query<{ value: string }>(
-        `SELECT value
+      const existingRows = await transactionDriver.query<{
+        value: string
+        metadata: string | null
+      }>(
+        `SELECT value, metadata
          FROM ${collectionTableSql}
          WHERE key = ?
          LIMIT 1`,
@@ -1150,18 +1212,34 @@ export class SQLiteCorePersistenceAdapter<
       const existingValue = existingRows[0]?.value
         ? deserializePersistedRowValue(existingRows[0].value)
         : undefined
+      const existingMetadata =
+        existingRows[0]?.metadata != null
+          ? deserializePersistedRowValue(existingRows[0].metadata)
+          : undefined
       const mergedValue =
         mutation.type === `update`
           ? mergeObjectRows(existingValue, mutation.value)
          : mutation.value
+      const nextMetadata =
+        mutation.metadataChanged === true
+          ? mutation.metadata
+          : existingMetadata
 
       await transactionDriver.run(
-        `INSERT INTO ${collectionTableSql} (key, value, row_version)
-         VALUES (?, ?, ?)
+        `INSERT INTO ${collectionTableSql} (key, value, metadata, row_version)
+         VALUES (?, ?, ?, ?)
          ON CONFLICT(key) DO UPDATE SET
            value = excluded.value,
+           metadata = excluded.metadata,
            row_version = excluded.row_version`,
-        [encodedKey, serializePersistedRowValue(mergedValue), nextRowVersion],
+        [
+          encodedKey,
+          serializePersistedRowValue(mergedValue),
+          nextMetadata === undefined
+            ? null
+            : serializePersistedRowValue(nextMetadata),
+          nextRowVersion,
+        ],
       )
       await transactionDriver.run(
         `DELETE FROM ${tombstoneTableSql}
         WHERE key = ?`,
        [encodedKey],
      )
    }
 
@@ -1170,6 +1248,53 @@ export class SQLiteCorePersistenceAdapter<
+    for (const rowMetadataMutation of tx.rowMetadataMutations ?? []) {
+      const encodedKey = encodePersistedStorageKey(rowMetadataMutation.key)
+      if (rowMetadataMutation.type === `delete`) {
+        await transactionDriver.run(
+          `UPDATE ${collectionTableSql}
+           SET metadata = NULL
+           WHERE key = ?`,
+          [encodedKey],
+        )
+      } else {
+        await transactionDriver.run(
+          `UPDATE ${collectionTableSql}
+           SET metadata = ?
+           WHERE key = ?`,
+          [
+            rowMetadataMutation.value === undefined
+              ? null
+              : serializePersistedRowValue(rowMetadataMutation.value),
+            encodedKey,
+          ],
+        )
+      }
+    }
+
+    for (const metadataMutation of tx.collectionMetadataMutations ?? []) {
+      if (metadataMutation.type === `delete`) {
+        await transactionDriver.run(
+          `DELETE FROM collection_metadata
+           WHERE collection_id = ? AND key = ?`,
+          [collectionId, metadataMutation.key],
+        )
+      } else {
+        await transactionDriver.run(
+          `INSERT INTO collection_metadata (collection_id, key, value, updated_at)
+           VALUES (?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER))
+           ON CONFLICT(collection_id, key) DO UPDATE SET
+             value = excluded.value,
+             updated_at = excluded.updated_at`,
+          [
+            collectionId,
+            metadataMutation.key,
+            serializePersistedRowValue(metadataMutation.value),
+          ],
+        )
+      }
+    }
+
     await transactionDriver.run(
       `INSERT INTO collection_version (collection_id, latest_row_version)
        VALUES (?, ?)
@@ -1197,16 +1322,65 @@ export class SQLiteCorePersistenceAdapter<
          seq,
          tx_id,
          row_version,
+          replay_json,
+          replay_requires_full_reload,
          applied_at
        )
-       VALUES (?, ?, ?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER))`,
-      [collectionId, tx.term, tx.seq, tx.txId, nextRowVersion],
+       VALUES (?, ?, ?, ?, ?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER))`,
+      [
+        collectionId,
+        tx.term,
+        tx.seq,
+        tx.txId,
+        nextRowVersion,
+        replayDelta ? stableStringify(replayDelta) : null,
+        tx.truncate ? 1 : 0,
+      ],
     )
 
     await this.pruneAppliedTxRows(collectionId, transactionDriver)
   })
 }
 
+  async loadCollectionMetadata(
+    collectionId: string,
+  ): Promise<Array<{ key: string; value: unknown }>> {
+    const rows = await this.driver.query<{ key: string; value: string }>(
+      `SELECT key, value
+       FROM collection_metadata
+       WHERE collection_id = ?`,
+      [collectionId],
+    )
+
+    return rows.map((row) => ({
+      key: row.key,
+      value: deserializePersistedRowValue(row.value),
+    }))
+  }
+
+  async scanRows(
+    collectionId: string,
+    options?: PersistedRowScanOptions,
+  ): Promise<Array<PersistedScannedRow<T, TKey>>> {
+    const tableMapping = await this.ensureCollectionReady(collectionId)
+    const collectionTableSql = quoteIdentifier(tableMapping.tableName)
+
+    const storedRows = await this.driver.query<StoredSqliteRow>(
+      options?.metadataOnly
+        ? `SELECT key, value, metadata, row_version
+           FROM ${collectionTableSql}
+           WHERE metadata IS NOT NULL`
+        : `SELECT key, value, metadata, row_version
+           FROM ${collectionTableSql}`,
+    )
+
+    return decodeStoredSqliteRows<T, TKey>(storedRows).map((row) => ({
+      key: row.key,
+      value: row.value,
+      metadata: row.metadata,
+    }))
+  }
+
   async ensureIndex(
     collectionId: string,
     signature: string,
@@ -1344,27 +1518,40 @@ export class SQLiteCorePersistenceAdapter<
     const collectionTableSql = quoteIdentifier(tableMapping.tableName)
     const tombstoneTableSql = quoteIdentifier(tableMapping.tombstoneTableName)
 
-    const [changedRows, deletedRows, latestVersionRows] = await Promise.all([
-      this.driver.query<{ key: string }>(
-        `SELECT key
+    const [changedRows, deletedRows, latestVersionRows, replayRows] =
+      await Promise.all([
+        this.driver.query<{ key: string }>(
+          `SELECT key
            FROM ${collectionTableSql}
           WHERE row_version > ?`,
-        [fromRowVersion],
-      ),
-      this.driver.query<{ key: string }>(
-        `SELECT key
+          [fromRowVersion],
+        ),
+        this.driver.query<{ key: string }>(
+          `SELECT key
            FROM ${tombstoneTableSql}
           WHERE row_version > ?`,
-        [fromRowVersion],
-      ),
-      this.driver.query<{ latest_row_version: number }>(
-        `SELECT latest_row_version
+          [fromRowVersion],
+        ),
+        this.driver.query<{ latest_row_version: number }>(
+          `SELECT latest_row_version
            FROM collection_version
           WHERE collection_id = ?
           LIMIT 1`,
-        [collectionId],
-      ),
-    ])
+          [collectionId],
+        ),
+        this.driver.query<{
+          tx_id: string
+          row_version: number
+          replay_json: string | null
+          replay_requires_full_reload: number
+        }>(
+          `SELECT tx_id, row_version, replay_json, replay_requires_full_reload
+           FROM applied_tx
+           WHERE collection_id = ? AND row_version > ?
+           ORDER BY term ASC, seq ASC`,
+          [collectionId, fromRowVersion],
+        ),
+      ])
 
     const latestRowVersion = latestVersionRows[0]?.latest_row_version ?? 0
    const changedKeyCount = changedRows.length + deletedRows.length
 
@@ -1376,6 +1563,18 @@ export class SQLiteCorePersistenceAdapter<
      }
    }
 
+    if (
+      replayRows.some(
+        (row) =>
+          row.replay_requires_full_reload !== 0 || row.replay_json == null,
+      )
+    ) {
+      return {
+        latestRowVersion,
+        requiresFullReload: true,
+      }
+    }
+
    const decodeKey = (encodedKey: string): TKey => {
      try {
        return decodePersistedStorageKey(encodedKey) as TKey
@@ -1386,11 +1585,42 @@ export class SQLiteCorePersistenceAdapter<
      }
    }
 
+    const deltas = replayRows.map((row) => {
+      const parsed = deserializePersistedRowValue<ReplayableTxDelta<
+        Record<string, unknown>,
+        TKey
+      > | null>(row.replay_json ?? `null`)
+      if (!parsed) {
+        throw new InvalidPersistedCollectionConfigError(
+          `missing replay payload for applied_tx row`,
+        )
+      }
+      return parsed
+    })
+
+    const replayChangeCount = deltas.reduce(
+      (count, delta) =>
+        count +
+        delta.changedRows.length +
+        delta.deletedKeys.length +
+        delta.rowMetadataMutations.length +
+        delta.collectionMetadataMutations.length,
+      0,
+    )
+
+    if (replayChangeCount > this.pullSinceReloadThreshold) {
+      return {
+        latestRowVersion,
+        requiresFullReload: true,
+      }
+    }
+
    return {
      latestRowVersion,
      requiresFullReload: false,
      changedKeys: changedRows.map((row) => decodeKey(row.key)),
      deletedKeys: deletedRows.map((row) => decodeKey(row.key)),
+      deltas,
    }
  }
 
@@ -1405,7 +1635,7 @@ export class SQLiteCorePersistenceAdapter<
    const orderByCompiled = compileOrderByClauses(options.orderBy)
    const queryParams: Array<unknown> = []
 
-    let sql = `SELECT key, value, row_version FROM ${collectionTableSql}`
+    let sql = `SELECT key, value, metadata, row_version FROM ${collectionTableSql}`
 
    if (options.where && whereCompiled.supported) {
      sql = `${sql} WHERE ${whereCompiled.sql}`
@@ -1421,15 +1651,7 @@ export class SQLiteCorePersistenceAdapter<
      sql,
      queryParams,
    )
-    const parsedRows = storedRows.map((row) => {
-      const key = decodePersistedStorageKey(row.key) as TKey
-      const value = deserializePersistedRowValue<T>(row.value)
-      return {
-        key,
-        value,
-        rowVersion: row.row_version,
-      }
-    })
+    const parsedRows = decodeStoredSqliteRows<T, TKey>(storedRows)
 
    const filteredRows = this.applyInMemoryWhere(parsedRows, options.where)
    const orderedRows = this.applyInMemoryOrderBy(filteredRows, options.orderBy)
@@ -1595,6 +1817,24 @@ export class SQLiteCorePersistenceAdapter<
      return cached
    }
 
+    const inFlight = this.collectionTableLoads.get(collectionId)
+    if (inFlight) {
+      return inFlight
+    }
+
+    const loadPromise = this.ensureCollectionReadyInternal(collectionId)
+    this.collectionTableLoads.set(collectionId, loadPromise)
+
+    try {
+      return await loadPromise
+    } finally {
+      this.collectionTableLoads.delete(collectionId)
+    }
+  }
+
+  private async ensureCollectionReadyInternal(
+    collectionId: string,
+  ): Promise<CollectionTableMapping> {
    const existingRows = await this.driver.query<{
      table_name: string
      tombstone_table_name: string
@@ -1646,6 +1886,7 @@ export class SQLiteCorePersistenceAdapter<
      `CREATE TABLE IF NOT EXISTS ${collectionTableSql} (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL,
+        metadata TEXT,
        row_version INTEGER NOT NULL
      )`,
    )
@@ -1790,16 +2031,37 @@ export class SQLiteCorePersistenceAdapter<
        seq INTEGER NOT NULL,
        tx_id TEXT NOT NULL,
        row_version INTEGER NOT NULL,
+        replay_json TEXT,
+        replay_requires_full_reload INTEGER NOT NULL DEFAULT 0,
        applied_at INTEGER NOT NULL,
        PRIMARY KEY (collection_id, term, seq)
      )`,
    )
+    try {
+      await this.driver.exec(
+        `ALTER TABLE applied_tx ADD COLUMN replay_json TEXT`,
+      )
+    } catch {}
+    try {
+      await this.driver.exec(
+        `ALTER TABLE applied_tx ADD COLUMN replay_requires_full_reload INTEGER NOT NULL DEFAULT 0`,
+      )
+    } catch {}
    await this.driver.exec(
      `CREATE TABLE IF NOT EXISTS collection_version (
        collection_id TEXT PRIMARY KEY,
        latest_row_version INTEGER NOT NULL
      )`,
    )
+    await this.driver.exec(
+      `CREATE TABLE IF NOT EXISTS collection_metadata (
+        collection_id TEXT NOT NULL,
+        key TEXT NOT NULL,
+        value TEXT NOT NULL,
+        updated_at INTEGER NOT NULL,
+        PRIMARY KEY (collection_id, key)
+      )`,
+    )
    await this.driver.exec(
      `CREATE TABLE IF NOT EXISTS leader_term (
        collection_id TEXT PRIMARY KEY,
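
> Note: the adapter changes above give committed transactions a storage-level contract that carries metadata alongside row writes. A sketch of such a payload, typed against the `PersistedTx` shape added in persisted.ts (the import path and all keys/values are illustrative, not taken from the repo):

```ts
import type { PersistedTx } from '../src/persisted' // hypothetical relative path

// Illustrative payload only. `metadataChanged` must be set for the row's
// metadata column to be rewritten; otherwise existing metadata is preserved.
const tx: PersistedTx<{ id: string; title: string }, string> = {
  txId: `tx-1`,
  term: 1,
  seq: 1,
  rowVersion: 1,
  // truncate: true would clear the row and tombstone tables before mutations run
  mutations: [
    {
      type: `insert`,
      key: `1`,
      value: { id: `1`, title: `Tracked` },
      metadata: { source: `remote` }, // stored in the new `metadata` column
      metadataChanged: true,
    },
  ],
  rowMetadataMutations: [{ type: `set`, key: `1`, value: { source: `remote` } }],
  collectionMetadataMutations: [
    { type: `set`, key: `electric:resume`, value: { offset: `10_0` } },
  ],
}
```

All of this is applied in one driver transaction, and (unless `truncate` is set) serialized into `applied_tx.replay_json` so a later `pullSince` can replay the change as a delta instead of forcing a full reload.
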
diff --git a/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts b/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts
index 6ac202f13..2ccd3a721 100644
--- a/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts
+++ b/packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts
@@ -34,6 +34,7 @@ type RecordingAdapter = PersistenceAdapter<Todo, string> & {
     term: number
     seq: number
     rowVersion: number
+    truncate?: boolean
     mutations: Array<{ type: `insert` | `update` | `delete`; key: string }>
   }
 }>
@@ -44,20 +45,27 @@ type RecordingAdapter = PersistenceAdapter<Todo, string> & {
     options: LoadSubsetOptions
     requiredIndexSignatures: ReadonlyArray<string>
   }>
+  loadCollectionMetadataCalls: Array<string>
   rows: Map<string, Todo>
+  rowMetadata: Map<string, unknown>
+  collectionMetadata: Map<string, unknown>
 }
 
 function createRecordingAdapter(
   initialRows: Array<Todo> = [],
 ): RecordingAdapter {
   const rows = new Map(initialRows.map((row) => [row.id, row]))
+  const rowMetadata = new Map<string, unknown>()
 
   const adapter: RecordingAdapter = {
     rows,
+    rowMetadata,
+    collectionMetadata: new Map(),
     applyCommittedTxCalls: [],
     ensureIndexCalls: [],
     markIndexRemovedCalls: [],
     loadSubsetCalls: [],
+    loadCollectionMetadataCalls: [],
     loadSubset: (collectionId, options, ctx) => {
       adapter.loadSubsetCalls.push({
         collectionId,
         options,
         requiredIndexSignatures: ctx?.requiredIndexSignatures ?? [],
       })
       return Promise.resolve(
         Array.from(rows.values()).map((value) => ({
           key: value.id,
           value,
+          metadata: rowMetadata.get(value.id),
         })),
       )
     },
+    loadCollectionMetadata: (collectionId) => {
+      adapter.loadCollectionMetadataCalls.push(collectionId)
+      return Promise.resolve(
+        Array.from(adapter.collectionMetadata.entries()).map(
+          ([key, value]) => ({
+            key,
+            value,
+          }),
+        ),
+      )
+    },
+    scanRows: () =>
+      Promise.resolve(
+        Array.from(rows.values()).map((value) => ({
+          key: value.id,
+          value,
+          metadata: rowMetadata.get(value.id),
+        })),
+      ),
     applyCommittedTx: (collectionId, tx) => {
       adapter.applyCommittedTxCalls.push({
         collectionId,
@@ -78,6 +106,7 @@ function createRecordingAdapter(
          term: tx.term,
          seq: tx.seq,
          rowVersion: tx.rowVersion,
+          truncate: tx.truncate,
          mutations: tx.mutations.map((mutation) => ({
            type: mutation.type,
            key: mutation.key,
          })),
        },
      })
 
+      if (tx.truncate) {
+        rows.clear()
+        rowMetadata.clear()
+      }
+
      for (const mutation of tx.mutations) {
        if (mutation.type === `delete`) {
          rows.delete(mutation.key)
+          rowMetadata.delete(mutation.key)
        } else {
          rows.set(mutation.key, mutation.value)
+          if (mutation.metadataChanged) {
+            rowMetadata.set(mutation.key, mutation.metadata)
+          }
        }
      }
+      for (const rowMetadataMutation of tx.rowMetadataMutations ?? []) {
+        if (rowMetadataMutation.type === `delete`) {
+          rowMetadata.delete(rowMetadataMutation.key)
+        } else {
+          rowMetadata.set(rowMetadataMutation.key, rowMetadataMutation.value)
+        }
+      }
+      for (const metadataMutation of tx.collectionMetadataMutations ?? []) {
+        if (metadataMutation.type === `delete`) {
+          adapter.collectionMetadata.delete(metadataMutation.key)
+        } else {
+          adapter.collectionMetadata.set(
+            metadataMutation.key,
+            metadataMutation.value,
+          )
+        }
+      }
      return Promise.resolve()
    },
@@ -257,6 +312,338 @@ describe(`persistedCollectionOptions`, () => {
     expect(adapter.applyCommittedTxCalls).toHaveLength(1)
   })
 
+  it(`loads collection metadata into collection state during startup`, async () => {
+    const adapter = createRecordingAdapter()
+    adapter.collectionMetadata.set(`electric:resume`, {
+      kind: `resume`,
+      offset: `10_0`,
+      handle: `handle-1`,
+      shapeId: `shape-1`,
+      updatedAt: 1,
+    })
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `persisted-startup-metadata`,
+        getKey: (item) => item.id,
+        persistence: {
+          adapter,
+        },
+      }),
+    )
+
+    await collection.stateWhenReady()
+
+    expect(adapter.loadCollectionMetadataCalls).toEqual([
+      `persisted-startup-metadata`,
+    ])
+    expect(
+      collection._state.syncedCollectionMetadata.get(`electric:resume`),
+    ).toEqual({
+      kind: `resume`,
+      offset: `10_0`,
+      handle: `handle-1`,
+      shapeId: `shape-1`,
+      updatedAt: 1,
+    })
+  })
+
+  it(`restores row and collection metadata after metadata-bearing full reload`, async () => {
+    const adapter = createRecordingAdapter([
+      {
+        id: `1`,
+        title: `Tracked`,
+      },
+    ])
+    adapter.rowMetadata.set(`1`, {
+      source: `initial`,
+    })
+    adapter.collectionMetadata.set(`electric:resume`, {
+      kind: `resume`,
+      offset: `10_0`,
+      handle: `handle-1`,
+      shapeId: `shape-1`,
+      updatedAt: 1,
+    })
+    const coordinator = createCoordinatorHarness()
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ markReady }) => {
+            markReady()
+          },
+        },
+        persistence: {
+          adapter,
+          coordinator,
+        },
+      }),
+    )
+
+    await collection.preload()
+    await flushAsyncWork()
+
+    expect(collection._state.syncedMetadata.get(`1`)).toEqual({
+      source: `initial`,
+    })
+    expect(
+      collection._state.syncedCollectionMetadata.get(`electric:resume`),
+    ).toEqual({
+      kind: `resume`,
+      offset: `10_0`,
+      handle: `handle-1`,
+      shapeId: `shape-1`,
+      updatedAt: 1,
+    })
+
+    adapter.rowMetadata.set(`1`, {
+      source: `reloaded`,
+    })
+    adapter.collectionMetadata.delete(`electric:resume`)
+    adapter.collectionMetadata.set(`queryCollection:gc:q1`, {
+      queryHash: `q1`,
+      mode: `until-revalidated`,
+    })
+
+    coordinator.emit({
+      type: `tx:committed`,
+      term: 1,
+      seq: 1,
+      txId: `tx-reload`,
+      latestRowVersion: 1,
+      requiresFullReload: true,
+    })
+
+    await flushAsyncWork()
+    await flushAsyncWork()
+
+    expect(collection._state.syncedMetadata.get(`1`)).toEqual({
+      source: `reloaded`,
+    })
+    expect(
+      collection._state.syncedCollectionMetadata.has(`electric:resume`),
+    ).toBe(false)
+    expect(
+      collection._state.syncedCollectionMetadata.get(`queryCollection:gc:q1`),
+    ).toEqual({
+      queryHash: `q1`,
+      mode: `until-revalidated`,
+    })
+  })
+
+  it(`persists metadata-only wrapped sync transactions`, async () => {
+    const adapter = createRecordingAdapter()
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `persisted-metadata-only`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ begin, commit, markReady, metadata }) => {
+            begin()
+            metadata?.collection.set(`runtime:key`, { persisted: true })
+            commit()
+            markReady()
+          },
+        },
+        persistence: {
+          adapter,
+        },
+      }),
+    )
+
+    await collection.stateWhenReady()
+    await flushAsyncWork()
+
+    expect(adapter.applyCommittedTxCalls).toHaveLength(1)
+    expect(adapter.applyCommittedTxCalls[0]?.tx.mutations).toEqual([])
+    expect(adapter.collectionMetadata.get(`runtime:key`)).toEqual({
+      persisted: true,
+    })
+    expect(
+      collection._state.syncedCollectionMetadata.get(`runtime:key`),
+    ).toEqual({
+      persisted: true,
+    })
+  })
+
+  it(`replays metadata-only tx:committed deltas without full reload`, async () => {
+    const adapter = createRecordingAdapter([
+      {
+        id: `1`,
+        title: `Tracked`,
+      },
+    ])
+    adapter.rowMetadata.set(`1`, { source: `initial` })
+    const coordinator = createCoordinatorHarness()
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ markReady }) => {
+            markReady()
+          },
+        },
+        persistence: {
+          adapter,
+          coordinator,
+        },
+      }),
+    )
+
+    await collection.preload()
+    await flushAsyncWork()
+    const loadSubsetCallsAfterPreload = adapter.loadSubsetCalls.length
+
+    coordinator.emit({
+      type: `tx:committed`,
+      term: 1,
+      seq: 1,
+      txId: `tx-metadata-only`,
+      latestRowVersion: 2,
+      requiresFullReload: false,
+      changedRows: [],
+      deletedKeys: [],
+      rowMetadataMutations: [
+        {
+          type: `set`,
+          key: `1`,
+          value: { source: `replayed` },
+        },
+      ],
+      collectionMetadataMutations: [
+        {
+          type: `set`,
+          key: `electric:resume`,
+          value: {
+            kind: `reset`,
+            updatedAt: 2,
+          },
+        },
+      ],
+    })
+
+    await flushAsyncWork()
+    await flushAsyncWork()
+
+    expect(adapter.loadSubsetCalls.length).toBe(loadSubsetCallsAfterPreload)
+    expect(collection._state.syncedMetadata.get(`1`)).toEqual({
+      source: `replayed`,
+    })
+    expect(
+      collection._state.syncedCollectionMetadata.get(`electric:resume`),
+    ).toEqual({
+      kind: `reset`,
+      updatedAt: 2,
+    })
+  })
+
+  it(`uses pullSince replay deltas for metadata-bearing seq-gap recovery`, async () => {
+    const adapter = createRecordingAdapter([
+      {
+        id: `1`,
+        title: `Tracked`,
+      },
+    ])
+    adapter.rowMetadata.set(`1`, { source: `initial` })
+    const coordinator = createCoordinatorHarness()
+    coordinator.setPullSinceResponse({
+      type: `rpc:pullSince:res`,
+      rpcId: `pull-metadata`,
+      ok: true,
+      latestTerm: 1,
+      latestSeq: 3,
+      latestRowVersion: 3,
+      requiresFullReload: false,
+      changedKeys: [],
+      deletedKeys: [],
+      deltas: [
+        {
+          txId: `tx-gap-1`,
+          latestRowVersion: 2,
+          changedRows: [],
+          deletedKeys: [],
+          rowMetadataMutations: [
+            {
+              type: `set`,
+              key: `1`,
+              value: { source: `gap-replayed` },
+            },
+          ],
+          collectionMetadataMutations: [],
+        },
+        {
+          txId: `tx-gap-2`,
+          latestRowVersion: 3,
+          changedRows: [],
+          deletedKeys: [],
+          rowMetadataMutations: [],
+          collectionMetadataMutations: [
+            {
+              type: `set`,
+              key: `queryCollection:gc:q1`,
+              value: {
+                queryHash: `q1`,
+                mode: `until-revalidated`,
+              },
+            },
+          ],
+        },
+      ],
+    })
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ markReady }) => {
+            markReady()
+          },
+        },
+        persistence: {
+          adapter,
+          coordinator,
+        },
+      }),
+    )
+
+    await collection.preload()
+    await flushAsyncWork()
+    const loadSubsetCallsAfterPreload = adapter.loadSubsetCalls.length
+
+    coordinator.emit({
+      type: `tx:committed`,
+      term: 1,
+      seq: 3,
+      txId: `tx-gap-trigger`,
+      latestRowVersion: 3,
+      requiresFullReload: false,
+      changedRows: [],
+      deletedKeys: [],
+    })
+
+    await flushAsyncWork()
+    await flushAsyncWork()
+
+    expect(coordinator.pullSinceCalls).toBe(1)
+    expect(adapter.loadSubsetCalls.length).toBe(loadSubsetCallsAfterPreload)
+    expect(collection._state.syncedMetadata.get(`1`)).toEqual({
+      source: `gap-replayed`,
+    })
+    expect(
+      collection._state.syncedCollectionMetadata.get(`queryCollection:gc:q1`),
+    ).toEqual({
+      queryHash: `q1`,
+      mode: `until-revalidated`,
+    })
+  })
+
   it(`throws InvalidSyncConfigError when sync key is present but null`, () => {
     const invalidOptions = {
       id: `invalid-sync-null`,
@@ -581,6 +968,190 @@
     })
   })
 
+  it(`reads staged metadata writes during hydration-queued transactions`, async () => {
+    const adapter = createRecordingAdapter([
+      {
+        id: `cached-1`,
+        title: `Cached row`,
+      },
+    ])
+    adapter.rowMetadata.set(`cached-1`, { source: `persisted` })
+    adapter.collectionMetadata.set(`startup:key`, { ready: true })
+
+    let resolveLoadSubset: (() => void) | undefined
+    adapter.loadSubset = async () => {
+      await new Promise<void>((resolve) => {
+        resolveLoadSubset = resolve
+      })
+      return [
+        {
+          key: `cached-1`,
+          value: {
+            id: `cached-1`,
+            title: `Cached row`,
+          },
+          metadata: adapter.rowMetadata.get(`cached-1`),
+        },
+      ]
+    }
+
+    let remoteBegin: (() => void) | undefined
+    let remoteCommit: (() => void) | undefined
+    let remoteTruncate: (() => void) | undefined
+    let remoteMetadata:
+      | Parameters<SyncConfig<Todo>[`sync`]>[0][`metadata`]
+      | undefined
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present-metadata-read`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ begin, commit, truncate, markReady, metadata }) => {
+            remoteBegin = begin
+            remoteCommit = commit
+            remoteTruncate = truncate
+            remoteMetadata = metadata
+            markReady()
+            return {}
+          },
+        },
+        persistence: {
+          adapter,
+        },
+      }),
+    )
+
+    const readyPromise = collection.stateWhenReady()
+    for (let attempt = 0; attempt < 20 && !resolveLoadSubset; attempt++) {
+      await flushAsyncWork()
+    }
+
+    expect(resolveLoadSubset).toBeDefined()
+    expect(remoteBegin).toBeDefined()
+    expect(remoteMetadata).toBeDefined()
+
+    remoteBegin?.()
+    remoteMetadata?.row.set(`cached-1`, { source: `staged` })
+    remoteMetadata?.collection.set(`runtime:key`, { persisted: true })
+
+    expect(remoteMetadata?.row.get(`cached-1`)).toEqual({ source: `staged` })
+    expect(remoteMetadata?.collection.get(`runtime:key`)).toEqual({
+      persisted: true,
+    })
+    expect(remoteMetadata?.collection.list()).toContainEqual({
+      key: `runtime:key`,
+      value: { persisted: true },
+    })
+
+    remoteTruncate?.()
+
+    expect(remoteMetadata?.row.get(`cached-1`)).toBeUndefined()
+    expect(remoteMetadata?.collection.get(`startup:key`)).toEqual({
+      ready: true,
+    })
+
+    remoteCommit?.()
+    resolveLoadSubset?.()
+    await readyPromise
+  })
+
+  it(`persists truncate transactions and preserves intended collection metadata`, async () => {
+    const adapter = createRecordingAdapter()
+
+    let remoteBegin: (() => void) | undefined
+    let remoteWrite:
+      | ((message: { type: `insert`; value: Todo }) => void)
+      | undefined
+    let remoteCommit: (() => void) | undefined
+    let remoteTruncate: (() => void) | undefined
+    let remoteMetadata:
+      | Parameters<SyncConfig<Todo>[`sync`]>[0][`metadata`]
+      | undefined
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present-truncate`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ begin, write, commit, truncate, markReady, metadata }) => {
+            remoteBegin = begin
+            remoteWrite = write as (message: {
+              type: `insert`
+              value: Todo
+            }) => void
+            remoteCommit = commit
+            remoteTruncate = truncate
+            remoteMetadata = metadata
+            markReady()
+            return {}
+          },
+        },
+        persistence: {
+          adapter,
+        },
+      }),
+    )
+
+    await collection.stateWhenReady()
+    await flushAsyncWork()
+
+    remoteBegin?.()
+    remoteWrite?.({
+      type: `insert`,
+      value: {
+        id: `pre-truncate`,
+        title: `Pre truncate`,
+      },
+    })
+    remoteMetadata?.collection.set(`electric:resume`, {
+      kind: `reset`,
+      updatedAt: 1,
+    })
+    remoteTruncate?.()
+    remoteWrite?.({
+      type: `insert`,
+      value: {
+        id: `post-truncate`,
+        title: `Post truncate`,
+      },
+    })
+    remoteCommit?.()
+    await flushAsyncWork()
+
+    expect(adapter.applyCommittedTxCalls.at(-1)?.tx.truncate).toBe(true)
+
+    const reloadedCollection = createCollection(
+      persistedCollectionOptions({
+        id: `sync-present-truncate`,
+        getKey: (item) => item.id,
+        sync: {
+          sync: ({ markReady }) => {
+            markReady()
+          },
+        },
+        persistence: {
+          adapter,
+        },
+      }),
+    )
+
+    await reloadedCollection.preload()
+    await flushAsyncWork()
+
+    expect(reloadedCollection.get(`pre-truncate`)).toBeUndefined()
+    expect(stripVirtualProps(reloadedCollection.get(`post-truncate`))).toEqual({
+      id: `post-truncate`,
+      title: `Post truncate`,
+    })
+    expect(
+      reloadedCollection._state.syncedCollectionMetadata.get(`electric:resume`),
+    ).toEqual({
+      kind: `reset`,
+      updatedAt: 1,
+    })
+  })
+
   it(`uses pullSince recovery when tx sequence gaps are detected`, async () => {
     const adapter = createRecordingAdapter([
       {
diff --git a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts
index 7c45e964b..35e28eeba 100644
--- a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts
+++ b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts
@@ -423,6 +423,227 @@ export function runSQLiteCoreAdapterContractSuite(
     expect(txRows[0]?.count).toBe(0)
   })
 
+  it(`persists row metadata and collection metadata atomically`, async () => {
+    const { adapter, driver } = registerContractHarness()
+    const collectionId = `metadata-roundtrip`
+
+    await adapter.applyCommittedTx(collectionId, {
+      txId: `metadata-1`,
+      term: 1,
+      seq: 1,
+      rowVersion: 1,
+      mutations: [
+        {
+          type: `insert`,
+          key: `1`,
+          value: {
+            id: `1`,
+            title: `Tracked`,
+            createdAt: `2026-01-01T00:00:00.000Z`,
+            score: 1,
+          },
+          metadata: {
+            queryCollection: {
+              owners: {
+                q1: true,
+              },
+            },
+          },
+          metadataChanged: true,
+        },
+      ],
+      collectionMetadataMutations: [
+        {
+          type: `set`,
+          key: `electric:resume`,
+          value: {
+            kind: `resume`,
+            offset: `10_0`,
+            handle: `handle-1`,
+            shapeId: `shape-1`,
+            updatedAt: 1,
+          },
+        },
+      ],
+    })
+
+    const rows = await adapter.loadSubset(collectionId, {})
+    expect(rows).toEqual([
+      {
+        key: `1`,
+        value: {
+          id: `1`,
+          title: `Tracked`,
+          createdAt: `2026-01-01T00:00:00.000Z`,
+          score: 1,
+        },
+        metadata: {
+          queryCollection: {
+            owners: {
+              q1: true,
+            },
+          },
+        },
+      },
+    ])
+
+    const collectionMetadata =
+      await adapter.loadCollectionMetadata?.(collectionId)
+    expect(collectionMetadata).toEqual([
+      {
+        key: `electric:resume`,
+        value: {
+          kind: `resume`,
+          offset: `10_0`,
+          handle: `handle-1`,
+          shapeId: `shape-1`,
+          updatedAt: 1,
+        },
+      },
+    ])
+
+    await expect(
+      adapter.applyCommittedTx(collectionId, {
+        txId: `metadata-2`,
+        term: 1,
+        seq: 2,
+        rowVersion: 2,
+        mutations: [
+          {
+            type: `insert`,
+            key: `2`,
+            value: {
+              id: `2`,
+              title: `Bad`,
+              createdAt: `2026-01-01T00:00:00.000Z`,
+              score: 2,
+            },
+          },
+        ],
+        collectionMetadataMutations: [
+          {
+            type: `set`,
+            key: `broken`,
+            value: {
+              invalid: new Date(Number.NaN),
+            },
+          },
+        ],
+      }),
+    ).rejects.toThrow()
+
+    const rowsAfterFailure = await adapter.loadSubset(collectionId, {})
+    expect(rowsAfterFailure).toEqual(rows)
+
+    const metadataRows = await driver.query<{ key: string }>(
+      `SELECT key
+       FROM collection_metadata
+       WHERE collection_id = ?`,
+      [collectionId],
+    )
+    expect(metadataRows).toEqual([{ key: `electric:resume` }])
+  })
+
+  it(`persists truncate transactions while preserving explicit collection metadata`, async () => {
+    const { adapter } = registerContractHarness()
+    const collectionId = `truncate-metadata-roundtrip`
+
+    await adapter.applyCommittedTx(collectionId, {
+      txId: `seed-1`,
+      term: 1,
+      seq: 1,
+      rowVersion: 1,
+      mutations: [
+        {
+          type: `insert`,
+          key: `1`,
+          value: {
+            id: `1`,
+            title: `Before truncate`,
+            createdAt: `2026-01-01T00:00:00.000Z`,
+            score: 1,
+          },
+          metadata: {
+            owner: `before`,
+          },
+          metadataChanged: true,
+        },
+      ],
+      collectionMetadataMutations: [
+        {
+          type: `set`,
+          key: `electric:resume`,
+          value: {
+            kind: `resume`,
+            offset: `10_0`,
+            handle: `handle-1`,
+            shapeId: `shape-1`,
+            updatedAt: 1,
+          },
+        },
+      ],
+    })
+
+    await adapter.applyCommittedTx(collectionId, {
+      txId: `truncate-2`,
+      term: 1,
+      seq: 2,
+      rowVersion: 2,
+      truncate: true,
+      mutations: [
+        {
+          type: `insert`,
+          key: `2`,
+          value: {
+            id: `2`,
+            title: `After truncate`,
+            createdAt: `2026-01-02T00:00:00.000Z`,
+            score: 2,
+          },
+          metadata: {
+            owner: `after`,
+          },
+          metadataChanged: true,
+        },
+      ],
+      collectionMetadataMutations: [
+        {
+          type: `set`,
+          key: `electric:resume`,
+          value: {
+            kind: `reset`,
+            updatedAt: 2,
+          },
+        },
+      ],
+    })
+
+    expect(await adapter.loadSubset(collectionId, {})).toEqual([
+      {
+        key: `2`,
+        value: {
+          id: `2`,
+          title: `After truncate`,
+          createdAt: `2026-01-02T00:00:00.000Z`,
+          score: 2,
+        },
+        metadata: {
+          owner: `after`,
+        },
+      },
+    ])
+
+    expect(await adapter.loadCollectionMetadata?.(collectionId)).toEqual([
+      {
+        key: `electric:resume`,
+        value: {
+          kind: `reset`,
+          updatedAt: 2,
+        },
+      },
+    ])
+  })
+
   it(`supports pushdown operators with correctness-preserving fallback`, async () => {
     const { adapter } = registerContractHarness()
     const collectionId = `todos`
@@ -864,11 +1085,143 @@ export function runSQLiteCoreAdapterContractSuite(
     }
     expect(delta.changedKeys).toEqual([])
     expect(delta.deletedKeys).toEqual([`1`])
+    expect(delta.deltas).toEqual([
+      {
+        txId: `seed-pull-2`,
+        latestRowVersion: 2,
+        changedRows: [],
+        deletedKeys: [`1`],
+        rowMetadataMutations: [],
+        collectionMetadataMutations: [],
+      },
+    ])
 
     const fullReload = await adapter.pullSince(collectionId, 0)
     expect(fullReload.requiresFullReload).toBe(true)
   })
 
+  it(`scans persisted rows with metadata and replays metadata-only deltas`, async () => {
+    const { adapter } = registerContractHarness()
+    const collectionId = `scan-and-replay`
+
+    await adapter.applyCommittedTx(collectionId, {
+      txId: `scan-seed-1`,
+      term: 1,
+      seq: 1,
+      rowVersion: 1,
+      mutations: [
+        {
+          type: `insert`,
+          key: `1`,
+          value: {
+            id: `1`,
+            title: `Tracked`,
+            createdAt: `2026-01-01T00:00:00.000Z`,
+            score: 1,
+          },
+          metadata: {
+            queryCollection: {
+              owners: {
+                q1: true,
+              },
+            },
+          },
+          metadataChanged: true,
+        },
+      ],
+    })
+
+    const scannedRows = await adapter.scanRows?.(collectionId, {
+      metadataOnly: true,
+    })
+    expect(scannedRows).toEqual([
+      {
+        key: `1`,
+        value: {
+          id: `1`,
+          title: `Tracked`,
+          createdAt: `2026-01-01T00:00:00.000Z`,
+          score: 1,
+        },
+        metadata: {
+          queryCollection: {
+            owners: {
+              q1: true,
+            },
+          },
+        },
+      },
+    ])
+
+    await adapter.applyCommittedTx(collectionId, {
+      txId: `scan-seed-2`,
+      term: 1,
+      seq: 2,
+      rowVersion: 2,
+      mutations: [],
rowMetadataMutations: [ + { + type: `set`, + key: `1`, + value: { + queryCollection: { + owners: { + q2: true, + }, + }, + }, + }, + ], + collectionMetadataMutations: [ + { + type: `set`, + key: `electric:resume`, + value: { + kind: `reset`, + updatedAt: 2, + }, + }, + ], + }) + + const replayDelta = await adapter.pullSince(collectionId, 1) + if (replayDelta.requiresFullReload) { + throw new Error(`Expected replay delta, received full reload`) + } + + expect(replayDelta.deltas).toEqual([ + { + txId: `scan-seed-2`, + latestRowVersion: 2, + changedRows: [], + deletedKeys: [], + rowMetadataMutations: [ + { + type: `set`, + key: `1`, + value: { + queryCollection: { + owners: { + q2: true, + }, + }, + }, + }, + ], + collectionMetadataMutations: [ + { + type: `set`, + key: `electric:resume`, + value: { + kind: `reset`, + updatedAt: 2, + }, + }, + ], + }, + ]) + }) + it(`keeps numeric and string keys distinct in storage`, async () => { const { driver } = registerContractHarness() const adapter = new SQLiteCorePersistenceAdapter< @@ -951,6 +1304,42 @@ export function runSQLiteCoreAdapterContractSuite( expect(loadedRows[0]?.key).toBe(`safe`) }) + it(`deduplicates concurrent ensureCollectionReady calls for the same collection`, async () => { + const { adapter } = registerContractHarness() + const collectionId = `concurrent-startup` + + const [rowsA, rowsB] = await Promise.all([ + adapter.loadSubset(collectionId, {}), + adapter.loadSubset(collectionId, {}), + ]) + + expect(rowsA).toEqual([]) + expect(rowsB).toEqual([]) + + await adapter.applyCommittedTx(collectionId, { + txId: `concurrent-startup-seed`, + term: 1, + seq: 1, + rowVersion: 1, + mutations: [ + { + type: `insert`, + key: `1`, + value: { + id: `1`, + title: `Seeded`, + createdAt: `2026-01-01T00:00:00.000Z`, + score: 1, + }, + }, + ], + }) + + const loadedRows = await adapter.loadSubset(collectionId, {}) + expect(loadedRows).toHaveLength(1) + expect(loadedRows[0]?.key).toBe(`1`) + }) + it(`prunes applied_tx rows by sequence threshold`, async () => { const { adapter, driver } = registerContractHarness({ appliedTxPruneMaxRows: 2, diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index cc435f0d9..af65cb801 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -28,6 +28,8 @@ interface PendingSyncedTransaction< operations: Array> truncate?: boolean deletedKeys: Set + rowMetadataWrites: Map + collectionMetadataWrites: Map optimisticSnapshot?: { upserts: Map deletes: Set @@ -40,6 +42,8 @@ interface PendingSyncedTransaction< immediate?: boolean } +type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` } + type InternalChangeMessage< T extends object = Record, TKey extends string | number = string | number, @@ -70,6 +74,7 @@ export class CollectionStateManager< > = [] public syncedData: SortedMap public syncedMetadata = new Map() + public syncedCollectionMetadata = new Map() // Optimistic state tracking - make public for testing public optimisticUpserts = new Map() @@ -870,6 +875,9 @@ export class CollectionStateManager< for (const operation of transaction.operations) { changedKeys.add(operation.key as TKey) } + for (const [key] of transaction.rowMetadataWrites) { + changedKeys.add(key) + } } // Use pre-captured state if available (from optimistic scenarios), @@ -959,26 +967,6 @@ export class CollectionStateManager< const key = operation.key as TKey this.syncedKeys.add(key) - // Update metadata - switch (operation.type) { - case `insert`: - 
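`pullSince` now carries replayable per-transaction deltas alongside the flat `changedKeys`/`deletedKeys` summary, so a coordinator can re-apply metadata-only transactions like `scan-seed-2` above without a full reload. A hedged sketch of applying one such delta to in-memory state (names here are illustrative, not the shipped coordinator logic):

```ts
// Illustrative replay of one pullSince delta.
type SketchMetadataMutation =
  | { type: `set`; key: string; value: unknown }
  | { type: `delete`; key: string }

type SketchDelta = {
  txId: string
  latestRowVersion: number
  changedRows: Array<{ key: string; value: Record<string, unknown> }>
  deletedKeys: Array<string>
  rowMetadataMutations: Array<SketchMetadataMutation>
  collectionMetadataMutations: Array<SketchMetadataMutation>
}

function replayDelta(
  delta: SketchDelta,
  rows: Map<string, Record<string, unknown>>,
  rowMetadata: Map<string, unknown>,
  collectionMetadata: Map<string, unknown>,
): void {
  for (const { key, value } of delta.changedRows) {
    rows.set(key, value)
  }
  for (const key of delta.deletedKeys) {
    rows.delete(key)
    rowMetadata.delete(key) // deleting a row drops its metadata too, as in state.ts
  }
  // Metadata-only transactions (empty changedRows/deletedKeys) still advance
  // row and collection metadata, matching the `scan-seed-2` expectation.
  for (const m of delta.rowMetadataMutations) {
    if (m.type === `delete`) rowMetadata.delete(m.key)
    else rowMetadata.set(m.key, m.value)
  }
  for (const m of delta.collectionMetadataMutations) {
    if (m.type === `delete`) collectionMetadata.delete(m.key)
    else collectionMetadata.set(m.key, m.value)
  }
}
```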
this.syncedMetadata.set(key, operation.metadata) - break - case `update`: - this.syncedMetadata.set( - key, - Object.assign( - {}, - this.syncedMetadata.get(key), - operation.metadata, - ), - ) - break - case `delete`: - this.syncedMetadata.delete(key) - break - } - // Determine origin: 'local' for local-only collections or pending local changes const origin: VirtualOrigin = this.isLocalOnly || @@ -1025,6 +1013,7 @@ export class CollectionStateManager< } case `delete`: this.syncedData.delete(key) + this.syncedMetadata.delete(key) // Clean up origin and pending tracking for deleted rows this.rowOrigins.delete(key) this.pendingLocalChanges.delete(key) @@ -1036,6 +1025,25 @@ export class CollectionStateManager< break } } + + for (const [key, metadataWrite] of transaction.rowMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncedMetadata.delete(key) + continue + } + this.syncedMetadata.set(key, metadataWrite.value) + } + + for (const [ + key, + metadataWrite, + ] of transaction.collectionMetadataWrites) { + if (metadataWrite.type === `delete`) { + this.syncedCollectionMetadata.delete(key) + continue + } + this.syncedCollectionMetadata.set(key, metadataWrite.value) + } } // After applying synced operations, if this commit included a truncate, @@ -1365,6 +1373,7 @@ export class CollectionStateManager< public cleanup(): void { this.syncedData.clear() this.syncedMetadata.clear() + this.syncedCollectionMetadata.clear() this.optimisticUpserts.clear() this.optimisticDeletes.clear() this.pendingOptimisticUpserts.clear() diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1f50cc889..25dc63c15 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -18,6 +18,7 @@ import type { LoadSubsetOptions, OptimisticChangeMessage, SyncConfigRes, + SyncMetadataApi, } from '../types' import type { CollectionImpl } from './index.js' import type { CollectionStateManager } from './state' @@ -93,6 +94,8 @@ export class CollectionSyncManager< committed: false, operations: [], deletedKeys: new Set(), + rowMetadataWrites: new Map(), + collectionMetadataWrites: new Map(), immediate: options?.immediate, }) }, @@ -169,6 +172,23 @@ export class CollectionSyncManager< if (messageType === `delete`) { pendingTransaction.deletedKeys.add(key) + pendingTransaction.rowMetadataWrites.set(key, { type: `delete` }) + } else if (messageType === `insert`) { + if (message.metadata !== undefined) { + pendingTransaction.rowMetadataWrites.set(key, { + type: `set`, + value: message.metadata, + }) + } else { + pendingTransaction.rowMetadataWrites.set(key, { + type: `delete`, + }) + } + } else if (message.metadata !== undefined) { + pendingTransaction.rowMetadataWrites.set(key, { + type: `set`, + value: message.metadata, + }) } }, commit: () => { @@ -205,6 +225,7 @@ export class CollectionSyncManager< // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() + pendingTransaction.rowMetadataWrites.clear() // Mark the transaction as a truncate operation. 
During commit, this triggers: // - Delete events for all previously synced keys (excluding optimistic-deleted keys) @@ -220,6 +241,7 @@ export class CollectionSyncManager< deletes: new Set(this.state.optimisticDeletes), } }, + metadata: this.createSyncMetadataApi(), }), ) @@ -245,6 +267,113 @@ export class CollectionSyncManager< } } + private getActivePendingSyncTransaction() { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + + if (!pendingTransaction) { + throw new NoPendingSyncTransactionWriteError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedWriteError() + } + + return pendingTransaction + } + + private createSyncMetadataApi(): SyncMetadataApi { + return { + row: { + get: (key) => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + const pendingWrite = pendingTransaction?.rowMetadataWrites.get(key) + if (pendingWrite) { + return pendingWrite.type === `delete` + ? undefined + : pendingWrite.value + } + if (pendingTransaction?.truncate) { + return undefined + } + return this.state.syncedMetadata.get(key) + }, + set: (key, metadata) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.rowMetadataWrites.set(key, { + type: `set`, + value: metadata, + }) + }, + delete: (key) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.rowMetadataWrites.set(key, { + type: `delete`, + }) + }, + }, + collection: { + get: (key) => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + const pendingWrite = + pendingTransaction?.collectionMetadataWrites.get(key) + if (pendingWrite) { + return pendingWrite.type === `delete` + ? undefined + : pendingWrite.value + } + return this.state.syncedCollectionMetadata.get(key) + }, + set: (key, value) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.collectionMetadataWrites.set(key, { + type: `set`, + value, + }) + }, + delete: (key) => { + const pendingTransaction = this.getActivePendingSyncTransaction() + pendingTransaction.collectionMetadataWrites.set(key, { + type: `delete`, + }) + }, + list: (prefix) => { + const merged = new Map(this.state.syncedCollectionMetadata) + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + if (pendingTransaction) { + for (const [ + key, + pendingWrite, + ] of pendingTransaction.collectionMetadataWrites) { + if (pendingWrite.type === `delete`) { + merged.delete(key) + } else { + merged.set(key, pendingWrite.value) + } + } + } + + return Array.from(merged.entries()) + .filter(([key]) => (prefix ? 
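`createSyncMetadataApi` gives sync implementations read-your-writes semantics inside the open transaction: `get` and `list` consult the pending `rowMetadataWrites`/`collectionMetadataWrites` first, fall back to the synced maps, and treat a pending `truncate` as having cleared row metadata. A standalone model of that overlay, as a hedged sketch:

```ts
// Standalone model of the read overlay (the real implementation lives on
// CollectionSyncManager and reads this.state directly).
type PendingWrite = { type: `set`; value: unknown } | { type: `delete` }

function readWithOverlay(
  key: string,
  pending: Map<string, PendingWrite> | undefined,
  // Only honored for row metadata; collection metadata survives truncate
  // unless explicitly rewritten, so collection reads pass `false` here.
  pendingTruncate: boolean,
  synced: Map<string, unknown>,
): unknown {
  const pendingWrite = pending?.get(key)
  if (pendingWrite) {
    return pendingWrite.type === `delete` ? undefined : pendingWrite.value
  }
  if (pendingTruncate) {
    return undefined
  }
  return synced.get(key)
}
```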
key.startsWith(prefix) : true)) + .map(([key, value]) => ({ + key, + value, + })) + }, + }, + } + } + /** * Preload the collection data by starting sync if not already started * Multiple concurrent calls will share the same promise diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 9d84a1099..0dbd01780 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -339,6 +339,7 @@ export interface SyncConfig< commit: () => void markReady: () => void truncate: () => void + metadata?: SyncMetadataApi }) => void | CleanupFn | SyncConfigRes /** @@ -357,6 +358,25 @@ export interface SyncConfig< rowUpdateMode?: `partial` | `full` } +export interface SyncMetadataApi< + TKey extends string | number = string | number, +> { + row: { + get: (key: TKey) => unknown | undefined + set: (key: TKey, metadata: unknown) => void + delete: (key: TKey) => void + } + collection: { + get: (key: string) => unknown | undefined + set: (key: string, value: unknown) => void + delete: (key: string) => void + list: (prefix?: string) => ReadonlyArray<{ + key: string + value: unknown + }> + } +} + export interface ChangeMessage< T extends object = Record, TKey extends string | number = string | number, diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 17f9a2ed9..c5d09039e 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1400,7 +1400,7 @@ describe(`Collection`, () => { value: `should not be cleared`, }) expect(collection._state.syncedData.size).toBe(1) - expect(collection._state.syncedMetadata.size).toBe(1) + expect(collection._state.syncedMetadata.size).toBe(0) }) it(`should handle truncate with empty collection`, async () => { @@ -1440,6 +1440,224 @@ describe(`Collection`, () => { expect(collection._state.syncedMetadata.size).toBe(0) }) + it(`should allow startup metadata reads and commit metadata-only sync transactions`, async () => { + let observedCollectionMetadata: unknown + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-metadata-startup-read-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady, metadata }) => { + observedCollectionMetadata = metadata?.collection.get(`startup:key`) + + begin() + metadata?.collection.set(`startup:key`, { ready: true }) + commit() + markReady() + + testSyncFunctions = { begin, commit, metadata } + }, + }, + }) + + await collection.stateWhenReady() + + expect(observedCollectionMetadata).toBeUndefined() + expect( + collection._state.syncedCollectionMetadata.get(`startup:key`), + ).toEqual({ + ready: true, + }) + + const { begin, commit, metadata } = testSyncFunctions + begin() + metadata.collection.set(`runtime:key`, { persisted: true }) + commit() + + expect( + collection._state.syncedCollectionMetadata.get(`runtime:key`), + ).toEqual({ persisted: true }) + }) + + it(`should use last-write-wins for row metadata in sync transactions`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-row-metadata-last-write-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady, metadata }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + metadata: { source: `write` }, + }) + metadata?.row.set(1, { source: `explicit-set` }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit, 
metadata } + }, + }, + }) + + await collection.stateWhenReady() + + expect(collection._state.syncedMetadata.get(1)).toEqual({ + source: `explicit-set`, + }) + + const { begin, write, commit, metadata } = testSyncFunctions + begin() + metadata.row.set(1, { source: `set-first` }) + write({ + type: `update`, + value: { id: 1, value: `updated` }, + metadata: { source: `write-last` }, + }) + commit() + + expect(collection._state.syncedMetadata.get(1)).toEqual({ + source: `write-last`, + }) + }) + + it(`should delete row metadata when sync deletes the row`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-row-metadata-delete-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + metadata: { source: `sync` }, + }) + commit() + markReady() + + testSyncFunctions = { begin, write, commit } + }, + }, + }) + + await collection.stateWhenReady() + expect(collection._state.syncedMetadata.get(1)).toEqual({ source: `sync` }) + + const { begin, write, commit } = testSyncFunctions + begin() + write({ + type: `delete`, + key: 1, + }) + commit() + + expect(collection._state.syncedMetadata.has(1)).toBe(false) + }) + + it(`should not retain a synced metadata entry for inserts without metadata`, async () => { + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-insert-no-metadata-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + }) + commit() + markReady() + }, + }, + }) + + await collection.stateWhenReady() + + expect(collection._state.syncedMetadata.has(1)).toBe(false) + }) + + it(`should treat row metadata as cleared after truncate within the same sync transaction`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-row-metadata-truncate-read-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady, metadata, truncate }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial` }, + metadata: { source: `sync` }, + }) + commit() + markReady() + + testSyncFunctions = { begin, commit, metadata, truncate } + }, + }, + }) + + await collection.stateWhenReady() + expect(collection._state.syncedMetadata.get(1)).toEqual({ source: `sync` }) + + const { begin, commit, metadata, truncate } = testSyncFunctions + begin() + expect(metadata.row.get(1)).toEqual({ source: `sync` }) + metadata.collection.set(`survivor:key`, { persisted: true }) + truncate() + expect(metadata.row.get(1)).toBeUndefined() + expect(metadata.collection.get(`survivor:key`)).toEqual({ persisted: true }) + expect(metadata.collection.list()).toContainEqual({ + key: `survivor:key`, + value: { persisted: true }, + }) + commit() + }) + + it(`should preserve collection metadata across truncate unless explicitly changed`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `sync-collection-metadata-truncate-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady, metadata, truncate }) => { + begin() + metadata?.collection.set(`startup:key`, { ready: true }) + commit() + markReady() + + 
testSyncFunctions = { begin, commit, metadata, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + const { begin, commit, metadata, truncate } = testSyncFunctions + begin() + truncate() + expect(metadata.collection.get(`startup:key`)).toEqual({ ready: true }) + expect(metadata.collection.list()).toContainEqual({ + key: `startup:key`, + value: { ready: true }, + }) + commit() + }) + it(`open sync transaction isn't applied when optimistic mutation is resolved/rejected`, async () => { type Row = { id: number; name: string } diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 1197e7734..9761feca4 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -47,6 +47,7 @@ import type { ControlMessage, GetExtensions, Message, + Offset, PostgresSnapshot, Row, ShapeStreamOptions, @@ -322,6 +323,16 @@ function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot { } } +function getStableShapeIdentity(shapeOptions: { + url: string + params?: Record +}): string { + return JSON.stringify({ + url: shapeOptions.url, + params: shapeOptions.params ?? null, + }) +} + // Check if a message contains txids in its headers function hasTxids>( message: Message, @@ -1181,7 +1192,61 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady, truncate, collection } = params + const { + begin, + write, + commit, + markReady, + truncate, + collection, + metadata, + } = params + const readPersistedResumeState = () => { + const persistedResumeState = metadata?.collection.get(`electric:resume`) + if (!persistedResumeState || typeof persistedResumeState !== `object`) { + return undefined + } + + const record = persistedResumeState as Record + if ( + record.kind === `resume` && + typeof record.offset === `string` && + typeof record.handle === `string` && + typeof record.shapeId === `string` && + typeof record.updatedAt === `number` + ) { + return { + kind: `resume` as const, + offset: record.offset, + handle: record.handle, + shapeId: record.shapeId, + updatedAt: record.updatedAt, + } + } + + if (record.kind === `reset` && typeof record.updatedAt === `number`) { + return { + kind: `reset` as const, + updatedAt: record.updatedAt, + } + } + + return undefined + } + + const persistedResumeState = readPersistedResumeState() + const shapeIdentity = getStableShapeIdentity({ + url: shapeOptions.url, + params: shapeOptions.params as Record | undefined, + }) + const hasIncompatiblePersistedResume = + persistedResumeState?.kind === `resume` && + persistedResumeState.shapeId !== shapeIdentity + const canUsePersistedResume = + shapeOptions.offset === undefined && + shapeOptions.handle === undefined && + persistedResumeState?.kind === `resume` && + !hasIncompatiblePersistedResume // Wrap markReady to wait for test hook in progressive mode let progressiveReadyGate: Promise | null = null @@ -1239,7 +1304,15 @@ function createElectricSync>( // In on-demand mode, we only need the changes from the point of time the collection was created // so we default to `now` when there is no saved offset. offset: - shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined), + shapeOptions.offset ?? + (canUsePersistedResume + ? (persistedResumeState.offset as Offset) + : syncMode === `on-demand` + ? `now` + : undefined), + handle: + shapeOptions.handle ?? + (canUsePersistedResume ? 
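`readPersistedResumeState` accepts exactly two persisted shapes and treats anything else (for example a numeric `offset`) as absent, and the persisted resume is only consulted when the caller supplied neither `offset` nor `handle` and the stored `shapeId` matches the current shape identity. Restated as a sketch:

```ts
// The two `electric:resume` records the validator accepts; anything else
// is ignored and startup falls back to the defaults.
type ElectricResumeState =
  | {
      kind: `resume`
      offset: string // Electric stream offset, e.g. `10_0`
      handle: string // shape handle issued by the server
      shapeId: string // JSON.stringify({ url, params }) identity guard
      updatedAt: number
    }
  | { kind: `reset`; updatedAt: number }

// The resume gate, restated: all three conditions must hold.
declare const persisted: ElectricResumeState | undefined
declare const explicitOffset: string | undefined
declare const explicitHandle: string | undefined
declare const shapeIdentity: string

const canUsePersistedResume =
  explicitOffset === undefined &&
  explicitHandle === undefined &&
  persisted?.kind === `resume` &&
  persisted.shapeId === shapeIdentity
```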
persistedResumeState.handle : undefined), signal: abortController.signal, onError: (errorParams) => { // Just immediately mark ready if there's an error to avoid blocking @@ -1280,6 +1353,42 @@ function createElectricSync>( // duplicate key errors when the row's data has changed between requests. const syncedKeys = new Set() + const stageResumeMetadata = () => { + if (!metadata) { + return + } + const shapeHandle = stream.shapeHandle + const lastOffset = stream.lastOffset + if (!shapeHandle || lastOffset === `-1`) { + return + } + + metadata.collection.set(`electric:resume`, { + kind: `resume`, + offset: lastOffset, + handle: shapeHandle, + shapeId: shapeIdentity, + updatedAt: Date.now(), + }) + } + + const commitResetResumeMetadataImmediately = () => { + if (!metadata) { + return + } + + begin({ immediate: true }) + metadata.collection.set(`electric:resume`, { + kind: `reset`, + updatedAt: Date.now(), + }) + commit() + } + + if (hasIncompatiblePersistedResume) { + commitResetResumeMetadataImmediately() + } + /** * Process a change message: handle tags and write the mutation */ @@ -1458,6 +1567,8 @@ function createElectricSync>( `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`, ) + commitResetResumeMetadataImmediately() + // Start a transaction and truncate the collection if (!transactionStarted) { begin() @@ -1534,6 +1645,7 @@ function createElectricSync>( } // Commit the atomic swap + stageResumeMetadata() commit() // Exit buffering phase by marking that we've received up-to-date @@ -1547,8 +1659,13 @@ function createElectricSync>( // Normal mode or on-demand: commit transaction if one was started // Both up-to-date and subset-end trigger a commit if (transactionStarted) { + stageResumeMetadata() commit() transactionStarted = false + } else if (commitPoint === `up-to-date` && metadata) { + begin() + stageResumeMetadata() + commit() } } wrappedMarkReady(isBufferingInitialSync()) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 87b0ee44d..7824ae4ca 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -4,6 +4,7 @@ import { createCollection, createTransaction, } from '@tanstack/db' +import { persistedCollectionOptions } from '../../db-sqlite-persisted-collection-core/src' import { electricCollectionOptions, isChangeMessage } from '../src/electric' import { stripVirtualProps } from '../../db/tests/utils' import type { ElectricCollectionUtils } from '../src/electric' @@ -12,6 +13,7 @@ import type { InsertMutationFnParams, MutationFnParams, PendingMutation, + SyncMetadataApi, Transaction, TransactionWithMutations, } from '@tanstack/db' @@ -29,6 +31,8 @@ const mockStream = { fetchSnapshot: mockFetchSnapshot, forceDisconnectAndRefresh: mockForceDisconnectAndRefresh, isUpToDate: false, + shapeHandle: undefined as string | undefined, + lastOffset: `-1` as string, } vi.mock(`@electric-sql/client`, async () => { @@ -57,6 +61,61 @@ describe(`Electric Integration`, () => { ]), ) + const createInMemorySyncMetadataApi = ( + seed?: ReadonlyMap, + ): { + api: SyncMetadataApi + collectionMetadata: Map + } => { + const collectionMetadata = new Map(seed) + return { + collectionMetadata, + api: { + row: { + get: () => undefined, + set: () => {}, + delete: () => {}, + }, + collection: { + get: (key) => collectionMetadata.get(key), + set: (key, value) => { + collectionMetadata.set(key, value) 
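Resume metadata only advances on transaction commits: `stageResumeMetadata` runs immediately before each `commit()`, and a `must-refetch` writes a `reset` record in its own immediate transaction so a stale offset can never be reused after a crash. The commit-boundary pattern, sketched:

```ts
// Commit-boundary pattern, as a sketch. Staging inside the same transaction
// keeps the persisted rows and their resume offset atomic.
declare function begin(): void
declare function commit(): void
declare function stageResumeMetadata(): void // no-op until the stream has a handle and a real offset

function commitBatch(applyOps: Array<() => void>): void {
  begin()
  for (const apply of applyOps) {
    apply()
  }
  stageResumeMetadata() // writes { kind: `resume`, offset, handle, shapeId, updatedAt }
  commit() // rows and the resume offset land together
}
```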
+ }, + delete: (key) => { + collectionMetadata.delete(key) + }, + list: (prefix) => + Array.from(collectionMetadata.entries()) + .filter(([key]) => (prefix ? key.startsWith(prefix) : true)) + .map(([key, value]) => ({ key, value })), + }, + }, + } + } + + const createPersistedAdapter = ( + collectionMetadata?: Map, + ) => ({ + loadSubset: async () => [], + loadCollectionMetadata: async () => + Array.from((collectionMetadata ?? new Map()).entries()).map( + ([key, value]) => ({ + key, + value, + }), + ), + applyCommittedTx: async (_collectionId: string, tx: any) => { + for (const mutation of tx.collectionMetadataMutations ?? []) { + if (mutation.type === `delete`) { + collectionMetadata?.delete(mutation.key) + } else { + collectionMetadata?.set(mutation.key, mutation.value) + } + } + }, + ensureIndex: async () => {}, + }) + beforeEach(() => { vi.clearAllMocks() @@ -70,6 +129,8 @@ describe(`Electric Integration`, () => { mockRequestSnapshot.mockResolvedValue(undefined) mockForceDisconnectAndRefresh.mockResolvedValue(undefined) mockStream.isUpToDate = false + mockStream.shapeHandle = undefined + mockStream.lastOffset = `-1` // Create collection with Electric configuration const config = { @@ -2928,6 +2989,469 @@ describe(`Electric Integration`, () => { }), ) }) + + it(`should use persisted resume metadata when no explicit offset or handle is provided`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: JSON.stringify({ + url: `http://test-url`, + params: { table: `test_table` }, + }), + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-resume-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `10_0`, + handle: `handle-1`, + }), + ) + }) + + it(`should ignore reset resume metadata and fall back to default startup`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `reset`, + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-reset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + handle: undefined, + }), + ) + }) + + it(`should honor persisted reset resume metadata through the persisted wrapper`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const collectionMetadata = new Map([ + [ + 
`electric:resume`, + { + kind: `reset`, + updatedAt: 1, + }, + ], + ]) + + const persistedCollection = createCollection( + persistedCollectionOptions({ + ...(electricCollectionOptions({ + id: `persisted-wrapper-reset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) as any), + persistence: { + adapter: createPersistedAdapter(collectionMetadata), + }, + }) as any, + ) + + persistedCollection.startSyncImmediate() + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + handle: undefined, + }), + ) + }) + + it(`should not mix explicit handle with persisted offset`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: `10_0`, + handle: `persisted-handle`, + shapeId: JSON.stringify({ + url: `http://test-url`, + params: { table: `test_table` }, + }), + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-partial-override-handle-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + handle: `explicit-handle`, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + handle: `explicit-handle`, + }), + ) + }) + + it(`should not mix explicit offset with persisted handle`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: `10_0`, + handle: `persisted-handle`, + shapeId: JSON.stringify({ + url: `http://test-url`, + params: { table: `test_table` }, + }), + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-partial-override-offset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + offset: -1 as any, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: -1, + handle: undefined, + }), + ) + }) + + it(`should ignore malformed persisted resume metadata`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: 10, + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-malformed-resume-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as 
number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + handle: undefined, + }), + ) + }) + + it(`should reset and fall back when persisted resume identity is incompatible`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + const metadataHarness = createInMemorySyncMetadataApi( + new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `{"url":"http://other-url","params":{"table":"test_table"}}`, + updatedAt: 1, + }, + ], + ]), + ) + + const baseOptions = electricCollectionOptions({ + id: `persisted-incompatible-resume-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + handle: undefined, + }), + ) + expect(metadataHarness.collectionMetadata.get(`electric:resume`)).toEqual( + expect.objectContaining({ + kind: `reset`, + }), + ) + }) + + it(`should persist reset resume metadata immediately on must-refetch`, () => { + const metadataHarness = createInMemorySyncMetadataApi() + const baseOptions = electricCollectionOptions({ + id: `must-refetch-reset-metadata-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + subscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + expect(metadataHarness.collectionMetadata.get(`electric:resume`)).toEqual( + expect.objectContaining({ + kind: `reset`, + }), + ) + }) + + it(`should only advance resume metadata when a batch commits`, () => { + const metadataHarness = createInMemorySyncMetadataApi() + mockStream.shapeHandle = `shape-1` + mockStream.lastOffset = `10_0` + + const baseOptions = electricCollectionOptions({ + id: `resume-commit-boundary-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + }) + + const originalSync = baseOptions.sync + createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => + originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }), + }, + }) + + subscriber([ + { + key: `1`, + value: { id: 1, name: `Before commit` }, + headers: { operation: `insert` }, + }, + ]) + + expect(metadataHarness.collectionMetadata.has(`electric:resume`)).toBe( + false, + ) + + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(metadataHarness.collectionMetadata.get(`electric:resume`)).toEqual( + expect.objectContaining({ + kind: `resume`, + offset: `10_0`, + handle: `shape-1`, + }), + ) + }) }) // Tests for overlapping subset queries with 
duplicate keys diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 7bc8f532b..62efb144e 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -16,6 +16,7 @@ import type { InsertMutationFnParams, LoadSubsetOptions, SyncConfig, + SyncMetadataApi, UpdateMutationFnParams, UtilsRecord, } from '@tanstack/db' @@ -118,6 +119,7 @@ export interface QueryCollectionConfig< TQueryData, TQueryKey >[`staleTime`] + persistedGcTime?: number /** * Metadata to pass to the query. @@ -219,6 +221,35 @@ interface QueryCollectionState { > } +type PersistedQueryRetentionEntry = + | { + queryHash: string + mode: `ttl` + expiresAt: number + } + | { + queryHash: string + mode: `until-revalidated` + } + +const QUERY_COLLECTION_GC_PREFIX = `queryCollection:gc:` + +type PersistedScannedRowForQuery = { + key: string | number + value: TItem + metadata?: unknown +} + +type QuerySyncMetadataWithPersistedScan = SyncMetadataApi< + string | number +> & { + row: SyncMetadataApi[`row`] & { + scanPersisted?: (options?: { + metadataOnly?: boolean + }) => Promise>> + } +} + /** * Implementation class for QueryCollectionUtils with explicit dependency injection * for better testability and architectural clarity @@ -547,6 +578,7 @@ export function queryCollectionOptions( retry, retryDelay, staleTime, + persistedGcTime, getKey, onInsert, onUpdate, @@ -645,10 +677,332 @@ export function queryCollectionOptions( } const internalSync: SyncConfig[`sync`] = (params) => { - const { begin, write, commit, markReady, collection } = params + const { begin, write, commit, markReady, collection, metadata } = params + const persistedMetadata = metadata as + | QuerySyncMetadataWithPersistedScan + | undefined // Track whether sync has been started let syncStarted = false + let startupRetentionSettled = false + const retainedQueriesPendingRevalidation = new Set() + const persistedRetentionTimers = new Map< + string, + ReturnType + >() + let persistedRetentionMaintenance = Promise.resolve() + + const getRowMetadata = (rowKey: string | number) => { + return (metadata?.row.get(rowKey) ?? + collection._state.syncedMetadata.get(rowKey)) as + | Record + | undefined + } + + const getPersistedOwners = (rowKey: string | number) => { + const rowMetadata = getRowMetadata(rowKey) + const queryMetadata = rowMetadata?.queryCollection + if (!queryMetadata || typeof queryMetadata !== `object`) { + return new Set() + } + + const owners = (queryMetadata as Record).owners + if (!owners || typeof owners !== `object`) { + return new Set() + } + + return new Set(Object.keys(owners as Record)) + } + + const setPersistedOwners = ( + rowKey: string | number, + owners: Set, + ) => { + if (!metadata) { + return + } + + const currentMetadata = { ...(getRowMetadata(rowKey) ?? 
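When a query is garbage-collected with `persistedGcTime` set, the collection records a retention entry under `queryCollection:gc:<queryHash>` instead of deleting the query's rows; a finite `persistedGcTime` becomes a `ttl` entry with an absolute `expiresAt`, while `Infinity` becomes `until-revalidated`. The two record shapes, sketched with a hypothetical hash:

```ts
// The two retention records written under `queryCollection:gc:` + queryHash
// (shapes match PersistedQueryRetentionEntry; the hash is hypothetical).
const ttlEntry = {
  queryHash: `abc123`,
  mode: `ttl` as const,
  expiresAt: Date.now() + 5 * 60_000, // finite persistedGcTime, here 5 minutes
}

const untilRevalidatedEntry = {
  queryHash: `abc123`,
  mode: `until-revalidated` as const, // persistedGcTime === Infinity
}
```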
{}) } + if (owners.size === 0) { + delete currentMetadata.queryCollection + if (Object.keys(currentMetadata).length === 0) { + metadata.row.delete(rowKey) + } else { + metadata.row.set(rowKey, currentMetadata) + } + return + } + + metadata.row.set(rowKey, { + ...currentMetadata, + queryCollection: { + owners: Object.fromEntries( + Array.from(owners.values()).map((owner) => [owner, true]), + ), + }, + }) + } + + const parsePersistedQueryRetentionEntry = ( + value: unknown, + expectedHash: string, + ): PersistedQueryRetentionEntry | undefined => { + if (!value || typeof value !== `object`) { + return undefined + } + + const record = value as Record + if (record.queryHash !== expectedHash) { + return undefined + } + + if (record.mode === `until-revalidated`) { + return { + queryHash: expectedHash, + mode: `until-revalidated`, + } + } + + if ( + record.mode === `ttl` && + typeof record.expiresAt === `number` && + Number.isFinite(record.expiresAt) + ) { + return { + queryHash: expectedHash, + mode: `ttl`, + expiresAt: record.expiresAt, + } + } + + return undefined + } + + const runPersistedRetentionMaintenance = (task: () => Promise) => { + persistedRetentionMaintenance = persistedRetentionMaintenance.then( + task, + task, + ) + return persistedRetentionMaintenance + } + + const cancelPersistedRetentionExpiry = (hashedQueryKey: string) => { + const timer = persistedRetentionTimers.get(hashedQueryKey) + if (timer) { + clearTimeout(timer) + persistedRetentionTimers.delete(hashedQueryKey) + } + } + + const getHydratedOwnedRowsForQueryBaseline = (hashedQueryKey: string) => { + const knownRows = queryToRows.get(hashedQueryKey) + if (knownRows) { + return new Set(knownRows) + } + + const ownedRows = new Set() + for (const [rowKey] of collection._state.syncedData.entries()) { + const owners = getPersistedOwners(rowKey) + if (owners.size === 0) { + continue + } + + rowToQueries.set(rowKey, new Set(owners)) + owners.forEach((owner) => { + const queryToRowsSet = queryToRows.get(owner) || new Set() + queryToRowsSet.add(rowKey) + queryToRows.set(owner, queryToRowsSet) + }) + + if (owners.has(hashedQueryKey)) { + ownedRows.add(rowKey) + } + } + return ownedRows + } + + const loadPersistedBaselineForQuery = async ( + hashedQueryKey: string, + ): Promise< + Map< + string | number, + { + value: any + owners: Set + } + > + > => { + const knownRows = queryToRows.get(hashedQueryKey) + if ( + knownRows && + Array.from(knownRows).every((rowKey) => collection.has(rowKey)) + ) { + const baseline = new Map< + string | number, + { value: any; owners: Set } + >() + knownRows.forEach((rowKey) => { + const value = collection.get(rowKey) + const owners = rowToQueries.get(rowKey) + if (value && owners) { + baseline.set(rowKey, { + value, + owners: new Set(owners), + }) + } + }) + return baseline + } + + const scanPersisted = persistedMetadata?.row.scanPersisted + if (!scanPersisted) { + const baseline = new Map< + string | number, + { value: any; owners: Set } + >() + getHydratedOwnedRowsForQueryBaseline(hashedQueryKey).forEach( + (rowKey) => { + const value = collection.get(rowKey) + const owners = rowToQueries.get(rowKey) + if (value && owners) { + baseline.set(rowKey, { + value, + owners: new Set(owners), + }) + } + }, + ) + return baseline + } + + const baseline = new Map< + string | number, + { value: any; owners: Set } + >() + const scannedRows = await scanPersisted() + + scannedRows.forEach((row) => { + const rowMetadata = row.metadata as Record | undefined + const queryMetadata = rowMetadata?.queryCollection + if 
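Row ownership lives in row metadata under `queryCollection.owners`, a map from query hash to `true`; a row is only deleted once its owner set becomes empty, which is why a row shared between two queries survives one query's cleanup. The bookkeeping as a standalone sketch (the real code goes through `getPersistedOwners`/`setPersistedOwners` and `metadata.row`):

```ts
// Standalone model of the owners bookkeeping.
type RowMetadata = { queryCollection?: { owners: Record<string, true> } }

function releaseOwner(
  rowMetadata: RowMetadata,
  queryHash: string,
): { next: RowMetadata; deleteRow: boolean } {
  const owners = { ...(rowMetadata.queryCollection?.owners ?? {}) }
  delete owners[queryHash]
  if (Object.keys(owners).length === 0) {
    // Last owner gone: drop the queryCollection block and the row itself.
    const { queryCollection: _removed, ...rest } = rowMetadata
    return { next: rest, deleteRow: true }
  }
  return { next: { ...rowMetadata, queryCollection: { owners } }, deleteRow: false }
}
```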
(!queryMetadata || typeof queryMetadata !== `object`) { + return + } + + const owners = (queryMetadata as Record).owners + if (!owners || typeof owners !== `object`) { + return + } + + const ownerSet = new Set(Object.keys(owners as Record)) + if (ownerSet.size === 0) { + return + } + + rowToQueries.set(row.key, new Set(ownerSet)) + ownerSet.forEach((owner) => { + const queryToRowsSet = queryToRows.get(owner) || new Set() + queryToRowsSet.add(row.key) + queryToRows.set(owner, queryToRowsSet) + }) + + if (ownerSet.has(hashedQueryKey)) { + baseline.set(row.key, { + value: row.value, + owners: ownerSet, + }) + } + }) + + return baseline + } + + const cleanupPersistedPlaceholder = async (hashedQueryKey: string) => { + if (!metadata) { + return + } + + const baseline = await loadPersistedBaselineForQuery(hashedQueryKey) + const rowsToDelete: Array = [] + + begin() + + baseline.forEach(({ value: oldItem, owners }, rowKey) => { + owners.delete(hashedQueryKey) + setPersistedOwners(rowKey, owners) + const needToRemove = removeRow(rowKey, hashedQueryKey) + if (needToRemove) { + rowsToDelete.push(oldItem) + } + }) + + rowsToDelete.forEach((row) => { + write({ type: `delete`, value: row }) + }) + + metadata.collection.delete( + `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, + ) + commit() + } + + const schedulePersistedRetentionExpiry = ( + entry: PersistedQueryRetentionEntry, + ) => { + if (entry.mode !== `ttl`) { + return + } + + cancelPersistedRetentionExpiry(entry.queryHash) + + const delay = Math.max(0, entry.expiresAt - Date.now()) + const timer = setTimeout(() => { + persistedRetentionTimers.delete(entry.queryHash) + void runPersistedRetentionMaintenance(async () => { + const currentEntry = metadata?.collection.get( + `${QUERY_COLLECTION_GC_PREFIX}${entry.queryHash}`, + ) + const parsedCurrentEntry = parsePersistedQueryRetentionEntry( + currentEntry, + entry.queryHash, + ) + if ( + !parsedCurrentEntry || + parsedCurrentEntry.mode !== `ttl` || + parsedCurrentEntry.expiresAt > Date.now() + ) { + return + } + await cleanupPersistedPlaceholder(entry.queryHash) + }) + }, delay) + + persistedRetentionTimers.set(entry.queryHash, timer) + } + + const consumePersistedQueryRetentionAtStartup = async () => { + if (!metadata) { + return + } + + const retentionEntries = metadata.collection.list( + QUERY_COLLECTION_GC_PREFIX, + ) + const now = Date.now() + + for (const { key, value } of retentionEntries) { + const hashedQueryKey = key.slice(QUERY_COLLECTION_GC_PREFIX.length) + const parsed = parsePersistedQueryRetentionEntry(value, hashedQueryKey) + if (!parsed) { + continue + } + + if (parsed.mode === `ttl` && parsed.expiresAt <= now) { + await cleanupPersistedPlaceholder(parsed.queryHash) + } else if (parsed.mode === `ttl`) { + schedulePersistedRetentionExpiry(parsed) + } + } + } /** * Generate a consistent query key from LoadSubsetOptions. @@ -671,14 +1025,48 @@ export function queryCollectionOptions( } } + const startupRetentionEntries = metadata?.collection.list( + QUERY_COLLECTION_GC_PREFIX, + ) + const startupRetentionMaintenancePromise = + !startupRetentionEntries || startupRetentionEntries.length === 0 + ? 
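On startup, retention entries are settled once before any query work: expired `ttl` entries trigger `cleanupPersistedPlaceholder` immediately, unexpired ones get a timer via `schedulePersistedRetentionExpiry`, and `until-revalidated` entries are left for the next successful fetch to reconcile. A condensed restatement of that flow (helper names assumed):

```ts
// Condensed restatement of consumePersistedQueryRetentionAtStartup.
declare function cleanup(queryHash: string): Promise<void>
declare function scheduleExpiry(queryHash: string, expiresAt: number): void

async function settleRetentionAtStartup(
  entries: Array<
    | { queryHash: string; mode: `ttl`; expiresAt: number }
    | { queryHash: string; mode: `until-revalidated` }
  >,
): Promise<void> {
  const now = Date.now()
  for (const entry of entries) {
    if (entry.mode === `until-revalidated`) {
      continue // reconciled against the persisted baseline on the next refetch
    }
    if (entry.expiresAt <= now) {
      await cleanup(entry.queryHash) // placeholder already expired
    } else {
      scheduleExpiry(entry.queryHash, entry.expiresAt)
    }
  }
}
```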
(() => { + startupRetentionSettled = true + return Promise.resolve() + })() + : runPersistedRetentionMaintenance(async () => { + try { + await consumePersistedQueryRetentionAtStartup() + } finally { + startupRetentionSettled = true + } + }) + const createQueryFromOpts = ( opts: LoadSubsetOptions = {}, queryFunction: typeof queryFn = queryFn, ): true | Promise => { + if (!startupRetentionSettled) { + return startupRetentionMaintenancePromise.then(() => { + const resumed = createQueryFromOpts(opts, queryFunction) + return resumed === true ? undefined : resumed + }) + } + // Generate key using common function const key = generateQueryKeyFromOptions(opts) const hashedQueryKey = hashKey(key) const extendedMeta = { ...meta, loadSubsetOptions: opts } + const retainedEntry = metadata?.collection.get( + `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, + ) + if ( + parsePersistedQueryRetentionEntry(retainedEntry, hashedQueryKey) !== + undefined + ) { + retainedQueriesPendingRevalidation.add(hashedQueryKey) + } + cancelPersistedRetentionExpiry(hashedQueryKey) if (state.observers.has(hashedQueryKey)) { // We already have a query for this queryKey @@ -803,65 +1191,130 @@ export function queryCollectionOptions( type UpdateHandler = Parameters[0] - // eslint-disable-next-line no-shadow - const makeQueryResultHandler = (queryKey: QueryKey) => { + const applySuccessfulResult = ( + queryKey: QueryKey, + result: QueryObserverResult, + persistedBaseline?: Map< + string | number, + { + value: any + owners: Set + } + >, + ) => { const hashedQueryKey = hashKey(queryKey) - const handleQueryResult: UpdateHandler = (result) => { - if (result.isSuccess) { - // Clear error state - state.lastError = undefined - state.errorCount = 0 - const rawData = result.data - const newItemsArray = select ? select(rawData) : rawData + if (collection.status === `cleaned-up`) { + return + } - if ( - !Array.isArray(newItemsArray) || - newItemsArray.some((item) => typeof item !== `object`) - ) { - const errorMessage = select - ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` - : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + // Clear error state + state.lastError = undefined + state.errorCount = 0 - console.error(errorMessage) - return - } + const rawData = result.data + const newItemsArray = select ? select(rawData) : rawData - const currentSyncedItems: Map = new Map( - collection._state.syncedData.entries(), - ) - const newItemsMap = new Map() - newItemsArray.forEach((item) => { - const key = getKey(item) - newItemsMap.set(key, item) - }) + if ( + !Array.isArray(newItemsArray) || + newItemsArray.some((item) => typeof item !== `object`) + ) { + const errorMessage = select + ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + : `@tanstack/query-db-collection: queryFn must return an array of objects. 
Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` - begin() + console.error(errorMessage) + return + } - currentSyncedItems.forEach((oldItem, key) => { - const newItem = newItemsMap.get(key) - if (!newItem) { - const needToRemove = removeRow(key, hashedQueryKey) // returns true if the row is no longer referenced by any queries - if (needToRemove) { - write({ type: `delete`, value: oldItem }) - } - } else if (!deepEquals(oldItem, newItem)) { - // Only update if there are actual differences in the properties - write({ type: `update`, value: newItem }) - } - }) + const currentSyncedItems: Map = new Map( + collection._state.syncedData.entries(), + ) + const shouldUsePersistedBaseline = persistedBaseline !== undefined + const previouslyOwnedRows = shouldUsePersistedBaseline + ? new Set(persistedBaseline.keys()) + : getHydratedOwnedRowsForQueryBaseline(hashedQueryKey) + const newItemsMap = new Map() + newItemsArray.forEach((item) => { + const key = getKey(item) + newItemsMap.set(key, item) + }) - newItemsMap.forEach((newItem, key) => { - addRow(key, hashedQueryKey) - if (!currentSyncedItems.has(key)) { - write({ type: `insert`, value: newItem }) - } - }) + begin() + if (metadata) { + metadata.collection.delete( + `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, + ) + } - commit() + previouslyOwnedRows.forEach((key) => { + const oldItem = shouldUsePersistedBaseline + ? persistedBaseline.get(key)?.value + : currentSyncedItems.get(key) + if (!oldItem) { + return + } + const newItem = newItemsMap.get(key) + if (!newItem) { + const owners = getPersistedOwners(key) + owners.delete(hashedQueryKey) + setPersistedOwners(key, owners) + const needToRemove = removeRow(key, hashedQueryKey) + if (needToRemove) { + write({ type: `delete`, value: oldItem }) + } + } else if (!deepEquals(oldItem, newItem)) { + write({ type: `update`, value: newItem }) + } + }) - // Mark collection as ready after first successful query result - markReady() + newItemsMap.forEach((newItem, key) => { + const owners = getPersistedOwners(key) + if (!owners.has(hashedQueryKey)) { + owners.add(hashedQueryKey) + setPersistedOwners(key, owners) + } + addRow(key, hashedQueryKey) + if (!currentSyncedItems.has(key)) { + write({ type: `insert`, value: newItem }) + } + }) + + commit() + retainedQueriesPendingRevalidation.delete(hashedQueryKey) + cancelPersistedRetentionExpiry(hashedQueryKey) + + // Mark collection as ready after first successful query result + markReady() + } + + const reconcileSuccessfulResult = async ( + queryKey: QueryKey, + result: QueryObserverResult, + ) => { + const hashedQueryKey = hashKey(queryKey) + const persistedBaseline = + await loadPersistedBaselineForQuery(hashedQueryKey) + if (collection.status === `cleaned-up`) { + return + } + applySuccessfulResult(queryKey, result, persistedBaseline) + } + + // eslint-disable-next-line no-shadow + const makeQueryResultHandler = (queryKey: QueryKey) => { + const handleQueryResult: UpdateHandler = (result) => { + if (result.isSuccess) { + if (retainedQueriesPendingRevalidation.has(hashKey(queryKey))) { + void reconcileSuccessfulResult(queryKey, result).catch((error) => { + console.error( + `[QueryCollection] Error reconciling query ${String(queryKey)}:`, + error, + ) + }) + } else { + applySuccessfulResult(queryKey, result) + } } else if (result.isError) { const isNewError = result.errorUpdatedAt !== state.lastErrorUpdatedAt || @@ -943,8 +1396,15 @@ export function queryCollectionOptions( }) } } else { - // In on-demand mode, mark ready immediately since 
there's no initial query - markReady() + if (startupRetentionSettled) { + markReady() + } else { + // In on-demand mode, there is no initial query, but retained-placeholder + // maintenance still needs to finish before the collection is treated as ready. + void startupRetentionMaintenancePromise.then(() => { + markReady() + }) + } } // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) @@ -965,8 +1425,11 @@ export function queryCollectionOptions( const cleanupQueryInternal = (hashedQueryKey: string) => { unsubscribes.get(hashedQueryKey)?.() unsubscribes.delete(hashedQueryKey) + cancelPersistedRetentionExpiry(hashedQueryKey) + retainedQueriesPendingRevalidation.delete(hashedQueryKey) const rowKeys = queryToRows.get(hashedQueryKey) ?? new Set() + const nextOwnersByRow = new Map>() const rowsToDelete: Array = [] rowKeys.forEach((rowKey) => { @@ -976,22 +1439,41 @@ export function queryCollectionOptions( return } - queries.delete(hashedQueryKey) + const nextOwners = new Set(queries) + nextOwners.delete(hashedQueryKey) + nextOwnersByRow.set(rowKey, nextOwners) + + if (nextOwners.size === 0 && collection.has(rowKey)) { + rowsToDelete.push(collection.get(rowKey)) + } + }) + + const shouldWriteMetadata = + metadata !== undefined && nextOwnersByRow.size > 0 + const needsTransaction = shouldWriteMetadata || rowsToDelete.length > 0 + if (needsTransaction) { + begin() + } - if (queries.size === 0) { + nextOwnersByRow.forEach((owners, rowKey) => { + if (owners.size === 0) { rowToQueries.delete(rowKey) + } else { + rowToQueries.set(rowKey, owners) + } - if (collection.has(rowKey)) { - rowsToDelete.push(collection.get(rowKey)) - } + if (shouldWriteMetadata) { + setPersistedOwners(rowKey, owners) } }) if (rowsToDelete.length > 0) { - begin() rowsToDelete.forEach((row) => { write({ type: `delete`, value: row }) }) + } + + if (needsTransaction) { commit() } @@ -1034,6 +1516,39 @@ export function queryCollectionOptions( ) } + if (persistedGcTime !== undefined) { + if (metadata) { + begin() + metadata.collection.set( + `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, + { + queryHash: hashedQueryKey, + mode: + persistedGcTime === Number.POSITIVE_INFINITY + ? `until-revalidated` + : `ttl`, + ...(persistedGcTime === Number.POSITIVE_INFINITY + ? 
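With `persistedGcTime` set, the GC path here becomes detach-not-delete: it commits the retention entry, tears down the observer and per-query bookkeeping, and returns before `cleanupQueryInternal`, so persisted rows and their owner metadata survive for the next warm start. The contrast, sketched with assumed helper names:

```ts
// Default GC vs persistedGcTime GC, side by side (sketch).
declare function cleanupQueryInternal(queryHash: string): void // deletes rows owned only by this query
declare function writeRetentionEntry(queryHash: string): void
declare function detachObserver(queryHash: string): void

function onQueryGc(queryHash: string, persistedGcTime: number | undefined): void {
  if (persistedGcTime === undefined) {
    cleanupQueryInternal(queryHash) // rows owned only by this query are deleted
    return
  }
  writeRetentionEntry(queryHash) // ttl or until-revalidated marker
  detachObserver(queryHash) // observer gone, persisted rows retained
}
```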
{} + : { expiresAt: Date.now() + persistedGcTime }), + }, + ) + commit() + if (persistedGcTime !== Number.POSITIVE_INFINITY) { + schedulePersistedRetentionExpiry({ + queryHash: hashedQueryKey, + mode: `ttl`, + expiresAt: Date.now() + persistedGcTime, + }) + } + } + unsubscribes.get(hashedQueryKey)?.() + unsubscribes.delete(hashedQueryKey) + state.observers.delete(hashedQueryKey) + hashToQueryKey.delete(hashedQueryKey) + queryRefCounts.set(hashedQueryKey, 0) + return + } + cleanupQueryInternal(hashedQueryKey) } @@ -1063,6 +1578,10 @@ export function queryCollectionOptions( const cleanup = async () => { unsubscribeFromCollectionEvents() unsubscribeFromQueries() + persistedRetentionTimers.forEach((timer) => { + clearTimeout(timer) + }) + persistedRetentionTimers.clear() const allQueryKeys = [...hashToQueryKey.values()] const allHashedKeys = [...state.observers.keys()] diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index 9ad0f251f..42171a58e 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { QueryClient } from '@tanstack/query-core' +import { QueryClient, hashKey } from '@tanstack/query-core' import { createCollection, createLiveQueryCollection, @@ -8,12 +8,14 @@ import { or, } from '@tanstack/db' import { stripVirtualProps } from '../../db/tests/utils' +import { persistedCollectionOptions } from '../../db-sqlite-persisted-collection-core/src' import { queryCollectionOptions } from '../src/query' import type { QueryFunctionContext } from '@tanstack/query-core' import type { Collection, DeleteMutationFnParams, InsertMutationFnParams, + SyncMetadataApi, TransactionWithMutations, UpdateMutationFnParams, } from '@tanstack/db' @@ -36,6 +38,125 @@ const getKey = (item: TestItem) => item.id // Helper to advance timers and allow microtasks to flush const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0)) +function createInMemorySyncMetadataApi< + TKey extends string | number = string | number, + TItem extends object = Record, +>(seed?: { + rowMetadata?: ReadonlyMap + collectionMetadata?: ReadonlyMap + persistedRows?: ReadonlyMap +}): { + api: SyncMetadataApi + rowMetadata: Map + collectionMetadata: Map + persistedRows: Map +} { + const rowMetadata = new Map(seed?.rowMetadata) + const collectionMetadata = new Map(seed?.collectionMetadata) + const persistedRows = new Map(seed?.persistedRows) + const api = { + row: { + get: (key: TKey) => rowMetadata.get(key), + set: (key: TKey, value: unknown) => { + rowMetadata.set(key, value) + }, + delete: (key: TKey) => { + rowMetadata.delete(key) + }, + scanPersisted: async () => + Array.from(persistedRows.entries()).map(([key, value]) => ({ + key, + value, + metadata: rowMetadata.get(key), + })), + }, + collection: { + get: (key: string) => collectionMetadata.get(key), + set: (key: string, value: unknown) => { + collectionMetadata.set(key, value) + }, + delete: (key: string) => { + collectionMetadata.delete(key) + }, + list: (prefix?: string) => + Array.from(collectionMetadata.entries()) + .filter(([key]) => (prefix ? 
key.startsWith(prefix) : true)) + .map(([key, value]) => ({ key, value })), + }, + } + + return { + rowMetadata, + collectionMetadata, + persistedRows, + api: api as SyncMetadataApi, + } +} + +function createPersistedQueryAdapter( + seed: { + rows?: ReadonlyMap + rowMetadata?: ReadonlyMap + collectionMetadata?: ReadonlyMap + } = {}, +) { + const rows = new Map(seed.rows) + const rowMetadata = new Map(seed.rowMetadata) + const collectionMetadata = new Map(seed.collectionMetadata) + + return { + rows, + rowMetadata, + collectionMetadata, + loadSubset: async () => + Array.from(rows.values()).map((value) => ({ + key: value.id, + value, + metadata: rowMetadata.get(value.id), + })), + loadCollectionMetadata: async () => + Array.from(collectionMetadata.entries()).map(([key, value]) => ({ + key, + value, + })), + scanRows: async () => + Array.from(rows.values()).map((value) => ({ + key: value.id, + value, + metadata: rowMetadata.get(value.id), + })), + applyCommittedTx: async (_collectionId: string, tx: any) => { + if (tx.truncate) { + rows.clear() + rowMetadata.clear() + } + for (const mutation of tx.mutations) { + if (mutation.type === `delete`) { + rows.delete(mutation.key) + rowMetadata.delete(mutation.key) + } else { + rows.set(mutation.key, mutation.value) + } + } + for (const mutation of tx.rowMetadataMutations ?? []) { + if (mutation.type === `delete`) { + rowMetadata.delete(mutation.key) + } else { + rowMetadata.set(mutation.key, mutation.value) + } + } + for (const mutation of tx.collectionMetadataMutations ?? []) { + if (mutation.type === `delete`) { + collectionMetadata.delete(mutation.key) + } else { + collectionMetadata.set(mutation.key, mutation.value) + } + } + }, + ensureIndex: async () => {}, + } +} + describe(`QueryCollection`, () => { let queryClient: QueryClient @@ -4275,6 +4396,472 @@ describe(`QueryCollection`, () => { }) }) + it(`should diff against retained query-owned rows on warm start`, async () => { + const baseQueryKey = [`persisted-baseline-test`] + const queryFn = vi.fn().mockResolvedValue([]) + + const config: QueryCollectionConfig = { + id: `persisted-baseline-test`, + queryClient, + queryKey: baseQueryKey, + queryFn, + getKey: (item) => item.id, + syncMode: `eager`, + startSync: false, + } + + const baseOptions = queryCollectionOptions(config) + const originalSync = baseOptions.sync + const ownedRow = { id: `1`, name: `Owned row`, category: `A` } + const unrelatedRow = { id: `2`, name: `Unrelated row`, category: `B` } + const ownedQueryHash = hashKey(baseQueryKey) + const metadataHarness = createInMemorySyncMetadataApi({ + rowMetadata: new Map([ + [ + ownedRow.id, + { + queryCollection: { + owners: { + [ownedQueryHash]: true, + }, + }, + }, + ], + ]), + collectionMetadata: new Map([ + [ + `queryCollection:gc:${ownedQueryHash}`, + { + queryHash: ownedQueryHash, + mode: `until-revalidated`, + }, + ], + ]), + persistedRows: new Map([ + [ownedRow.id, ownedRow], + [unrelatedRow.id, unrelatedRow], + ]), + }) + + const collection = createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => { + params.begin({ immediate: true }) + params.write({ type: `insert`, value: ownedRow }) + params.write({ type: `insert`, value: unrelatedRow }) + params.commit() + + return originalSync.sync({ + ...params, + metadata: metadataHarness.api, + }) + }, + }, + }) + + await collection.preload() + await flushPromises() + + expect(queryFn).toHaveBeenCalledTimes(1) + expect(collection.has(ownedRow.id)).toBe(false) + expect(collection.has(unrelatedRow.id)).toBe(true) + 
+
+  it(`should clean up expired persisted ttl placeholders on startup`, async () => {
+    const baseQueryKey = [`persisted-ttl-cleanup-test`]
+    const queryFn = vi.fn().mockResolvedValue([])
+    const expiredQueryHash = hashKey(baseQueryKey)
+    const otherOwnerHash = `other-owner`
+
+    const config: QueryCollectionConfig<CategorisedItem> = {
+      id: `persisted-ttl-cleanup-test`,
+      queryClient,
+      queryKey: baseQueryKey,
+      queryFn,
+      getKey: (item) => item.id,
+      syncMode: `on-demand`,
+      startSync: true,
+    }
+
+    const baseOptions = queryCollectionOptions(config)
+    const originalSync = baseOptions.sync
+    const orphanRow = { id: `1`, name: `Orphan`, category: `A` }
+    const sharedRow = { id: `2`, name: `Shared`, category: `B` }
+    const metadataHarness = createInMemorySyncMetadataApi({
+      rowMetadata: new Map([
+        [
+          orphanRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [expiredQueryHash]: true,
+              },
+            },
+          },
+        ],
+        [
+          sharedRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [expiredQueryHash]: true,
+                [otherOwnerHash]: true,
+              },
+            },
+          },
+        ],
+      ]),
+      collectionMetadata: new Map([
+        [
+          `queryCollection:gc:${expiredQueryHash}`,
+          {
+            queryHash: expiredQueryHash,
+            mode: `ttl`,
+            expiresAt: Date.now() - 1_000,
+          },
+        ],
+      ]),
+      persistedRows: new Map([
+        [orphanRow.id, orphanRow],
+        [sharedRow.id, sharedRow],
+      ]),
+    })
+
+    const collection = createCollection({
+      ...baseOptions,
+      sync: {
+        sync: (params: Parameters<typeof originalSync.sync>[0]) => {
+          params.begin({ immediate: true })
+          params.write({ type: `insert`, value: orphanRow })
+          params.write({ type: `insert`, value: sharedRow })
+          params.commit()
+
+          return originalSync.sync({
+            ...params,
+            metadata: metadataHarness.api,
+          })
+        },
+      },
+    })
+
+    await collection.stateWhenReady()
+    await flushPromises()
+
+    expect(queryFn).not.toHaveBeenCalled()
+    expect(collection.has(orphanRow.id)).toBe(false)
+    expect(collection.has(sharedRow.id)).toBe(true)
+    expect(
+      metadataHarness.collectionMetadata.get(
+        `queryCollection:gc:${expiredQueryHash}`,
+      ),
+    ).toBeUndefined()
+    expect(metadataHarness.rowMetadata.get(sharedRow.id)).toEqual({
+      queryCollection: {
+        owners: {
+          [otherOwnerHash]: true,
+        },
+      },
+    })
+  })
+
+  it(`should preserve until-revalidated retained rows on startup`, async () => {
+    const baseQueryKey = [`persisted-until-revalidated-test`]
+    const queryFn = vi.fn().mockResolvedValue([])
+    const retainedQueryHash = hashKey(baseQueryKey)
+
+    const config: QueryCollectionConfig<CategorisedItem> = {
+      id: `persisted-until-revalidated-test`,
+      queryClient,
+      queryKey: baseQueryKey,
+      queryFn,
+      getKey: (item) => item.id,
+      syncMode: `on-demand`,
+      startSync: true,
+    }
+
+    const baseOptions = queryCollectionOptions(config)
+    const originalSync = baseOptions.sync
+    const retainedRow = { id: `1`, name: `Retained`, category: `A` }
+    const metadataHarness = createInMemorySyncMetadataApi({
+      rowMetadata: new Map([
+        [
+          retainedRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [retainedQueryHash]: true,
+              },
+            },
+          },
+        ],
+      ]),
+      collectionMetadata: new Map([
+        [
+          `queryCollection:gc:${retainedQueryHash}`,
+          {
+            queryHash: retainedQueryHash,
+            mode: `until-revalidated`,
+          },
+        ],
+      ]),
+    })
+
+    const collection = createCollection({
+      ...baseOptions,
+      sync: {
+        sync: (params: Parameters<typeof originalSync.sync>[0]) => {
+          params.begin({ immediate: true })
+          params.write({ type: `insert`, value: retainedRow })
+          params.commit()
+
+          return originalSync.sync({
+            ...params,
+            metadata: metadataHarness.api,
+          })
+        },
+      },
+    })
+
+    await collection.stateWhenReady()
+    await flushPromises()
+
+    expect(queryFn).not.toHaveBeenCalled()
+    expect(collection.has(retainedRow.id)).toBe(true)
+    expect(
+      metadataHarness.collectionMetadata.get(
+        `queryCollection:gc:${retainedQueryHash}`,
+      ),
+    ).toEqual({
+      queryHash: retainedQueryHash,
+      mode: `until-revalidated`,
+    })
+  })
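Taken together, these two startup tests pin down how a gc placeholder is read: `ttl` entries expire by wall clock, while `until-revalidated` entries survive startup and are only settled by a real refetch. A sketch of that decision under those assumptions; the helper name is invented:

type GcPlaceholder =
  | { queryHash: string; mode: `ttl`; expiresAt: number }
  | { queryHash: string; mode: `until-revalidated` }

// Only an expired `ttl` placeholder is collectible during startup; an
// `until-revalidated` placeholder always survives until its query
// actually refetches and the fresh result can be diffed.
function isCollectibleAtStartup(
  placeholder: GcPlaceholder,
  now = Date.now(),
): boolean {
  return placeholder.mode === `ttl` && placeholder.expiresAt <= now
}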
+
+  it(`should clean up expired retained placeholders for cold persisted rows through the persisted wrapper`, async () => {
+    const queryHash = hashKey([`persisted-cold-ttl-cleanup`])
+    const otherOwnerHash = `other-owner`
+    const orphanRow = { id: `1`, name: `Cold orphan`, category: `A` }
+    const sharedRow = { id: `2`, name: `Cold shared`, category: `B` }
+    const adapter = createPersistedQueryAdapter({
+      rows: new Map([
+        [orphanRow.id, orphanRow],
+        [sharedRow.id, sharedRow],
+      ]),
+      rowMetadata: new Map([
+        [
+          orphanRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [queryHash]: true,
+              },
+            },
+          },
+        ],
+        [
+          sharedRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [queryHash]: true,
+                [otherOwnerHash]: true,
+              },
+            },
+          },
+        ],
+      ]),
+      collectionMetadata: new Map([
+        [
+          `queryCollection:gc:${queryHash}`,
+          {
+            queryHash,
+            mode: `ttl`,
+            expiresAt: Date.now() - 1_000,
+          },
+        ],
+      ]),
+    })
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        ...(queryCollectionOptions({
+          id: `persisted-cold-ttl-cleanup`,
+          queryClient,
+          queryKey: [`persisted-cold-ttl-cleanup`],
+          queryFn: async () => [],
+          getKey: (item: CategorisedItem): string => item.id,
+          syncMode: `on-demand`,
+          startSync: true,
+        }) as any),
+        persistence: {
+          adapter,
+        },
+      }) as any,
+    )
+
+    await collection.stateWhenReady()
+    await flushPromises()
+
+    expect(adapter.rows.has(orphanRow.id)).toBe(false)
+    expect(adapter.rows.has(sharedRow.id)).toBe(true)
+    expect(
+      adapter.collectionMetadata.has(`queryCollection:gc:${queryHash}`),
+    ).toBe(false)
+    expect(adapter.rowMetadata.get(sharedRow.id)).toEqual({
+      queryCollection: {
+        owners: {
+          [otherOwnerHash]: true,
+        },
+      },
+    })
+  })
+
+  it(`should revalidate retained queries against cold persisted baselines through the persisted wrapper`, async () => {
+    const queryHash = hashKey([`persisted-cold-retained`])
+    const retainedRow = { id: `1`, name: `Stale retained`, category: `A` }
+    const adapter = createPersistedQueryAdapter({
+      rows: new Map([[retainedRow.id, retainedRow]]),
+      rowMetadata: new Map([
+        [
+          retainedRow.id,
+          {
+            queryCollection: {
+              owners: {
+                [queryHash]: true,
+              },
+            },
+          },
+        ],
+      ]),
+      collectionMetadata: new Map([
+        [
+          `queryCollection:gc:${queryHash}`,
+          {
+            queryHash,
+            mode: `until-revalidated`,
+          },
+        ],
+      ]),
+    })
+
+    const collection = createCollection(
+      persistedCollectionOptions({
+        ...(queryCollectionOptions({
+          id: `persisted-cold-retained`,
+          queryClient,
+          queryKey: [`persisted-cold-retained`],
+          queryFn: async () => [],
+          getKey: (item: CategorisedItem): string => item.id,
+          syncMode: `on-demand`,
+          startSync: true,
+        }) as any),
+        persistence: {
+          adapter,
+        },
+      }) as any,
+    )
+
+    await collection.stateWhenReady()
+    expect(adapter.rows.has(retainedRow.id)).toBe(true)
+
+    const liveQuery = createLiveQueryCollection({
+      query: (q) => q.from({ item: collection }),
+    })
+
+    await liveQuery.preload()
+    await flushPromises()
+
+    expect(adapter.rows.has(retainedRow.id)).toBe(false)
+    expect(
+      adapter.collectionMetadata.has(`queryCollection:gc:${queryHash}`),
+    ).toBe(false)
+  })
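The cold-row variants matter because these rows never enter the in-memory collection; the sweep has to run against the adapter itself. A sketch of such a sweep written against the `createPersistedQueryAdapter` harness above; `sweepExpiredOwner` and the hard-coded collection id are illustrative, though `scanRows` and `applyCommittedTx` mirror the adapter methods in this patch:

// Strip one expired owner hash from every cold row; delete rows that end
// up with no owners at all, then drop the gc placeholder itself.
async function sweepExpiredOwner(
  adapter: ReturnType<typeof createPersistedQueryAdapter>,
  expiredHash: string,
) {
  const scanned = await adapter.scanRows()
  const mutations: Array<{ type: `delete`; key: string; value: any }> = []
  const rowMetadataMutations: Array<
    { type: `set`; key: string; value: unknown } | { type: `delete`; key: string }
  > = []
  for (const row of scanned) {
    const owners = (row.metadata as any)?.queryCollection?.owners ?? {}
    if (!(expiredHash in owners)) continue
    const { [expiredHash]: _dropped, ...rest } = owners
    if (Object.keys(rest).length === 0) {
      mutations.push({ type: `delete`, key: row.key, value: row.value })
      rowMetadataMutations.push({ type: `delete`, key: row.key })
    } else {
      rowMetadataMutations.push({
        type: `set`,
        key: row.key,
        value: { queryCollection: { owners: rest } },
      })
    }
  }
  await adapter.applyCommittedTx(`collection-id`, {
    mutations,
    rowMetadataMutations,
    collectionMetadataMutations: [
      { type: `delete`, key: `queryCollection:gc:${expiredHash}` },
    ],
  })
}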
+
+  it(`should expire retained ttl placeholders while the app stays open`, async () => {
+    vi.useFakeTimers()
+    try {
+      const baseQueryKey = [`runtime-ttl-retention-test`]
+      const retainedQueryHash = hashKey(baseQueryKey)
+      const items: Array<CategorisedItem> = [
+        { id: `1`, name: `Retained`, category: `A` },
+      ]
+      const queryFn = vi.fn().mockResolvedValue(items)
+
+      const config: QueryCollectionConfig<CategorisedItem> = {
+        id: `runtime-ttl-retention-test`,
+        queryClient,
+        queryKey: () => baseQueryKey,
+        queryFn,
+        getKey: (item) => item.id,
+        syncMode: `on-demand`,
+        startSync: true,
+        persistedGcTime: 100,
+      }
+
+      const baseOptions = queryCollectionOptions(config)
+      const originalSync = baseOptions.sync
+      const metadataHarness = createInMemorySyncMetadataApi<
+        string | number,
+        CategorisedItem
+      >({
+        persistedRows: new Map(items.map((item) => [item.id, item])),
+      })
+
+      const collection = createCollection({
+        ...baseOptions,
+        sync: {
+          sync: (params: Parameters<typeof originalSync.sync>[0]) =>
+            originalSync.sync({
+              ...params,
+              metadata: metadataHarness.api,
+            }),
+        },
+      })
+
+      const liveQuery = createLiveQueryCollection({
+        query: (q) =>
+          q
+            .from({ item: collection })
+            .where(({ item }) => eq(item.category, `A`)),
+      })
+
+      await liveQuery.preload()
+      await vi.waitFor(() => {
+        expect(collection.size).toBe(1)
+      })
+
+      await liveQuery.cleanup()
+
+      expect(
+        metadataHarness.collectionMetadata.get(
+          `queryCollection:gc:${retainedQueryHash}`,
+        ),
+      ).toEqual({
+        queryHash: retainedQueryHash,
+        mode: `ttl`,
+        expiresAt: expect.any(Number),
+      })
+
+      await vi.advanceTimersByTimeAsync(150)
+      await vi.runOnlyPendingTimersAsync()
+
+      expect(
+        metadataHarness.collectionMetadata.get(
+          `queryCollection:gc:${retainedQueryHash}`,
+        ),
+      ).toBeUndefined()
+      expect(collection.has(`1`)).toBe(false)
+    } finally {
+      vi.useRealTimers()
+    }
+  })
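What this fake-timer test observes is a two-step contract: dropping the last subscriber records a `ttl` placeholder stamped with `expiresAt`, and a timer armed for `persistedGcTime` later removes it along with its solely-owned rows. A sketch of the arming side, with invented names; writing the placeholder before arming the timer means a crash still leaves an expired entry for the next startup sweep to collect:

// Arm a TTL sweep when the last live query unsubscribes. The caller's
// `onExpired` would delete the gc placeholder and the rows it solely
// owns; the returned function lets a resubscribe cancel the sweep.
function armTtlSweep(opts: {
  queryHash: string
  persistedGcTime: number
  setPlaceholder: (key: string, value: unknown) => void
  onExpired: () => void
}): () => void {
  const expiresAt = Date.now() + opts.persistedGcTime
  // Persist the placeholder first so an app that dies before the timer
  // fires still leaves an expired `ttl` entry behind.
  opts.setPlaceholder(`queryCollection:gc:${opts.queryHash}`, {
    queryHash: opts.queryHash,
    mode: `ttl`,
    expiresAt,
  })
  const timer = setTimeout(opts.onExpired, opts.persistedGcTime)
  return () => clearTimeout(timer)
}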
+
   it(`should reset refcount after query GC and reload (stale refcount bug)`, async () => {
     // This test catches Bug 2: stale refcounts after GC/remove
     // When TanStack Query GCs a query, the refcount should be cleaned up
@@ -5207,4 +5794,131 @@ describe(`QueryCollection`, () => {
     customQueryClient.clear()
   })
 })
+
+  describe(`rows from external sync sources`, () => {
+    it(`should retain pre-hydrated rows when a disjoint query correctly returns empty results`, async () => {
+      // Simulates a warm-start scenario where a persistence layer has already
+      // hydrated "history" rows into the collection. Two disjoint queries
+      // share the collection:
+      // - "history": returns the same rows (but with a server delay)
+      // - "live": correctly returns [] (there are no live items yet)
+      //
+      // When the live query resolves first with its correct empty result,
+      // the history rows should remain in the collection. The live query's
+      // empty result only means there are no *live* items; it should not
+      // affect rows from a different query's domain.
+
+      const preHydratedItems: Array<CategorisedItem> = [
+        { id: `1`, name: `History 1`, category: `history` },
+        { id: `2`, name: `History 2`, category: `history` },
+        { id: `3`, name: `History 3`, category: `history` },
+      ]
+
+      let resolveHistoryQueryFn!: (value: Array<CategorisedItem>) => void
+
+      const isQueryCategory = (category: string, where: any): boolean => {
+        return (
+          where &&
+          where.type === `func` &&
+          where.name === `eq` &&
+          where.args[0]?.path?.[0] === `category` &&
+          where.args[1]?.value === category
+        )
+      }
+
+      const queryFn = vi.fn().mockImplementation((ctx: any) => {
+        const where = ctx.meta?.loadSubsetOptions?.where
+
+        if (isQueryCategory(`history`, where)) {
+          // History query: returns data, but the server is slow
+          return new Promise<Array<CategorisedItem>>((resolve) => {
+            resolveHistoryQueryFn = resolve
+          })
+        }
+
+        if (isQueryCategory(`live`, where)) {
+          // Live query: correctly returns empty; no live items exist yet
+          return Promise.resolve([])
+        }
+
+        return Promise.resolve([])
+      })
+
+      const baseQueryKey = [`warm-start-disjoint-test`]
+
+      const baseOptions = queryCollectionOptions({
+        id: `warm-start-disjoint-test`,
+        queryClient,
+        queryKey: (opts: any) => {
+          if (opts.where) {
+            return [...baseQueryKey, opts.where]
+          }
+          return baseQueryKey
+        },
+        queryFn,
+        getKey: (item: CategorisedItem) => item.id,
+        syncMode: `on-demand`,
+        startSync: true,
+      })
+
+      const originalSync = baseOptions.sync
+      const collection = createCollection({
+        ...baseOptions,
+        sync: {
+          sync: (params: Parameters<typeof originalSync.sync>[0]) => {
+            // Simulate a persistence layer hydrating rows from a previous
+            // session on warm start, before the query layer initializes.
+            params.begin({ immediate: true })
+            for (const item of preHydratedItems) {
+              params.write({ type: `insert`, value: item })
+            }
+            params.commit()
+
+            return originalSync.sync(params)
+          },
+        },
+      })
+
+      // Verify the persistence layer's hydrated rows are present
+      expect(collection.size).toBe(3)
+
+      // Subscribe two disjoint queries: history (delayed) and live (immediate)
+      const historyQuery = createLiveQueryCollection({
+        query: (q) =>
+          q
+            .from({ item: collection })
+            .where(({ item }) => eq(item.category, `history`)),
+      })
+
+      const liveQuery = createLiveQueryCollection({
+        query: (q) =>
+          q
+            .from({ item: collection })
+            .where(({ item }) => eq(item.category, `live`)),
+      })
+
+      // Trigger both queries. The history queryFn is pending; the live
+      // queryFn resolves immediately with [].
+      const historyPreload = historyQuery.preload()
+      await liveQuery.preload()
+      await flushPromises()
+
+      // The live query correctly returned [] (no live items exist).
+      // The pre-hydrated history rows should still be in the collection.
+      expect(collection.size).toBe(3)
+      expect(collection.has(`1`)).toBe(true)
+      expect(collection.has(`2`)).toBe(true)
+      expect(collection.has(`3`)).toBe(true)
+
+      // Now the history query's server response arrives
+      resolveHistoryQueryFn(preHydratedItems)
+      await historyPreload
+
+      // Collection should still have all history items
+      expect(collection.size).toBe(3)
+
+      await historyQuery.cleanup()
+      await liveQuery.cleanup()
+    })
+  })
 })
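The invariant this final regression test protects reduces to scoping deletions by ownership: a query's result, empty or not, may only evict rows that query exclusively owns. A sketch of result reconciliation under that rule; the function and its parameters are illustrative, not this package's API:

// Reconcile one query's fresh result against the collection without
// touching rows that belong to other queries or external sync sources.
function reconcileQueryResult<T>(opts: {
  queryHash: string
  fresh: Map<string, T>
  current: Map<string, T>
  ownersOf: (key: string) => Set<string>
}): { upserts: Array<[string, T]>; deletes: Array<string> } {
  const deletes: Array<string> = []
  for (const key of opts.current.keys()) {
    if (opts.fresh.has(key)) continue
    const owners = opts.ownersOf(key)
    // An absent key is only a deletion if this query is its sole owner;
    // otherwise the row simply belongs to another query's domain.
    if (owners.has(opts.queryHash) && owners.size === 1) deletes.push(key)
  }
  return { upserts: Array.from(opts.fresh.entries()), deletes }
}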