Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
570 changes: 515 additions & 55 deletions packages/db-sqlite-persisted-collection-core/src/persisted.ts

Large diffs are not rendered by default.

326 changes: 294 additions & 32 deletions packages/db-sqlite-persisted-collection-core/src/sqlite-core-adapter.ts

Large diffs are not rendered by default.

571 changes: 571 additions & 0 deletions packages/db-sqlite-persisted-collection-core/tests/persisted.test.ts

Large diffs are not rendered by default.

Large diffs are not rendered by default.

49 changes: 29 additions & 20 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ interface PendingSyncedTransaction<
operations: Array<OptimisticChangeMessage<T>>
truncate?: boolean
deletedKeys: Set<string | number>
rowMetadataWrites: Map<TKey, PendingMetadataWrite>
collectionMetadataWrites: Map<string, PendingMetadataWrite>
optimisticSnapshot?: {
upserts: Map<TKey, T>
deletes: Set<TKey>
Expand All @@ -40,6 +42,8 @@ interface PendingSyncedTransaction<
immediate?: boolean
}

type PendingMetadataWrite = { type: `set`; value: unknown } | { type: `delete` }

type InternalChangeMessage<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
Expand Down Expand Up @@ -70,6 +74,7 @@ export class CollectionStateManager<
> = []
public syncedData: SortedMap<TKey, TOutput>
public syncedMetadata = new Map<TKey, unknown>()
public syncedCollectionMetadata = new Map<string, unknown>()

// Optimistic state tracking - make public for testing
public optimisticUpserts = new Map<TKey, TOutput>()
Expand Down Expand Up @@ -870,6 +875,9 @@ export class CollectionStateManager<
for (const operation of transaction.operations) {
changedKeys.add(operation.key as TKey)
}
for (const [key] of transaction.rowMetadataWrites) {
changedKeys.add(key)
}
}

// Use pre-captured state if available (from optimistic scenarios),
Expand Down Expand Up @@ -959,26 +967,6 @@ export class CollectionStateManager<
const key = operation.key as TKey
this.syncedKeys.add(key)

// Update metadata
switch (operation.type) {
case `insert`:
this.syncedMetadata.set(key, operation.metadata)
break
case `update`:
this.syncedMetadata.set(
key,
Object.assign(
{},
this.syncedMetadata.get(key),
operation.metadata,
),
)
break
case `delete`:
this.syncedMetadata.delete(key)
break
}

// Determine origin: 'local' for local-only collections or pending local changes
const origin: VirtualOrigin =
this.isLocalOnly ||
Expand Down Expand Up @@ -1025,6 +1013,7 @@ export class CollectionStateManager<
}
case `delete`:
this.syncedData.delete(key)
this.syncedMetadata.delete(key)
// Clean up origin and pending tracking for deleted rows
this.rowOrigins.delete(key)
this.pendingLocalChanges.delete(key)
Expand All @@ -1036,6 +1025,25 @@ export class CollectionStateManager<
break
}
}

for (const [key, metadataWrite] of transaction.rowMetadataWrites) {
if (metadataWrite.type === `delete`) {
this.syncedMetadata.delete(key)
continue
}
this.syncedMetadata.set(key, metadataWrite.value)
}

for (const [
key,
metadataWrite,
] of transaction.collectionMetadataWrites) {
if (metadataWrite.type === `delete`) {
this.syncedCollectionMetadata.delete(key)
continue
}
this.syncedCollectionMetadata.set(key, metadataWrite.value)
}
}

// After applying synced operations, if this commit included a truncate,
Expand Down Expand Up @@ -1365,6 +1373,7 @@ export class CollectionStateManager<
public cleanup(): void {
this.syncedData.clear()
this.syncedMetadata.clear()
this.syncedCollectionMetadata.clear()
this.optimisticUpserts.clear()
this.optimisticDeletes.clear()
this.pendingOptimisticUpserts.clear()
Expand Down
129 changes: 129 additions & 0 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
LoadSubsetOptions,
OptimisticChangeMessage,
SyncConfigRes,
SyncMetadataApi,
} from '../types'
import type { CollectionImpl } from './index.js'
import type { CollectionStateManager } from './state'
Expand Down Expand Up @@ -93,6 +94,8 @@ export class CollectionSyncManager<
committed: false,
operations: [],
deletedKeys: new Set(),
rowMetadataWrites: new Map(),
collectionMetadataWrites: new Map(),
immediate: options?.immediate,
})
},
Expand Down Expand Up @@ -169,6 +172,23 @@ export class CollectionSyncManager<

