Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions packages/db-sqlite-persisted-collection-core/src/persisted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
DeleteMutationFnParams,
InsertMutationFnParams,
LoadSubsetOptions,
MetadataStorage,
PendingMutation,
SyncConfig,
SyncConfigRes,
Expand Down Expand Up @@ -193,7 +194,7 @@ export interface PersistenceAdapter<
collectionId: string,
options: LoadSubsetOptions,
ctx?: { requiredIndexSignatures?: ReadonlyArray<string> },
) => Promise<Array<{ key: TKey; value: T }>>
) => Promise<Array<{ key: TKey; value: T; refCount?: number }>>
applyCommittedTx: (
collectionId: string,
tx: PersistedTx<T, TKey>,
Expand All @@ -209,6 +210,20 @@ export interface PersistenceAdapter<
latestSeq: number
latestRowVersion: number
}>
updateRefCounts?: (
collectionId: string,
updates: Array<{ key: TKey; refCount: number }>,
) => Promise<void>
loadMetadata?: (
collectionId: string,
key: string,
) => Promise<unknown | undefined>
storeMetadata?: (
collectionId: string,
key: string,
value: unknown,
) => Promise<void>
deleteMetadata?: (collectionId: string, key: string) => Promise<void>
}

export interface SQLiteDriver {
Expand Down Expand Up @@ -778,6 +793,24 @@ class PersistedCollectionRuntime<
return this.isHydrating
}

createMetadataStorage(): MetadataStorage | undefined {
const adapter = this.persistence.adapter
if (
!adapter.loadMetadata ||
!adapter.storeMetadata ||
!adapter.deleteMetadata
) {
return undefined
}
const collectionId = this.collectionId
return {
load: (key: string) => adapter.loadMetadata!(collectionId, key),
store: (key: string, value: unknown) =>
adapter.storeMetadata!(collectionId, key, value),
delete: (key: string) => adapter.deleteMetadata!(collectionId, key),
}
}

isApplyingInternally(): boolean {
return this.internalApplyDepth > 0
}
Expand Down Expand Up @@ -1042,7 +1075,7 @@ class PersistedCollectionRuntime<

private loadSubsetRowsUnsafe(
options: LoadSubsetOptions,
): Promise<Array<{ key: TKey; value: T }>> {
): Promise<Array<{ key: TKey; value: T; refCount?: number }>> {
return this.persistence.adapter.loadSubset(this.collectionId, options, {
requiredIndexSignatures: this.getRequiredIndexSignatures(),
})
Expand All @@ -1059,6 +1092,32 @@ class PersistedCollectionRuntime<
const rows = await this.loadSubsetRowsUnsafe(options)

this.applyRowsToCollection(rows)

// Store hydrated ref counts in metadata so the query layer can
// restore them on init and protect rows from premature deletion.
const refCountEntries = rows
.filter((row) => (row.refCount ?? 0) > 0)
.map((row) => [String(row.key), row.refCount!] as const)
if (
refCountEntries.length > 0 &&
this.persistence.adapter.storeMetadata
) {
// Merge with any existing ref counts from prior hydrations
const existing =
((await this.persistence.adapter.loadMetadata?.(
this.collectionId,
`queryTracking:rowRefCounts`,
)) as Record<string, number> | undefined) ?? {}
const merged = { ...existing }
for (const [key, count] of refCountEntries) {
merged[key] = count
}
await this.persistence.adapter.storeMetadata(
this.collectionId,
`queryTracking:rowRefCounts`,
merged,
)
}
} finally {
this.isHydrating = false
}
Expand Down Expand Up @@ -1930,8 +1989,11 @@ function createWrappedSyncConfig<
params.collection as Collection<T, TKey, PersistedCollectionUtils>,
)

const metadataStorage = runtime.createMetadataStorage()

const wrappedParams = {
...params,
metadataStorage,
markReady: () => {
void runtime
.ensureStarted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StoredSqliteRow = {
key: string
value: string
row_version: number
ref_count: number
}

type SQLiteCoreAdapterSchemaMismatchPolicy =
Expand Down Expand Up @@ -589,6 +590,7 @@ type InMemoryRow<TKey extends string | number, T extends object> = {
key: TKey
value: T
rowVersion: number
refCount: number
}

function compileSqlExpression(
Expand Down Expand Up @@ -1072,13 +1074,15 @@ export class SQLiteCorePersistenceAdapter<
return orderedRows.map((row) => ({
key: row.key,
value: row.value,
refCount: row.refCount,
}))
}

const rows = await this.loadSubsetInternal(tableMapping, options)
return rows.map((row) => ({
key: row.key,
value: row.value,
refCount: row.refCount,
}))
}

Expand Down Expand Up @@ -1405,7 +1409,7 @@ export class SQLiteCorePersistenceAdapter<
const orderByCompiled = compileOrderByClauses(options.orderBy)

const queryParams: Array<SqliteSupportedValue> = []
let sql = `SELECT key, value, row_version FROM ${collectionTableSql}`
let sql = `SELECT key, value, row_version, ref_count FROM ${collectionTableSql}`

if (options.where && whereCompiled.supported) {
sql = `${sql} WHERE ${whereCompiled.sql}`
Expand All @@ -1428,6 +1432,7 @@ export class SQLiteCorePersistenceAdapter<
key,
value,
rowVersion: row.row_version,
refCount: row.ref_count,
}
})

