From e9e96eef8a97d6e59cb8d52a1048779499d2650e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 5 May 2026 14:10:14 +0200 Subject: [PATCH 1/3] test(db): a key in BasicIndex/BTreeIndex lives in at most one bucket Pin down the invariant that calling update or add on an index never leaves the same key in two buckets at once, even when the caller- supplied oldItem disagrees with what the index recorded. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/index-key-bucket-tracking.test.ts | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 packages/db/tests/index-key-bucket-tracking.test.ts diff --git a/packages/db/tests/index-key-bucket-tracking.test.ts b/packages/db/tests/index-key-bucket-tracking.test.ts new file mode 100644 index 000000000..4211f0138 --- /dev/null +++ b/packages/db/tests/index-key-bucket-tracking.test.ts @@ -0,0 +1,138 @@ +import { describe, expect, it } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { createTransaction } from '../src/transactions' +import { BasicIndex } from '../src/indexes/basic-index' +import { BTreeIndex } from '../src/indexes/btree-index' +import { eq } from '../src/query/builder/functions' +import { PropRef } from '../src/query/ir' +import type { ChangeMessageOrDeleteKeyMessage } from '../src/types' + +interface Project { + id: string + name: string + current_stage_id: string + updated_at: string +} + +const row = ( + id: string, + stage: string, + t: string = `2023-01-01T00:00:00Z`, +): Project => ({ + id, + name: `Project-${id}`, + current_stage_id: stage, + updated_at: t, +}) + +describe(`a key lives in at most one bucket at a time`, () => { + it(`BasicIndex.update moves the key out of its actual bucket, regardless of the oldItem passed`, () => { + const index = new BasicIndex(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + expect(index.equalityLookup(`A`)).toEqual(new Set([`P`])) + + // The oldItem disagrees with the value the index recorded for P. + index.update(`P`, row(`P`, `B`), row(`P`, `C`)) + + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set()) + expect(index.equalityLookup(`C`)).toEqual(new Set([`P`])) + }) + + it(`BasicIndex.add called twice without an intervening remove keeps the key in the latest bucket only`, () => { + const index = new BasicIndex(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + index.add(`P`, row(`P`, `B`)) + + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set([`P`])) + }) + + it(`BTreeIndex.update moves the key out of its actual bucket, regardless of the oldItem passed`, () => { + const index = new BTreeIndex(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + + index.update(`P`, row(`P`, `B`), row(`P`, `C`)) + + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set()) + expect(index.equalityLookup(`C`)).toEqual(new Set([`P`])) + }) + + it(`BTreeIndex.add called twice without an intervening remove keeps the key in the latest bucket only`, () => { + const index = new BTreeIndex(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + index.add(`P`, row(`P`, `B`)) + + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set([`P`])) + }) + + it(`an optimistic→synced update on the indexed column ends with the row in the new bucket`, async () => { + let resolveMutation: () => void + let begin!: () => void + let write!: (m: ChangeMessageOrDeleteKeyMessage) => void + let commit!: () => void + let markReady!: () => void + + const collection = createCollection({ + id: `projects`, + getKey: (item) => item.id, + autoIndex: `eager`, + defaultIndexType: BasicIndex, + startSync: true, + sync: { + sync: (ctx) => { + begin = ctx.begin + write = ctx.write + commit = ctx.commit + markReady = ctx.markReady + begin() + write({ type: `insert`, value: row(`P`, `A`) }) + commit() + markReady() + }, + }, + }) + await collection.stateWhenReady() + + collection.subscribeChanges(() => {}, { + whereExpression: eq(new PropRef([`current_stage_id`]), `A`), + includeInitialState: true, + }) + const idx = Array.from(collection.indexes.values())[0] as BasicIndex + + // Offline-style optimistic mutation (non-direct ambient transaction). + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise((resolve) => { + resolveMutation = resolve + }) + }, + }) + tx.mutate(() => { + collection.update(`P`, (draft) => { + draft.current_stage_id = `B` + }) + }) + + const commitPromise = tx.commit() + + begin() + write({ + type: `update`, + value: row(`P`, `B`, `2023-01-02T00:00:00Z`), + }) + commit() + + resolveMutation!() + await commitPromise + await Promise.resolve() + await Promise.resolve() + + expect(collection.get(`P`)?.current_stage_id).toBe(`B`) + expect(idx.equalityLookup(`A`)).toEqual(new Set()) + expect(idx.equalityLookup(`B`)).toEqual(new Set([`P`])) + }) +}) From 993bf6d0935fdf849326cfe9e32f426d551fbf11 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 5 May 2026 14:23:52 +0200 Subject: [PATCH 2/3] fix(db): track each key's bucket inside the index Both BasicIndex and BTreeIndex now keep a key->indexedValue map and use it on remove/update instead of evaluating the caller-supplied oldItem. add() also drops the key from its current bucket first so a missed remove (or a second add) can't leave the key in two buckets at once. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/fix-index-stale-bucket.md | 9 ++++ packages/db/src/indexes/basic-index.ts | 74 +++++++++++++++++--------- packages/db/src/indexes/btree-index.ts | 67 ++++++++++++++--------- 3 files changed, 100 insertions(+), 50 deletions(-) create mode 100644 .changeset/fix-index-stale-bucket.md diff --git a/.changeset/fix-index-stale-bucket.md b/.changeset/fix-index-stale-bucket.md new file mode 100644 index 000000000..a63337869 --- /dev/null +++ b/.changeset/fix-index-stale-bucket.md @@ -0,0 +1,9 @@ +--- +'@tanstack/db': patch +--- + +fix(db): prevent indexes from leaving a key in two buckets at once + +`BasicIndex` and `BTreeIndex` now track each key's currently-indexed value internally and use that on `remove`/`update`, instead of trusting the caller-supplied `oldItem`. If `oldItem` evaluated to a value that didn't match the bucket the key actually lived in, the previous implementation silently no-oped on the bucket map while the subsequent `add` placed the key in a new bucket, leaving the key in two buckets simultaneously. Calling `add` for a key that's already indexed also now drops the previous bucket first, so a missed `remove` can't double-bucket the key. + +This was the underlying cause of stale auto-index buckets after an optimistic→synced update on the indexed column (e.g. a kanban card snapping back to its old column). diff --git a/packages/db/src/indexes/basic-index.ts b/packages/db/src/indexes/basic-index.ts index b80f7fb43..55139ea43 100644 --- a/packages/db/src/indexes/basic-index.ts +++ b/packages/db/src/indexes/basic-index.ts @@ -54,6 +54,11 @@ export class BasicIndex< private sortedValues: Array = [] // Set of all indexed PKs private indexedKeys = new Set() + // Reverse map: key -> currently indexed value. Lets `remove`/`update` + // find the bucket a key actually lives in instead of trusting the + // caller-supplied old item, which can be stale across optimistic→synced + // lifecycle races. + private keyToValue = new Map() // Comparator function private compareFn: (a: any, b: any) => number = defaultComparator @@ -88,6 +93,13 @@ export class BasicIndex< const normalizedValue = normalizeValue(indexedValue) + // If the key is already indexed, drop it from its current bucket first so + // an `add` after a stale or missed `remove` can't leave the key in two + // buckets at once. + if (this.indexedKeys.has(key)) { + this.removeFromBucket(key, this.keyToValue.get(key)) + } + if (this.valueMap.has(normalizedValue)) { // Value already exists, just add the key to the set this.valueMap.get(normalizedValue)!.add(key) @@ -105,40 +117,49 @@ export class BasicIndex< } this.indexedKeys.add(key) + this.keyToValue.set(key, normalizedValue) this.updateTimestamp() } + private removeFromBucket(key: TKey, normalizedValue: any): void { + if (normalizedValue === undefined && !this.valueMap.has(undefined)) { + // Key wasn't tracked yet; nothing to remove from valueMap. + return + } + const keySet = this.valueMap.get(normalizedValue) + if (!keySet) return + keySet.delete(key) + if (keySet.size === 0) { + this.valueMap.delete(normalizedValue) + deleteInSortedArray(this.sortedValues, normalizedValue, this.compareFn) + } + } + /** * Removes a value from the index */ remove(key: TKey, item: any): void { - let indexedValue: any - try { - indexedValue = this.evaluateIndexExpression(item) - } catch (error) { - console.warn( - `Failed to evaluate index expression for key ${key} during removal:`, - error, - ) - this.indexedKeys.delete(key) - this.updateTimestamp() - return - } - - const normalizedValue = normalizeValue(indexedValue) - - if (this.valueMap.has(normalizedValue)) { - const keySet = this.valueMap.get(normalizedValue)! - keySet.delete(key) - - if (keySet.size === 0) { - // No more keys for this value, remove from map and sorted array - this.valueMap.delete(normalizedValue) - deleteInSortedArray(this.sortedValues, normalizedValue, this.compareFn) + // Use the tracked bucket for this key. We intentionally do NOT trust + // the caller-supplied `item`: across optimistic→synced lifecycle races + // the previousValue passed in can disagree with the value we actually + // indexed, which would silently leave the key in a stale bucket. + if (this.keyToValue.has(key)) { + this.removeFromBucket(key, this.keyToValue.get(key)) + } else { + // Fall back to evaluating the supplied item (covers the case where + // the key was never indexed but caller still asks to remove it). + try { + const indexedValue = this.evaluateIndexExpression(item) + this.removeFromBucket(key, normalizeValue(indexedValue)) + } catch (error) { + console.warn( + `Failed to evaluate index expression for key ${key} during removal:`, + error, + ) } } - this.indexedKeys.delete(key) + this.keyToValue.delete(key) this.updateTimestamp() } @@ -168,8 +189,10 @@ export class BasicIndex< { cause: error }, ) } - entriesArray.push({ key, value: normalizeValue(indexedValue) }) + const normalizedValue = normalizeValue(indexedValue) + entriesArray.push({ key, value: normalizedValue }) this.indexedKeys.add(key) + this.keyToValue.set(key, normalizedValue) } // Group by value @@ -194,6 +217,7 @@ export class BasicIndex< this.valueMap.clear() this.sortedValues = [] this.indexedKeys.clear() + this.keyToValue.clear() this.updateTimestamp() } diff --git a/packages/db/src/indexes/btree-index.ts b/packages/db/src/indexes/btree-index.ts index 17608950a..2afe00097 100644 --- a/packages/db/src/indexes/btree-index.ts +++ b/packages/db/src/indexes/btree-index.ts @@ -50,6 +50,11 @@ export class BTreeIndex< private orderedEntries: BTree // we don't associate values with the keys of the B+ tree (the keys are indexed values) private valueMap = new Map>() // instead we store a mapping of indexed values to a set of PKs private indexedKeys = new Set() + // Reverse map: key -> currently indexed value. Lets `remove`/`update` + // find the bucket a key actually lives in instead of trusting the + // caller-supplied old item, which can be stale across optimistic→synced + // lifecycle races. + private keyToValue = new Map() private compareFn: (a: any, b: any) => number = defaultComparator constructor( @@ -93,6 +98,13 @@ export class BTreeIndex< // Normalize the value for Map key usage const normalizedValue = normalizeForBTree(indexedValue) + // If the key is already indexed, drop it from its current bucket first so + // an `add` after a stale or missed `remove` can't leave the key in two + // buckets at once. + if (this.indexedKeys.has(key)) { + this.removeFromBucket(key, this.keyToValue.get(key)) + } + // Check if this value already exists if (this.valueMap.has(normalizedValue)) { // Add to existing set @@ -105,41 +117,45 @@ export class BTreeIndex< } this.indexedKeys.add(key) + this.keyToValue.set(key, normalizedValue) this.updateTimestamp() } + private removeFromBucket(key: TKey, normalizedValue: any): void { + const keySet = this.valueMap.get(normalizedValue) + if (!keySet) return + keySet.delete(key) + if (keySet.size === 0) { + this.valueMap.delete(normalizedValue) + this.orderedEntries.delete(normalizedValue) + } + } + /** * Removes a value from the index */ remove(key: TKey, item: any): void { - let indexedValue: any - try { - indexedValue = this.evaluateIndexExpression(item) - } catch (error) { - console.warn( - `Failed to evaluate index expression for key ${key} during removal:`, - error, - ) - return - } - - // Normalize the value for Map key usage - const normalizedValue = normalizeForBTree(indexedValue) - - if (this.valueMap.has(normalizedValue)) { - const keySet = this.valueMap.get(normalizedValue)! - keySet.delete(key) - - // If set is now empty, remove the entry entirely - if (keySet.size === 0) { - this.valueMap.delete(normalizedValue) - - // Remove from ordered entries - this.orderedEntries.delete(normalizedValue) + // Use the tracked bucket for this key. We intentionally do NOT trust + // the caller-supplied `item`: across optimistic→synced lifecycle races + // the previousValue passed in can disagree with the value we actually + // indexed, which would silently leave the key in a stale bucket. + if (this.keyToValue.has(key)) { + this.removeFromBucket(key, this.keyToValue.get(key)) + } else { + // Fall back to evaluating the supplied item (covers the case where + // the key was never indexed but caller still asks to remove it). + try { + const indexedValue = this.evaluateIndexExpression(item) + this.removeFromBucket(key, normalizeForBTree(indexedValue)) + } catch (error) { + console.warn( + `Failed to evaluate index expression for key ${key} during removal:`, + error, + ) } } - this.indexedKeys.delete(key) + this.keyToValue.delete(key) this.updateTimestamp() } @@ -169,6 +185,7 @@ export class BTreeIndex< this.orderedEntries.clear() this.valueMap.clear() this.indexedKeys.clear() + this.keyToValue.clear() this.updateTimestamp() } From 72b31cce806567d04e2ed3f4ba0413ccbf956a62 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 5 May 2026 14:44:11 +0200 Subject: [PATCH 3/3] test(db): parametrize index-bucket tests over BasicIndex/BTreeIndex Use describe.each to run each test against both index implementations instead of duplicating the BasicIndex and BTreeIndex variants by hand. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/index-key-bucket-tracking.test.ts | 189 +++++++++--------- 1 file changed, 89 insertions(+), 100 deletions(-) diff --git a/packages/db/tests/index-key-bucket-tracking.test.ts b/packages/db/tests/index-key-bucket-tracking.test.ts index 4211f0138..b8f7d83cc 100644 --- a/packages/db/tests/index-key-bucket-tracking.test.ts +++ b/packages/db/tests/index-key-bucket-tracking.test.ts @@ -5,6 +5,7 @@ import { BasicIndex } from '../src/indexes/basic-index' import { BTreeIndex } from '../src/indexes/btree-index' import { eq } from '../src/query/builder/functions' import { PropRef } from '../src/query/ir' +import type { BaseIndex, IndexConstructor } from '../src/indexes/base-index' import type { ChangeMessageOrDeleteKeyMessage } from '../src/types' interface Project { @@ -25,114 +26,102 @@ const row = ( updated_at: t, }) -describe(`a key lives in at most one bucket at a time`, () => { - it(`BasicIndex.update moves the key out of its actual bucket, regardless of the oldItem passed`, () => { - const index = new BasicIndex(1, new PropRef([`current_stage_id`])) - index.add(`P`, row(`P`, `A`)) - expect(index.equalityLookup(`A`)).toEqual(new Set([`P`])) - - // The oldItem disagrees with the value the index recorded for P. - index.update(`P`, row(`P`, `B`), row(`P`, `C`)) - - expect(index.equalityLookup(`A`)).toEqual(new Set()) - expect(index.equalityLookup(`B`)).toEqual(new Set()) - expect(index.equalityLookup(`C`)).toEqual(new Set([`P`])) - }) - - it(`BasicIndex.add called twice without an intervening remove keeps the key in the latest bucket only`, () => { - const index = new BasicIndex(1, new PropRef([`current_stage_id`])) - index.add(`P`, row(`P`, `A`)) - index.add(`P`, row(`P`, `B`)) - - expect(index.equalityLookup(`A`)).toEqual(new Set()) - expect(index.equalityLookup(`B`)).toEqual(new Set([`P`])) - }) - - it(`BTreeIndex.update moves the key out of its actual bucket, regardless of the oldItem passed`, () => { - const index = new BTreeIndex(1, new PropRef([`current_stage_id`])) - index.add(`P`, row(`P`, `A`)) - - index.update(`P`, row(`P`, `B`), row(`P`, `C`)) - - expect(index.equalityLookup(`A`)).toEqual(new Set()) - expect(index.equalityLookup(`B`)).toEqual(new Set()) - expect(index.equalityLookup(`C`)).toEqual(new Set([`P`])) - }) - - it(`BTreeIndex.add called twice without an intervening remove keeps the key in the latest bucket only`, () => { - const index = new BTreeIndex(1, new PropRef([`current_stage_id`])) - index.add(`P`, row(`P`, `A`)) - index.add(`P`, row(`P`, `B`)) - - expect(index.equalityLookup(`A`)).toEqual(new Set()) - expect(index.equalityLookup(`B`)).toEqual(new Set([`P`])) - }) +describe.each([ + [`BasicIndex`, BasicIndex], + [`BTreeIndex`, BTreeIndex], +] as Array<[string, IndexConstructor]>)( + `%s: a key lives in at most one bucket at a time`, + (_name, IndexType) => { + it(`update moves the key out of its actual bucket, regardless of the oldItem passed`, () => { + const index = new IndexType(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + expect(index.equalityLookup(`A`)).toEqual(new Set([`P`])) + + // The oldItem disagrees with the value the index recorded for P. + index.update(`P`, row(`P`, `B`), row(`P`, `C`)) + + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set()) + expect(index.equalityLookup(`C`)).toEqual(new Set([`P`])) + }) - it(`an optimistic→synced update on the indexed column ends with the row in the new bucket`, async () => { - let resolveMutation: () => void - let begin!: () => void - let write!: (m: ChangeMessageOrDeleteKeyMessage) => void - let commit!: () => void - let markReady!: () => void + it(`add called twice without an intervening remove keeps the key in the latest bucket only`, () => { + const index = new IndexType(1, new PropRef([`current_stage_id`])) + index.add(`P`, row(`P`, `A`)) + index.add(`P`, row(`P`, `B`)) - const collection = createCollection({ - id: `projects`, - getKey: (item) => item.id, - autoIndex: `eager`, - defaultIndexType: BasicIndex, - startSync: true, - sync: { - sync: (ctx) => { - begin = ctx.begin - write = ctx.write - commit = ctx.commit - markReady = ctx.markReady - begin() - write({ type: `insert`, value: row(`P`, `A`) }) - commit() - markReady() - }, - }, + expect(index.equalityLookup(`A`)).toEqual(new Set()) + expect(index.equalityLookup(`B`)).toEqual(new Set([`P`])) }) - await collection.stateWhenReady() - collection.subscribeChanges(() => {}, { - whereExpression: eq(new PropRef([`current_stage_id`]), `A`), - includeInitialState: true, - }) - const idx = Array.from(collection.indexes.values())[0] as BasicIndex + it(`an optimistic→synced update on the indexed column ends with the row in the new bucket`, async () => { + let resolveMutation: () => void + let begin!: () => void + let write!: (m: ChangeMessageOrDeleteKeyMessage) => void + let commit!: () => void + let markReady!: () => void + + const collection = createCollection({ + id: `projects`, + getKey: (item) => item.id, + autoIndex: `eager`, + defaultIndexType: IndexType, + startSync: true, + sync: { + sync: (ctx) => { + begin = ctx.begin + write = ctx.write + commit = ctx.commit + markReady = ctx.markReady + begin() + write({ type: `insert`, value: row(`P`, `A`) }) + commit() + markReady() + }, + }, + }) + await collection.stateWhenReady() - // Offline-style optimistic mutation (non-direct ambient transaction). - const tx = createTransaction({ - autoCommit: false, - mutationFn: async () => { - await new Promise((resolve) => { - resolveMutation = resolve + collection.subscribeChanges(() => {}, { + whereExpression: eq(new PropRef([`current_stage_id`]), `A`), + includeInitialState: true, + }) + const idx = Array.from( + collection.indexes.values(), + )[0] as BaseIndex + + // Offline-style optimistic mutation (non-direct ambient transaction). + const tx = createTransaction({ + autoCommit: false, + mutationFn: async () => { + await new Promise((resolve) => { + resolveMutation = resolve + }) + }, + }) + tx.mutate(() => { + collection.update(`P`, (draft) => { + draft.current_stage_id = `B` }) - }, - }) - tx.mutate(() => { - collection.update(`P`, (draft) => { - draft.current_stage_id = `B` }) - }) - const commitPromise = tx.commit() + const commitPromise = tx.commit() - begin() - write({ - type: `update`, - value: row(`P`, `B`, `2023-01-02T00:00:00Z`), - }) - commit() + begin() + write({ + type: `update`, + value: row(`P`, `B`, `2023-01-02T00:00:00Z`), + }) + commit() - resolveMutation!() - await commitPromise - await Promise.resolve() - await Promise.resolve() + resolveMutation!() + await commitPromise + await Promise.resolve() + await Promise.resolve() - expect(collection.get(`P`)?.current_stage_id).toBe(`B`) - expect(idx.equalityLookup(`A`)).toEqual(new Set()) - expect(idx.equalityLookup(`B`)).toEqual(new Set([`P`])) - }) -}) + expect(collection.get(`P`)?.current_stage_id).toBe(`B`) + expect(idx.equalityLookup(`A`)).toEqual(new Set()) + expect(idx.equalityLookup(`B`)).toEqual(new Set([`P`])) + }) + }, +)