if (messageType === `delete`) {
pendingTransaction.deletedKeys.add(key)
pendingTransaction.rowMetadataWrites.set(key, { type: `delete` })
} else if (messageType === `insert`) {
if (message.metadata !== undefined) {
pendingTransaction.rowMetadataWrites.set(key, {
type: `set`,
value: message.metadata,
})
} else {
pendingTransaction.rowMetadataWrites.set(key, {
type: `delete`,
})
}
} else if (message.metadata !== undefined) {
pendingTransaction.rowMetadataWrites.set(key, {
type: `set`,
value: message.metadata,
})
}
},
commit: () => {
Expand Down Expand Up @@ -205,6 +225,7 @@ export class CollectionSyncManager<
// Clear all operations from the current transaction
pendingTransaction.operations = []
pendingTransaction.deletedKeys.clear()
pendingTransaction.rowMetadataWrites.clear()

// Mark the transaction as a truncate operation. During commit, this triggers:
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
Expand All @@ -220,6 +241,7 @@ export class CollectionSyncManager<
deletes: new Set(this.state.optimisticDeletes),
}
},
metadata: this.createSyncMetadataApi(),
}),
)

Expand All @@ -245,6 +267,113 @@ export class CollectionSyncManager<
}
}

private getActivePendingSyncTransaction() {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]

if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}

return pendingTransaction
}

private createSyncMetadataApi(): SyncMetadataApi<TKey> {
return {
row: {
get: (key) => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
const pendingWrite = pendingTransaction?.rowMetadataWrites.get(key)
if (pendingWrite) {
return pendingWrite.type === `delete`
? undefined
: pendingWrite.value
}
if (pendingTransaction?.truncate) {
return undefined
}
return this.state.syncedMetadata.get(key)
},
set: (key, metadata) => {
const pendingTransaction = this.getActivePendingSyncTransaction()
pendingTransaction.rowMetadataWrites.set(key, {
type: `set`,
value: metadata,
})
},
delete: (key) => {
const pendingTransaction = this.getActivePendingSyncTransaction()
pendingTransaction.rowMetadataWrites.set(key, {
type: `delete`,
})
},
},
collection: {
get: (key) => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
const pendingWrite =
pendingTransaction?.collectionMetadataWrites.get(key)
if (pendingWrite) {
return pendingWrite.type === `delete`
? undefined
: pendingWrite.value
}
return this.state.syncedCollectionMetadata.get(key)
},
set: (key, value) => {
const pendingTransaction = this.getActivePendingSyncTransaction()
pendingTransaction.collectionMetadataWrites.set(key, {
type: `set`,
value,
})
},
delete: (key) => {
const pendingTransaction = this.getActivePendingSyncTransaction()
pendingTransaction.collectionMetadataWrites.set(key, {
type: `delete`,
})
},
list: (prefix) => {
const merged = new Map(this.state.syncedCollectionMetadata)
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
if (pendingTransaction) {
for (const [
key,
pendingWrite,
] of pendingTransaction.collectionMetadataWrites) {
if (pendingWrite.type === `delete`) {
merged.delete(key)
} else {
merged.set(key, pendingWrite.value)
}
}
}

return Array.from(merged.entries())
.filter(([key]) => (prefix ? key.startsWith(prefix) : true))
.map(([key, value]) => ({
key,
value,
}))
},
},
}
}

/**
* Preload the collection data by starting sync if not already started
* Multiple concurrent calls will share the same promise
Expand Down
20 changes: 20 additions & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ export interface SyncConfig<
commit: () => void
markReady: () => void
truncate: () => void
metadata?: SyncMetadataApi<TKey>
}) => void | CleanupFn | SyncConfigRes

/**
Expand All @@ -357,6 +358,25 @@ export interface SyncConfig<
rowUpdateMode?: `partial` | `full`
}

export interface SyncMetadataApi<
TKey extends string | number = string | number,
> {
row: {
get: (key: TKey) => unknown | undefined
set: (key: TKey, metadata: unknown) => void
delete: (key: TKey) => void
}
collection: {
get: (key: string) => unknown | undefined
set: (key: string, value: unknown) => void
delete: (key: string) => void
list: (prefix?: string) => ReadonlyArray<{
key: string
value: unknown
}>
}
}

export interface ChangeMessage<
T extends object = Record<string, unknown>,
TKey extends string | number = string | number,
Expand Down
Loading
Loading