From 785f89e5d3b78ed111eebbe15503214a6adf793f Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 17 Mar 2026 14:47:08 +0100 Subject: [PATCH 1/2] Phase 1 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/persisted.ts | 62 ++++++++- .../src/sqlite-core-adapter.ts | 91 ++++++++++++- .../tests/sqlite-core-adapter.test.ts | 2 + packages/db/src/types.ts | 17 +++ packages/query-db-collection/src/query.ts | 89 +++++++++++- .../query-db-collection/tests/query.test.ts | 127 ++++++++++++++++++ 6 files changed, 378 insertions(+), 10 deletions(-) diff --git a/packages/db-sqlite-persisted-collection-core/src/persisted.ts b/packages/db-sqlite-persisted-collection-core/src/persisted.ts index 1691b19cd..a1a737078 100644 --- a/packages/db-sqlite-persisted-collection-core/src/persisted.ts +++ b/packages/db-sqlite-persisted-collection-core/src/persisted.ts @@ -16,6 +16,7 @@ import type { DeleteMutationFnParams, InsertMutationFnParams, LoadSubsetOptions, + MetadataStorage, PendingMutation, SyncConfig, SyncConfigRes, @@ -193,7 +194,7 @@ export interface PersistenceAdapter< collectionId: string, options: LoadSubsetOptions, ctx?: { requiredIndexSignatures?: ReadonlyArray }, - ) => Promise> + ) => Promise> applyCommittedTx: ( collectionId: string, tx: PersistedTx, @@ -209,6 +210,23 @@ export interface PersistenceAdapter< latestSeq: number latestRowVersion: number }> + updateRefCounts?: ( + collectionId: string, + updates: Array<{ key: TKey; refCount: number }>, + ) => Promise + loadMetadata?: ( + collectionId: string, + key: string, + ) => Promise + storeMetadata?: ( + collectionId: string, + key: string, + value: unknown, + ) => Promise + deleteMetadata?: ( + collectionId: string, + key: string, + ) => Promise } export interface SQLiteDriver { @@ -778,6 +796,20 @@ class PersistedCollectionRuntime< return this.isHydrating } + createMetadataStorage(): MetadataStorage | undefined { + const adapter = this.persistence.adapter + if (!adapter.loadMetadata || !adapter.storeMetadata || !adapter.deleteMetadata) { + return undefined + } + const collectionId = this.collectionId + return { + load: (key: string) => adapter.loadMetadata!(collectionId, key), + store: (key: string, value: unknown) => + adapter.storeMetadata!(collectionId, key, value), + delete: (key: string) => adapter.deleteMetadata!(collectionId, key), + } + } + isApplyingInternally(): boolean { return this.internalApplyDepth > 0 } @@ -1042,7 +1074,7 @@ class PersistedCollectionRuntime< private loadSubsetRowsUnsafe( options: LoadSubsetOptions, - ): Promise> { + ): Promise> { return this.persistence.adapter.loadSubset(this.collectionId, options, { requiredIndexSignatures: this.getRequiredIndexSignatures(), }) @@ -1059,6 +1091,29 @@ class PersistedCollectionRuntime< const rows = await this.loadSubsetRowsUnsafe(options) this.applyRowsToCollection(rows) + + // Store hydrated ref counts in metadata so the query layer can + // restore them on init and protect rows from premature deletion. + const refCountEntries = rows + .filter((row) => (row.refCount ?? 0) > 0) + .map((row) => [String(row.key), row.refCount!] as const) + if (refCountEntries.length > 0 && this.persistence.adapter.storeMetadata) { + // Merge with any existing ref counts from prior hydrations + const existing = + ((await this.persistence.adapter.loadMetadata?.( + this.collectionId, + `queryTracking:rowRefCounts`, + )) as Record | undefined) ?? {} + const merged = { ...existing } + for (const [key, count] of refCountEntries) { + merged[key] = count + } + await this.persistence.adapter.storeMetadata( + this.collectionId, + `queryTracking:rowRefCounts`, + merged, + ) + } } finally { this.isHydrating = false } @@ -1930,8 +1985,11 @@ function createWrappedSyncConfig< params.collection as Collection, ) + const metadataStorage = runtime.createMetadataStorage() + const wrappedParams = { ...params, + metadataStorage, markReady: () => { void runtime .ensureStarted() diff --git a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts index 9f1662de1..5ae788da0 100644 --- a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts +++ b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts @@ -38,6 +38,7 @@ type StoredSqliteRow = { key: string value: string row_version: number + ref_count: number } type SQLiteCoreAdapterSchemaMismatchPolicy = @@ -589,6 +590,7 @@ type InMemoryRow = { key: TKey value: T rowVersion: number + refCount: number } function compileSqlExpression( @@ -1072,6 +1074,7 @@ export class SQLiteCorePersistenceAdapter< return orderedRows.map((row) => ({ key: row.key, value: row.value, + refCount: row.refCount, })) } @@ -1079,6 +1082,7 @@ export class SQLiteCorePersistenceAdapter< return rows.map((row) => ({ key: row.key, value: row.value, + refCount: row.refCount, })) } @@ -1405,7 +1409,7 @@ export class SQLiteCorePersistenceAdapter< const orderByCompiled = compileOrderByClauses(options.orderBy) const queryParams: Array = [] - let sql = `SELECT key, value, row_version FROM ${collectionTableSql}` + let sql = `SELECT key, value, row_version, ref_count FROM ${collectionTableSql}` if (options.where && whereCompiled.supported) { sql = `${sql} WHERE ${whereCompiled.sql}` @@ -1428,6 +1432,7 @@ export class SQLiteCorePersistenceAdapter< key, value, rowVersion: row.row_version, + refCount: row.ref_count, } }) @@ -1646,9 +1651,18 @@ export class SQLiteCorePersistenceAdapter< `CREATE TABLE IF NOT EXISTS ${collectionTableSql} ( key TEXT PRIMARY KEY, value TEXT NOT NULL, - row_version INTEGER NOT NULL + row_version INTEGER NOT NULL, + ref_count INTEGER NOT NULL DEFAULT 0 )`, ) + // Migration for existing tables that lack the ref_count column + try { + await this.driver.exec( + `ALTER TABLE ${collectionTableSql} ADD COLUMN ref_count INTEGER NOT NULL DEFAULT 0`, + ) + } catch { + // Column already exists — safe to ignore + } await this.driver.exec( `CREATE INDEX IF NOT EXISTS ${quoteIdentifier(`${tableName}_row_version_idx`)} ON ${collectionTableSql} (row_version)`, @@ -1828,9 +1842,82 @@ export class SQLiteCorePersistenceAdapter< updated_at INTEGER NOT NULL )`, ) + await this.driver.exec( + `CREATE TABLE IF NOT EXISTS collection_metadata ( + collection_id TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (collection_id, key) + )`, + ) this.initialized = true } + + async updateRefCounts( + collectionId: string, + updates: Array<{ key: string | number; refCount: number }>, + ): Promise { + if (updates.length === 0) { + return + } + + const tableMapping = await this.ensureCollectionReady(collectionId) + const collectionTableSql = quoteIdentifier(tableMapping.tableName) + + await this.runInTransaction(async (transactionDriver) => { + for (const update of updates) { + const encodedKey = encodePersistedStorageKey(update.key) + await transactionDriver.run( + `UPDATE ${collectionTableSql} SET ref_count = ? WHERE key = ?`, + [update.refCount, encodedKey], + ) + } + }) + } + + async loadMetadata( + collectionId: string, + key: string, + ): Promise { + await this.ensureInitialized() + const rows = await this.driver.query<{ value: string }>( + `SELECT value FROM collection_metadata WHERE collection_id = ? AND key = ?`, + [collectionId, key], + ) + if (rows.length === 0) { + return undefined + } + return JSON.parse(rows[0]!.value) + } + + async storeMetadata( + collectionId: string, + key: string, + value: unknown, + ): Promise { + await this.ensureInitialized() + await this.driver.run( + `INSERT INTO collection_metadata (collection_id, key, value, updated_at) + VALUES (?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER)) + ON CONFLICT(collection_id, key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at`, + [collectionId, key, JSON.stringify(value)], + ) + } + + async deleteMetadata( + collectionId: string, + key: string, + ): Promise { + await this.ensureInitialized() + await this.driver.run( + `DELETE FROM collection_metadata WHERE collection_id = ? AND key = ?`, + [collectionId, key], + ) + } } export function createSQLiteCorePersistenceAdapter< diff --git a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts index 7c45e964b..148e3b2cd 100644 --- a/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts +++ b/packages/db-sqlite-persisted-collection-core/tests/sqlite-core-adapter.test.ts @@ -334,6 +334,7 @@ export function runSQLiteCoreAdapterContractSuite( expect(updated).toEqual([ { key: `1`, + refCount: 0, value: { id: `1`, title: `Updated`, @@ -494,6 +495,7 @@ export function runSQLiteCoreAdapterContractSuite( expect(filtered).toEqual([ { key: `2`, + refCount: 0, value: { id: `2`, title: `Task Beta`, diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 9d84a1099..85d842697 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -323,6 +323,18 @@ export type SyncConfigRes = { loadSubset?: LoadSubsetFn unloadSubset?: UnloadSubsetFn } +/** + * Optional metadata storage for persisting query tracking state across + * restarts. When provided by a persistence layer, the query layer can + * persist per-row ref counts so that on warm start, pre-hydrated rows + * are not incorrectly deleted by disjoint queries. + */ +export interface MetadataStorage { + load: (key: string) => Promise + store: (key: string, value: unknown) => Promise + delete: (key: string) => Promise +} + export interface SyncConfig< T extends object = Record, TKey extends string | number = string | number, @@ -339,6 +351,11 @@ export interface SyncConfig< commit: () => void markReady: () => void truncate: () => void + /** + * Optional metadata storage for persisting query tracking state. + * Provided by the persistence layer when available. + */ + metadataStorage?: MetadataStorage }) => void | CleanupFn | SyncConfigRes /** diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 7bc8f532b..9b753166a 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -620,20 +620,37 @@ export function queryCollectionOptions( // 3. Decrements refcount and GCs rows where count reaches 0 const queryRefCounts = new Map() + // Per-row reference count — the source of truth for deletion decisions. + // Incremented when a query newly claims a row, decremented when a query + // releases it. A row is only eligible for deletion when its ref count + // reaches zero. On warm start, ref counts can be restored from persistence + // so that pre-hydrated rows are protected from deletion by disjoint queries. + const rowRefCounts = new Map() + // Helper function to add a row to the internal state const addRow = (rowKey: string | number, hashedQueryKey: string) => { const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + const isNewClaim = !rowToQueriesSet.has(hashedQueryKey) rowToQueriesSet.add(hashedQueryKey) rowToQueries.set(rowKey, rowToQueriesSet) const queryToRowsSet = queryToRows.get(hashedQueryKey) || new Set() queryToRowsSet.add(rowKey) queryToRows.set(hashedQueryKey, queryToRowsSet) + + if (isNewClaim) { + rowRefCounts.set(rowKey, (rowRefCounts.get(rowKey) ?? 0) + 1) + } } - // Helper function to remove a row from the internal state + // Helper function to remove a row from the internal state. + // Returns true only if the row's ref count reaches zero, meaning no query + // (including from a previous session) owns it and it can be deleted. + // If the query never owned this row (wasPresent is false), returns false — + // a query should not delete rows it never loaded. const removeRow = (rowKey: string | number, hashedQuerKey: string) => { const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + const wasPresent = rowToQueriesSet.has(hashedQuerKey) rowToQueriesSet.delete(hashedQuerKey) rowToQueries.set(rowKey, rowToQueriesSet) @@ -641,11 +658,59 @@ export function queryCollectionOptions( queryToRowsSet.delete(rowKey) queryToRows.set(hashedQuerKey, queryToRowsSet) - return rowToQueriesSet.size === 0 + if (wasPresent) { + const newCount = Math.max(0, (rowRefCounts.get(rowKey) ?? 0) - 1) + rowRefCounts.set(rowKey, newCount) + return newCount === 0 + } + + // Query never owned this row — do not delete it + return false } const internalSync: SyncConfig[`sync`] = (params) => { - const { begin, write, commit, markReady, collection } = params + const { begin, write, commit, markReady, collection, metadataStorage } = + params + + // Load persisted ref counts from metadata storage on init. + // This runs before any query result is processed, ensuring that + // rows hydrated from persistence are protected from premature deletion. + if (metadataStorage) { + void metadataStorage + .load(`queryTracking:rowRefCounts`) + .then((persisted) => { + if (persisted && typeof persisted === `object`) { + for (const [key, count] of Object.entries( + persisted as Record, + )) { + if (count > 0 && !rowRefCounts.has(key)) { + rowRefCounts.set(key, count) + } + } + } + }) + .catch(() => { + // Metadata loading is best-effort + }) + } + + // Persist ref counts to metadata storage after tracking changes. + const persistRefCounts = () => { + if (!metadataStorage) { + return + } + const refCountSnapshot: Record = {} + for (const [key, count] of rowRefCounts.entries()) { + if (count > 0) { + refCountSnapshot[String(key)] = count + } + } + void metadataStorage + .store(`queryTracking:rowRefCounts`, refCountSnapshot) + .catch(() => { + // Metadata persistence is best-effort + }) + } // Track whether sync has been started let syncStarted = false @@ -860,6 +925,8 @@ export function queryCollectionOptions( commit() + persistRefCounts() + // Mark collection as ready after first successful query result markReady() } else if (result.isError) { @@ -976,14 +1043,22 @@ export function queryCollectionOptions( return } + const wasPresent = queries.has(hashedQueryKey) queries.delete(hashedQueryKey) + if (wasPresent) { + const newCount = Math.max(0, (rowRefCounts.get(rowKey) ?? 0) - 1) + rowRefCounts.set(rowKey, newCount) + } + if (queries.size === 0) { rowToQueries.delete(rowKey) + } - if (collection.has(rowKey)) { - rowsToDelete.push(collection.get(rowKey)) - } + const currentCount = rowRefCounts.get(rowKey) ?? 0 + if (currentCount <= 0 && collection.has(rowKey)) { + rowsToDelete.push(collection.get(rowKey)) + rowRefCounts.delete(rowKey) } }) @@ -995,6 +1070,8 @@ export function queryCollectionOptions( commit() } + persistRefCounts() + state.observers.delete(hashedQueryKey) queryToRows.delete(hashedQueryKey) hashToQueryKey.delete(hashedQueryKey) diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index 9ad0f251f..b4a6330b8 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -5207,4 +5207,131 @@ describe(`QueryCollection`, () => { customQueryClient.clear() }) }) + + describe(`rows from external sync sources`, () => { + it(`should retain pre-hydrated rows when a disjoint query correctly returns empty results`, async () => { + // Simulates a warm-start scenario where a persistence layer has already + // hydrated "history" rows into the collection. Two disjoint queries + // share the collection: + // - "history": returns the same rows (but with a server delay) + // - "live": correctly returns [] (there are no live items yet) + // + // When the live query resolves first with its correct empty result, + // the history rows should remain in the collection. The live query's + // empty result only means there are no *live* items — it should not + // affect rows from a different query's domain. + + const preHydratedItems: Array = [ + { id: `1`, name: `History 1`, category: `history` }, + { id: `2`, name: `History 2`, category: `history` }, + { id: `3`, name: `History 3`, category: `history` }, + ] + + let resolveHistoryQueryFn!: (value: Array) => void + + const isQueryCategory = (category: string, where: any): boolean => { + return ( + where && + where.type === `func` && + where.name === `eq` && + where.args[0]?.path?.[0] === `category` && + where.args[1]?.value === category + ) + } + + const queryFn = vi.fn().mockImplementation((ctx: any) => { + const where = ctx.meta?.loadSubsetOptions?.where + + if (isQueryCategory(`history`, where)) { + // History query: returns data, but the server is slow + return new Promise>((resolve) => { + resolveHistoryQueryFn = resolve + }) + } + + if (isQueryCategory(`live`, where)) { + // Live query: correctly returns empty — no live items exist yet + return Promise.resolve([]) + } + + return Promise.resolve([]) + }) + + const baseQueryKey = [`warm-start-disjoint-test`] + + const baseOptions = queryCollectionOptions({ + id: `warm-start-disjoint-test`, + queryClient, + queryKey: (opts: any) => { + if (opts.where) { + return [...baseQueryKey, opts.where] + } + return baseQueryKey + }, + queryFn, + getKey: (item: CategorisedItem) => item.id, + syncMode: `on-demand`, + startSync: true, + }) + + const originalSync = baseOptions.sync + const collection = createCollection({ + ...baseOptions, + sync: { + sync: (params: Parameters[0]) => { + // Simulate a persistence layer hydrating rows from a previous + // session on warm start, before the query layer initializes. + params.begin({ immediate: true }) + for (const item of preHydratedItems) { + params.write({ type: `insert`, value: item }) + } + params.commit() + + return originalSync.sync(params) + }, + }, + }) + + // Verify the persistence layer's hydrated rows are present + expect(collection.size).toBe(3) + + // Subscribe two disjoint queries — history (delayed) and live (immediate) + const historyQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `history`)), + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `live`)), + }) + + // Trigger both queries. The history queryFn is pending; the live + // queryFn resolves immediately with []. + const historyPreload = historyQuery.preload() + await liveQuery.preload() + await flushPromises() + + // The live query correctly returned [] (no live items exist). + // The pre-hydrated history rows should still be in the collection. + expect(collection.size).toBe(3) + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // Now the history query's server response arrives + resolveHistoryQueryFn(preHydratedItems) + await historyPreload + + // Collection should still have all history items + expect(collection.size).toBe(3) + + await historyQuery.cleanup() + await liveQuery.cleanup() + }) + }) }) From d118a479784978397a63b4fde58ee0ed95307e10 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 13:48:39 +0000 Subject: [PATCH 2/2] ci: apply automated fixes --- .../src/persisted.ts | 16 ++++++++++------ .../src/sqlite-core-adapter.ts | 5 +---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/db-sqlite-persisted-collection-core/src/persisted.ts b/packages/db-sqlite-persisted-collection-core/src/persisted.ts index a1a737078..e85e29472 100644 --- a/packages/db-sqlite-persisted-collection-core/src/persisted.ts +++ b/packages/db-sqlite-persisted-collection-core/src/persisted.ts @@ -223,10 +223,7 @@ export interface PersistenceAdapter< key: string, value: unknown, ) => Promise - deleteMetadata?: ( - collectionId: string, - key: string, - ) => Promise + deleteMetadata?: (collectionId: string, key: string) => Promise } export interface SQLiteDriver { @@ -798,7 +795,11 @@ class PersistedCollectionRuntime< createMetadataStorage(): MetadataStorage | undefined { const adapter = this.persistence.adapter - if (!adapter.loadMetadata || !adapter.storeMetadata || !adapter.deleteMetadata) { + if ( + !adapter.loadMetadata || + !adapter.storeMetadata || + !adapter.deleteMetadata + ) { return undefined } const collectionId = this.collectionId @@ -1097,7 +1098,10 @@ class PersistedCollectionRuntime< const refCountEntries = rows .filter((row) => (row.refCount ?? 0) > 0) .map((row) => [String(row.key), row.refCount!] as const) - if (refCountEntries.length > 0 && this.persistence.adapter.storeMetadata) { + if ( + refCountEntries.length > 0 && + this.persistence.adapter.storeMetadata + ) { // Merge with any existing ref counts from prior hydrations const existing = ((await this.persistence.adapter.loadMetadata?.( diff --git a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts index 5ae788da0..60b92b49b 100644 --- a/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts +++ b/packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts @@ -1908,10 +1908,7 @@ export class SQLiteCorePersistenceAdapter< ) } - async deleteMetadata( - collectionId: string, - key: string, - ): Promise { + async deleteMetadata(collectionId: string, key: string): Promise { await this.ensureInitialized() await this.driver.run( `DELETE FROM collection_metadata WHERE collection_id = ? AND key = ?`,