fix: resolve stale pending tool state during session sync#381
fix: resolve stale pending tool state during session sync#381
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates the session message/part merge logic in syncSession() to prevent stale tool-call UI state (notably tool parts stuck in pending) by merging tool parts by part.id and preferring newer tool state transitions using timing/status signals.
Changes:
- Add tool-specific freshness helpers (
toolStart/toolEnd/toolRank) and amergePart()routine to select the newer tool state when merging. - Replace the previous “part-count” heuristic with
mergeMessage()that merges parts bypart.idand preserves parts/messages that exist only locally (e.g. SSE-only). - Update
syncSession()to use the new message/part merge path before refreshing thepartstore.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| function mergePart(existing: Part, synced: Part): Part { | ||
| if (existing.type !== "tool") return synced | ||
| if (synced.type !== "tool") return synced |
There was a problem hiding this comment.
mergePart() always returns synced for non-tool parts. This can overwrite newer local SSE updates if a message.part.updated event for the same part.id arrives while syncSession() is in-flight (the sync payload can be an older snapshot than the current store). Consider adding a generic freshness check for non-tool parts (e.g. compare part.time.start/end when present, or default to keeping existing when both are same id and you can't prove synced is newer), while keeping the tool-specific state merge logic.
| function mergePart(existing: Part, synced: Part): Part { | |
| if (existing.type !== "tool") return synced | |
| if (synced.type !== "tool") return synced | |
| function partTimeValue(part: Part, key: "start" | "end"): number | undefined { | |
| const value = (part as { time?: { start?: unknown; end?: unknown } }).time?.[key] | |
| return typeof value === "number" ? value : undefined | |
| } | |
| function mergeNonToolPart(existing: Part, synced: Part): Part { | |
| const existingEnd = partTimeValue(existing, "end") | |
| const syncedEnd = partTimeValue(synced, "end") | |
| if (existingEnd !== undefined && syncedEnd !== undefined) { | |
| if (existingEnd > syncedEnd) return existing | |
| if (syncedEnd > existingEnd) return synced | |
| } | |
| const existingStart = partTimeValue(existing, "start") | |
| const syncedStart = partTimeValue(synced, "start") | |
| if (existingStart !== undefined && syncedStart !== undefined) { | |
| if (existingStart > syncedStart) return existing | |
| if (syncedStart > existingStart) return synced | |
| } | |
| if (existing.id && synced.id && existing.id === synced.id) return existing | |
| return synced | |
| } | |
| function mergePart(existing: Part, synced: Part): Part { | |
| if (existing.type !== "tool" || synced.type !== "tool") { | |
| return mergeNonToolPart(existing, synced) | |
| } |
| return { | ||
| info: synced.info, | ||
| parts: sortParts(merged), | ||
| } |
There was a problem hiding this comment.
mergeMessage() always uses info: synced.info, which can regress message metadata if newer message.updated SSE events arrived after the sync request started (same race as parts). Consider preferring the info object with the newer time.completed (or other monotonic fields) when both sides refer to the same message, instead of always taking synced.info.
| const map = new Map(existing.parts.map((part) => [part.id, part])) | ||
| const merged = synced.parts.map((part) => { | ||
| const current = map.get(part.id) | ||
| if (!current) return part | ||
| return mergePart(current, part) | ||
| }) | ||
| const ids = new Set(merged.map((part) => part.id)) | ||
|
|
||
| for (const part of existing.parts) { |
There was a problem hiding this comment.
mergeMessage keys the part map by part.id, but sortParts() explicitly treats id as potentially missing/falsy. If a part ever has no id, Map/Set will treat it as undefined and cause collisions (wrong merges) and may drop/preserve the wrong parts. Consider filtering/handling no-id parts separately (e.g. only map parts with a truthy id, and always append no-id parts without attempting to merge).
| const map = new Map(existing.parts.map((part) => [part.id, part])) | |
| const merged = synced.parts.map((part) => { | |
| const current = map.get(part.id) | |
| if (!current) return part | |
| return mergePart(current, part) | |
| }) | |
| const ids = new Set(merged.map((part) => part.id)) | |
| for (const part of existing.parts) { | |
| const map = new Map(existing.parts.filter((part) => !!part?.id).map((part) => [part.id, part])) | |
| const merged = synced.parts.map((part) => { | |
| if (!part?.id) return part | |
| const current = map.get(part.id) | |
| if (!current) return part | |
| return mergePart(current, part) | |
| }) | |
| const ids = new Set(merged.filter((part) => !!part?.id).map((part) => part.id)) | |
| for (const part of existing.parts) { | |
| if (!part?.id) { | |
| merged.push(part) | |
| continue | |
| } |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return { ...m, parts: newParts } | ||
| }) |
There was a problem hiding this comment.
There are stray/unmatched lines after the msgs.map(...) call (return { ...m, parts: newParts } / extra })) which makes this block syntactically invalid and references m/newParts out of scope. Please remove the extra lines so the setStore("message", ...) updater returns only the msgs.map(...) result.
| return { ...m, parts: newParts } | |
| }) |
| function isNewer(a: Part, b: Part): boolean { | ||
| const aEnd = a.time?.completed ?? a.time?.start | ||
| const bEnd = b.time?.completed ?? b.time?.start | ||
| if (!aEnd) return false | ||
| if (!bEnd) return true | ||
| return aEnd > bEnd |
There was a problem hiding this comment.
isNewer() reads part.time.completed, but Part.time objects in the SDK don’t have a completed field (messages do; parts use start/end/created depending on type). This should be updated to compare the correct timestamps (e.g., time.end ?? time.start for text/reasoning, time.created for retry, and/or tool state.time.*), otherwise this won’t typecheck and won’t correctly protect against out-of-order updates.
| function isNewer(a: Part, b: Part): boolean { | |
| const aEnd = a.time?.completed ?? a.time?.start | |
| const bEnd = b.time?.completed ?? b.time?.start | |
| if (!aEnd) return false | |
| if (!bEnd) return true | |
| return aEnd > bEnd | |
| function toTimestamp(value: unknown): number | undefined { | |
| if (typeof value === "number") return Number.isNaN(value) ? undefined : value | |
| if (typeof value === "string") { | |
| const parsed = Date.parse(value) | |
| return Number.isNaN(parsed) ? undefined : parsed | |
| } | |
| if (value instanceof Date) { | |
| const parsed = value.getTime() | |
| return Number.isNaN(parsed) ? undefined : parsed | |
| } | |
| return undefined | |
| } | |
| function getPartTimestamp(part: Part): number | undefined { | |
| const time = part.time as { end?: unknown; start?: unknown; created?: unknown } | undefined | |
| const directTimestamp = | |
| toTimestamp(time?.end) ?? toTimestamp(time?.start) ?? toTimestamp(time?.created) | |
| if (directTimestamp !== undefined) return directTimestamp | |
| const stateTime = ( | |
| part as Part & { | |
| state?: { | |
| time?: { | |
| completed?: unknown | |
| end?: unknown | |
| start?: unknown | |
| created?: unknown | |
| } | |
| } | |
| } | |
| ).state?.time | |
| return ( | |
| toTimestamp(stateTime?.completed) ?? | |
| toTimestamp(stateTime?.end) ?? | |
| toTimestamp(stateTime?.start) ?? | |
| toTimestamp(stateTime?.created) | |
| ) | |
| } | |
| function isNewer(a: Part, b: Part): boolean { | |
| const aTime = getPartTimestamp(a) | |
| const bTime = getPartTimestamp(b) | |
| if (aTime === undefined) return false | |
| if (bTime === undefined) return true | |
| return aTime > bTime |
| function mergePart(existing: Part, synced: Part): Part { | ||
| if (existing.type !== "tool") return synced | ||
| if (synced.type !== "tool") return synced | ||
|
|
There was a problem hiding this comment.
mergePart() currently returns synced for all non-tool parts, which means a full-session sync can overwrite newer local/SSE part updates for the same part.id (contrary to the issue acceptance criterion about not regressing newer SSE state). Consider extending the freshness comparison to non-tool parts using their time fields (e.g., end/start/created), or defaulting to existing unless synced is provably newer.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| return { ...m, parts: [...m.parts, part] } | ||
| }) | ||
| return { ...m, parts: newParts } |
There was a problem hiding this comment.
There’s an extra return { ...m, parts: newParts } after the msgs.map(...) return. m/newParts are out of scope here, which will fail to compile and also makes the control flow invalid. This line should be removed (the msgs.map(...) already returns the updated array).
| return { ...m, parts: newParts } |
| const map = new Map(existing.parts.map((part) => [part.id, part])) | ||
| const merged = synced.parts.map((part) => { | ||
| const current = map.get(part.id) | ||
| if (!current) return part | ||
| return mergePart(current, part) | ||
| }) | ||
| const ids = new Set(merged.map((part) => part.id)) | ||
|
|
||
| for (const part of existing.parts) { | ||
| if (ids.has(part.id)) continue |
There was a problem hiding this comment.
mergeMessage() keys the existing.parts map by part.id without handling missing/falsey IDs, but elsewhere in this file (message.part.updated) parts are explicitly treated as possibly having no id. If any parts are id-less, they’ll all collide under the same Map/Set key and can be incorrectly merged or dropped. Consider only mapping/merging parts that have an id, and always append/retain id-less parts separately (or match them by a different stable key).
| const map = new Map(existing.parts.map((part) => [part.id, part])) | |
| const merged = synced.parts.map((part) => { | |
| const current = map.get(part.id) | |
| if (!current) return part | |
| return mergePart(current, part) | |
| }) | |
| const ids = new Set(merged.map((part) => part.id)) | |
| for (const part of existing.parts) { | |
| if (ids.has(part.id)) continue | |
| const map = new Map( | |
| existing.parts | |
| .filter((part) => !!part.id) | |
| .map((part) => [part.id, part] as const), | |
| ) | |
| const merged = synced.parts.map((part) => { | |
| if (!part.id) return part | |
| const current = map.get(part.id) | |
| if (!current) return part | |
| return mergePart(current, part) | |
| }) | |
| const ids = new Set(merged.flatMap((part) => (part.id ? [part.id] : []))) | |
| for (const part of existing.parts) { | |
| if (part.id && ids.has(part.id)) continue |
| function isNewer(a: Part, b: Part): boolean { | ||
| if (a.type !== "tool" || b.type !== "tool") return false | ||
| const aState = a.state as ToolPart["state"] | ||
| const bState = b.state as ToolPart["state"] | ||
| const aEnd = aState.time?.end ?? aState.time?.start | ||
| const bEnd = bState.time?.end ?? bState.time?.start | ||
| if (!aEnd) return false | ||
| if (!bEnd) return true | ||
| return aEnd > bEnd | ||
| } |
There was a problem hiding this comment.
isNewer() references ToolPart["state"], but ToolPart is not imported/defined in this module, so this won’t typecheck. Import ToolPart from ../sdk/client (like other SDK types) or rewrite the narrowing to use Extract<Part, { type: "tool" }>/the existing helper functions (toolStart/toolEnd) without needing ToolPart.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| function mergePart(existing: Part, synced: Part): Part { | ||
| if (existing.type !== "tool") { | ||
| if (synced.type !== "tool") { | ||
| const existingTime = (existing as TextPart).time | ||
| const syncedTime = (synced as TextPart).time | ||
| const existingEnd = existingTime?.end ?? existingTime?.start ?? 0 | ||
| const syncedEnd = syncedTime?.end ?? syncedTime?.start ?? 0 | ||
| if (existingEnd > syncedEnd) return existing | ||
| return synced |
There was a problem hiding this comment.
mergePart() treats all non-tool parts as TextPart and compares time.start/end. In the SDK types, several Part variants either have a different time shape (e.g. RetryPart.time.created) or no time field at all, so this comparison can incorrectly prefer the synced payload and overwrite newer SSE state. Consider narrowing by part.type (e.g. handle text/reasoning with start/end, handle retry with created, and otherwise fall back to a deterministic rule like preferring synced).
| function mergeMessage(existing: MessageWithParts, synced: MessageWithParts): MessageWithParts { | ||
| const existingWithId = existing.parts.filter((p) => !!p.id) | ||
| const map = new Map(existingWithId.map((part) => [part.id, part])) | ||
| const merged = synced.parts.map((part) => { | ||
| const current = map.get(part.id) | ||
| if (!current) return part | ||
| return mergePart(current, part) | ||
| }) | ||
| const ids = new Set(merged.filter((p) => !!p.id).map((part) => part.id)) | ||
|
|
||
| for (const part of existingWithId) { | ||
| if (ids.has(part.id)) continue | ||
| merged.push(part) | ||
| } |
There was a problem hiding this comment.
mergeMessage() only tracks existing parts that have an id (existingWithId). Any locally-added parts without an id (which this file explicitly handles in message.part.updated) will be dropped during a full-session sync merge if they aren’t present in the synced payload. If id-less parts are expected, they should be preserved similarly to how you preserve id-bearing parts (e.g., append existing parts with no id after merging, or incorporate them into the merge output explicitly).
| return { | ||
| info: synced.info, | ||
| parts: sortParts(merged), | ||
| } |
There was a problem hiding this comment.
mergeMessage() always takes synced.info. Elsewhere (message.updated) you already guard against out-of-order updates by preferring the Message with newer time.completed. To truly “preserve newer SSE updates” during syncSession(), consider merging info with a similar freshness check (at least for assistant messages) so a stale sync payload can’t overwrite newer local info fields.
| const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p)) | ||
| return { ...m, parts: newParts } |
There was a problem hiding this comment.
In the message.part.updated handler, the part store update uses isNewer(existingPart, part) to ignore stale tool-part updates, but the corresponding update of store.message[...] does not. Since most UI code reads msg.parts from sync.messages(), an out-of-order stale tool update (e.g. reverting completed back to pending) can still regress the visible state even if the part store ignored it. Apply the same freshness guard when replacing parts inside msgs.map(...) to keep the two stores consistent and prevent stale tool state from reappearing.
| const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p)) | |
| return { ...m, parts: newParts } | |
| if (partIdx === -1) return { ...m, parts: [...m.parts, part] } | |
| const existingPart = m.parts[partIdx] | |
| if (!existingPart.id || isNewer(existingPart, part)) return m | |
| return { ...m, parts: m.parts.map((p, pi) => (pi === partIdx ? part : p)) } |
…eMessage (#3112022389, #3112022429, #3112022489)
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return { | ||
| info: synced.info, | ||
| parts: sortParts(merged), | ||
| } |
There was a problem hiding this comment.
mergeMessage() always sets info: synced.info, which can overwrite newer message info already received via SSE (e.g., a newer time.completed, token/cost updates, or errors) and violates the intent to preserve newer local state during full-session sync. Consider merging existing.info vs synced.info using a freshness signal (like time.completed/time.created) similar to the logic used in the message.updated handler, and keep the newer one.
| const existingEnd = getTimeValue(existing) | ||
| const syncedEnd = getTimeValue(synced) | ||
| if (existingEnd > syncedEnd) return existing | ||
| return synced | ||
| } |
There was a problem hiding this comment.
mergePart() defaults to returning synced when the compared freshness signals are equal (e.g., existingEnd === syncedEnd), which can overwrite newer in-memory/SSE state when freshness can't be established. Consider preferring existing on ties (or adding a tie-breaker) to avoid regressing local state during sync merges.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| function isNewer(a: Part, b: Part): boolean { | ||
| if (a.type !== "tool" || b.type !== "tool") return false | ||
| const aState = a.state as Extract<Part, { type: "tool" }>["state"] | ||
| const bState = b.state as Extract<Part, { type: "tool" }>["state"] | ||
| const aEnd = aState.time?.end ?? aState.time?.start | ||
| const bEnd = bState.time?.end ?? bState.time?.start | ||
| if (!aEnd) return false | ||
| if (!bEnd) return true | ||
| return aEnd > bEnd | ||
| } |
There was a problem hiding this comment.
isNewer() reads aState.time/bState.time directly, but ToolStatePending has no time field in the generated SDK types. With strict: true this should fail typechecking and can also lead to incorrect comparisons. Narrow on state.status !== "pending" (or use a type guard like "time" in state) before reading time, and compute a comparable timestamp from end ?? start only when the field exists.
| return { | ||
| info: synced.info, | ||
| parts: sortParts(merged), | ||
| } |
There was a problem hiding this comment.
mergeMessage() always returns info: synced.info, which can overwrite newer message info already applied via SSE (e.g. time.completed/error) if the syncSession() request started before the SSE update and returns a stale snapshot. Consider merging existing.info vs synced.info with the same “prefer newer time.completed” logic used in the message.updated handler, so full sync can’t regress message completion/error state.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary
syncSession()that used part-count only to decide freshnesspart.idand prefer newer state transitions using tool timing/status signals so stalependingstates can be correctedmessage.part.updatedeventsTesting
bun run typecheck(fails in this worktree becausetscis unavailable:command not found)Closes #379