12 changes: 12 additions & 0 deletions .changeset/fix-infinite-loop-orderby-limit.md
@@ -0,0 +1,12 @@
---
'@tanstack/db': patch
'@tanstack/db-ivm': patch
---

Fix infinite loop in ORDER BY + LIMIT queries when WHERE filters out most data.

**The problem**: A query asks for "top 10 where category='rare'", but only 3 rare items exist locally. The system keeps requesting more rows even though the local index has nothing left to offer, so the loop never terminates.

**The fix**: Added a `localIndexExhausted` flag. When the local index reports that nothing is left, we remember that and stop asking. The flag resets when genuinely new data arrives from the sync layer.

Also adds safety iteration limits as backstops (D2: 100k, maybeRunGraph: 10k, requestLimitedSnapshot: 10k).
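The flag-and-reset mechanism described above can be sketched in isolation. All names below (`PagedLoader`, `loadMore`, `onInsert`) are hypothetical, and a plain array stands in for the real collection and index:

```typescript
// Minimal sketch of the exhausted-flag pattern. These names are
// illustrative only; the real implementation lives in
// CollectionSubscriber / CollectionSubscription.
class PagedLoader {
  private exhausted = false;

  constructor(
    private rows: number[],
    private where: (row: number) => boolean,
  ) {}

  // Ask for up to `needed` matching rows after `cursor`.
  // Returns [] immediately once local data is known to be exhausted.
  loadMore(needed: number, cursor: number): number[] {
    if (this.exhausted) return []; // stop asking until new data arrives
    const found = this.rows
      .filter((row) => row > cursor && this.where(row))
      .slice(0, needed);
    if (found.length === 0) this.exhausted = true; // nothing left locally
    return found;
  }

  // Genuinely new rows from the sync layer reset the flag,
  // because they might fill the remaining slots.
  onInsert(newRows: number[]): void {
    this.rows.push(...newRows);
    this.exhausted = false;
  }
}
```

Without the flag, a caller that retries whenever it still needs rows would call `loadMore` forever once the filter excludes everything past the cursor.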
12 changes: 12 additions & 0 deletions packages/db-ivm/src/d2.ts
@@ -57,7 +57,19 @@ export class D2 implements ID2 {
}

run(): void {
// Safety limit to prevent infinite loops in case of circular data flow
// or other bugs that cause operators to perpetually produce output.
// For legitimate pipelines, data should flow through in finite steps.
const MAX_RUN_ITERATIONS = 100000
let iterations = 0

while (this.pendingWork()) {
if (++iterations > MAX_RUN_ITERATIONS) {
throw new Error(
`D2 graph execution exceeded ${MAX_RUN_ITERATIONS} iterations. ` +
`This may indicate an infinite loop in the dataflow graph.`,
)
}
this.step()
}
}
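The guarded loop added to `run()` follows a generic bounded-loop pattern that can be factored out and tested on its own. This is a sketch under assumed names; the real `run()` inlines the logic rather than calling a helper:

```typescript
// Generic bounded-loop backstop (a sketch; D2.run() inlines this
// logic rather than delegating to a helper function).
// Repeats `step` while `pendingWork` reports work remaining, and throws
// once the iteration cap is exceeded instead of hanging the process.
function runBounded(
  pendingWork: () => boolean,
  step: () => void,
  maxIterations = 100_000,
): number {
  let iterations = 0;
  while (pendingWork()) {
    if (++iterations > maxIterations) {
      throw new Error(
        `execution exceeded ${maxIterations} iterations; ` +
          `this may indicate an infinite loop in the dataflow graph`,
      );
    }
    step();
  }
  return iterations; // useful for assertions and diagnostics
}
```

Throwing (rather than silently breaking) surfaces the bug to callers, which is why the live-query layer below catches the error and transitions to an error state.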
31 changes: 28 additions & 3 deletions packages/db/src/collection/subscription.ts
@@ -410,13 +410,15 @@ export class CollectionSubscription
* Note 1: it may load more rows than the provided LIMIT because it loads all values equal to the first cursor value + limit values greater.
* This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values.
* Note 2: it does not send keys that have already been sent before.
*
* @returns true if local data was found and sent, false if the local index was exhausted
*/
requestLimitedSnapshot({
orderBy,
limit,
minValues,
offset,
}: RequestLimitedSnapshotOptions) {
}: RequestLimitedSnapshotOptions): boolean {
if (!limit) throw new Error(`limit is required`)

if (!this.orderByIndex) {
@@ -502,8 +504,24 @@
? compileExpression(new PropRef(orderByExpression.path), true)
: null

// Safety limit to prevent infinite loops if the index iteration or filtering
// logic has issues. The loop should naturally terminate when the index is
// exhausted, but this provides a backstop. 10000 iterations is generous
// for any legitimate use case.
const MAX_SNAPSHOT_ITERATIONS = 10000
let snapshotIterations = 0
let hitIterationLimit = false

while (valuesNeeded() > 0 && !collectionExhausted()) {
const insertedKeys = new Set<string | number>() // Track keys we add to `changes` in this iteration
if (++snapshotIterations > MAX_SNAPSHOT_ITERATIONS) {
console.error(
`[TanStack DB] requestLimitedSnapshot exceeded ${MAX_SNAPSHOT_ITERATIONS} iterations. ` +
`This may indicate an infinite loop in index iteration or filtering. ` +
`Breaking out to prevent app freeze. Collection: ${this.collection.id}`,
)
hitIterationLimit = true
break
}

for (const key of keys) {
const value = this.collection.get(key)!
@@ -515,7 +533,6 @@
// Extract the indexed value (e.g., salary) from the row, not the full row
// This is needed for index.take() to work correctly with the BTree comparator
biggestObservedValue = valueExtractor ? valueExtractor(value) : value
insertedKeys.add(key) // Track this key
}

keys = index.take(valuesNeeded(), biggestObservedValue, filterFn)
@@ -597,6 +614,14 @@
// Track this loadSubset call
this.loadedSubsets.push(loadOptions)
this.trackLoadSubsetPromise(syncResult)

// Return whether local data was found and iteration completed normally.
// Return false if:
// - No local data was found (index exhausted)
// - Iteration limit was hit (abnormal exit)
// Either case signals that the caller should stop trying to load more.
// The async loadSubset may still return data later.
return changes.length > 0 && !hitIterationLimit
}

// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function
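Note 1's duplicate-safe cursor behavior in `requestLimitedSnapshot` can be illustrated with a plain sorted array. `takeAfter` below is a hypothetical stand-in, not the real `index.take()`:

```typescript
// Sketch of duplicate-safe cursor pagination over a sorted array.
// `takeAfter` is a hypothetical stand-in for the BTree index's take():
// it returns ALL values equal to the cursor plus `limit` values greater,
// so a limit that falls in the middle of duplicated values never skips
// rows. Callers deduplicate by key, as described in Note 2.
function takeAfter(sorted: number[], cursor: number, limit: number): number[] {
  const equalToCursor = sorted.filter((v) => v === cursor);
  const greater = sorted.filter((v) => v > cursor).slice(0, limit);
  return [...equalToCursor, ...greater];
}
```

With `sorted = [1, 2, 2, 2, 3, 4]`, a naive "take 2 strictly after cursor 2" could drop one of the duplicate 2s depending on which occurrence the cursor pointed at; `takeAfter(sorted, 2, 2)` instead returns all three 2s plus the next two values.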
29 changes: 28 additions & 1 deletion packages/db/src/query/live/collection-config-builder.ts
@@ -336,8 +336,35 @@ export class CollectionConfigBuilder<

// Always run the graph if subscribed (eager execution)
if (syncState.subscribedToAllCollections) {
// Safety limit to prevent infinite loops when data loading and graph processing
// create a feedback cycle. This can happen when:
// 1. OrderBy/limit queries filter out most data, causing dataNeeded() > 0
// 2. Loading more data triggers updates that get filtered out
// 3. The cycle continues indefinitely
// 10000 iterations is generous for legitimate use cases but prevents hangs.
const MAX_GRAPH_ITERATIONS = 10000
let iterations = 0

while (syncState.graph.pendingWork()) {
syncState.graph.run()
if (++iterations > MAX_GRAPH_ITERATIONS) {
this.transitionToError(
`Graph execution exceeded ${MAX_GRAPH_ITERATIONS} iterations. ` +
`This likely indicates an infinite loop caused by data loading ` +
`triggering continuous graph updates.`,
)
return
}

try {
syncState.graph.run()
} catch (error) {
// D2 graph throws when it exceeds its internal iteration limit
// Transition to error state so callers can detect incomplete data
this.transitionToError(
error instanceof Error ? error.message : String(error),
)
return
}
// Flush accumulated changes after each graph step to commit them as one transaction.
// This ensures intermediate join states (like null on one side) don't cause
// duplicate key errors when the full join result arrives in the same step.
71 changes: 65 additions & 6 deletions packages/db/src/query/live/collection-subscriber.ts
@@ -37,6 +37,12 @@ export class CollectionSubscriber<
// can potentially send the same item to D2 multiple times.
private sentToD2Keys = new Set<string | number>()

// Track when the local index has been exhausted for the current cursor position.
// When true, loadMoreIfNeeded will not try to load more data until new data arrives.
// This prevents infinite loops when the TopK can't be filled because WHERE filters
// out all available data.
private localIndexExhausted = false

constructor(
private alias: string,
private collectionId: string,
@@ -222,11 +228,31 @@
const sendChangesInRange = (
changes: Iterable<ChangeMessage<any, string | number>>,
) => {
// Check for inserts before splitting updates, to determine if we should
// reset localIndexExhausted. We reset on inserts because:
//
// 1. splitUpdates (below) converts updates to delete+insert pairs for D2,
// but those "fake" inserts shouldn't reset the flag - they don't represent
// new rows that could fill the TopK.
//
// 2. "Reset only on inserts" is correct because updates to existing rows
// don't add new rows to scan in the local index. The updated row is
// already being processed in the current graph run.
//
// 3. Edge case: "update makes row match WHERE" is handled correctly because
// the subscription's filterAndFlipChanges converts "update for unseen key"
// to "insert" before we receive it here. So if a row that was previously
// filtered out by WHERE now matches after an update, it arrives as an
// insert and correctly resets the flag.
const changesArray = Array.isArray(changes) ? changes : [...changes]
const hasOriginalInserts = changesArray.some((c) => c.type === `insert`)

// Split live updates into a delete of the old value and an insert of the new value
const splittedChanges = splitUpdates(changes)
const splittedChanges = splitUpdates(changesArray)
this.sendChangesToPipelineWithTracking(
splittedChanges,
subscriptionHolder.current!,
hasOriginalInserts,
)
}

@@ -301,26 +327,54 @@
return true
}

// If we've already exhausted the local index, don't try to load more.
// This prevents infinite loops when the TopK can't be filled because
// the WHERE clause filters out all available local data.
// The flag is reset when new data arrives from the sync layer.
if (this.localIndexExhausted) {
return true
}

// `dataNeeded` probes the orderBy operator to see if it needs more data
// if it needs more data, it returns the number of items it needs
const n = dataNeeded()
if (n > 0) {
this.loadNextItems(n, subscription)
const foundLocalData = this.loadNextItems(n, subscription)
if (!foundLocalData) {
// No local data found - mark the index as exhausted so we don't
// keep trying in subsequent graph iterations. The sync layer's
// loadSubset has been called and may return data asynchronously.
this.localIndexExhausted = true
}
}
return true
}

private sendChangesToPipelineWithTracking(
changes: Iterable<ChangeMessage<any, string | number>>,
subscription: CollectionSubscription,
hasOriginalInserts?: boolean,
) {
const orderByInfo = this.getOrderByInfo()
if (!orderByInfo) {
this.sendChangesToPipeline(changes)
return
}

const trackedChanges = this.trackSentValues(changes, orderByInfo.comparator)
// Reset localIndexExhausted when genuinely new data arrives from the sync layer.
// This allows loadMoreIfNeeded to try loading again since there's new data.
// We only reset on ORIGINAL inserts - not fake inserts from splitUpdates.
// splitUpdates converts updates to delete+insert for D2, but those shouldn't
// reset the flag since they don't represent new data that could fill the TopK.
const changesArray = Array.isArray(changes) ? changes : [...changes]
if (hasOriginalInserts) {
this.localIndexExhausted = false
}

const trackedChanges = this.trackSentValues(
changesArray,
orderByInfo.comparator,
)

// Cache the loadMoreIfNeeded callback on the subscription using a symbol property.
// This ensures we pass the same function instance to the scheduler each time,
@@ -342,10 +396,14 @@

// Loads the next `n` items from the collection
// starting from the biggest item it has sent
private loadNextItems(n: number, subscription: CollectionSubscription) {
// Returns true if local data was found, false if the local index is exhausted
private loadNextItems(
n: number,
subscription: CollectionSubscription,
): boolean {
const orderByInfo = this.getOrderByInfo()
if (!orderByInfo) {
return
return false
}
const { orderBy, valueExtractorForRawRow, offset } = orderByInfo
const biggestSentRow = this.biggest
@@ -369,7 +427,8 @@

// Take the `n` items after the biggest sent value
// Pass the current window offset to ensure proper deduplication
subscription.requestLimitedSnapshot({
// Returns true if local data was found
return subscription.requestLimitedSnapshot({
orderBy: normalizedOrderBy,
limit: n,
minValues,