Expand Down Expand Up @@ -1646,9 +1651,18 @@ export class SQLiteCorePersistenceAdapter<
`CREATE TABLE IF NOT EXISTS ${collectionTableSql} (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
row_version INTEGER NOT NULL
row_version INTEGER NOT NULL,
ref_count INTEGER NOT NULL DEFAULT 0
)`,
)
// Migration for existing tables that lack the ref_count column
try {
await this.driver.exec(
`ALTER TABLE ${collectionTableSql} ADD COLUMN ref_count INTEGER NOT NULL DEFAULT 0`,
)
} catch {
// Column already exists — safe to ignore
}
await this.driver.exec(
`CREATE INDEX IF NOT EXISTS ${quoteIdentifier(`${tableName}_row_version_idx`)}
ON ${collectionTableSql} (row_version)`,
Expand Down Expand Up @@ -1828,9 +1842,79 @@ export class SQLiteCorePersistenceAdapter<
updated_at INTEGER NOT NULL
)`,
)
await this.driver.exec(
`CREATE TABLE IF NOT EXISTS collection_metadata (
collection_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (collection_id, key)
)`,
)

this.initialized = true
}

async updateRefCounts(
collectionId: string,
updates: Array<{ key: string | number; refCount: number }>,
): Promise<void> {
if (updates.length === 0) {
return
}

const tableMapping = await this.ensureCollectionReady(collectionId)
const collectionTableSql = quoteIdentifier(tableMapping.tableName)

await this.runInTransaction(async (transactionDriver) => {
for (const update of updates) {
const encodedKey = encodePersistedStorageKey(update.key)
await transactionDriver.run(
`UPDATE ${collectionTableSql} SET ref_count = ? WHERE key = ?`,
[update.refCount, encodedKey],
)
}
})
}

async loadMetadata(
collectionId: string,
key: string,
): Promise<unknown | undefined> {
await this.ensureInitialized()
const rows = await this.driver.query<{ value: string }>(
`SELECT value FROM collection_metadata WHERE collection_id = ? AND key = ?`,
[collectionId, key],
)
if (rows.length === 0) {
return undefined
}
return JSON.parse(rows[0]!.value)
}

async storeMetadata(
collectionId: string,
key: string,
value: unknown,
): Promise<void> {
await this.ensureInitialized()
await this.driver.run(
`INSERT INTO collection_metadata (collection_id, key, value, updated_at)
VALUES (?, ?, ?, CAST(strftime('%s', 'now') AS INTEGER))
ON CONFLICT(collection_id, key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at`,
[collectionId, key, JSON.stringify(value)],
)
}

async deleteMetadata(collectionId: string, key: string): Promise<void> {
await this.ensureInitialized()
await this.driver.run(
`DELETE FROM collection_metadata WHERE collection_id = ? AND key = ?`,
[collectionId, key],
)
}
}

export function createSQLiteCorePersistenceAdapter<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export function runSQLiteCoreAdapterContractSuite(
expect(updated).toEqual([
{
key: `1`,
refCount: 0,
value: {
id: `1`,
title: `Updated`,
Expand Down Expand Up @@ -494,6 +495,7 @@ export function runSQLiteCoreAdapterContractSuite(
expect(filtered).toEqual([
{
key: `2`,
refCount: 0,
value: {
id: `2`,
title: `Task Beta`,
Expand Down
17 changes: 17 additions & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,18 @@ export type SyncConfigRes = {
loadSubset?: LoadSubsetFn
unloadSubset?: UnloadSubsetFn
}
/**
* Optional metadata storage for persisting query tracking state across
* restarts. When provided by a persistence layer, the query layer can
* persist per-row ref counts so that on warm start, pre-hydrated rows
* are not incorrectly deleted by disjoint queries.
*/
export interface MetadataStorage {
load: (key: string) => Promise<unknown | undefined>
store: (key: string, value: unknown) => Promise<void>
delete: (key: string) => Promise<void>
}

export interface SyncConfig<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
Expand All @@ -339,6 +351,11 @@ export interface SyncConfig<
commit: () => void
markReady: () => void
truncate: () => void
/**
* Optional metadata storage for persisting query tracking state.
* Provided by the persistence layer when available.
*/
metadataStorage?: MetadataStorage
}) => void | CleanupFn | SyncConfigRes

/**
Expand Down
Loading
Loading