From 4ab41eee80006578ceb8e0f03be129babc7bdedd Mon Sep 17 00:00:00 2001 From: Vahid Ahmadi Date: Wed, 6 May 2026 12:46:49 +0100 Subject: [PATCH] =?UTF-8?q?Add=20Stop=20=E2=86=92=20Continue:=20resume=20t?= =?UTF-8?q?runcated=20or=20aborted=20assistant=20turns=20(#44)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend (chatbot.py): - Capture `final.stop_reason` from each Anthropic stream and propagate it on the `done` SSE event. The frontend uses "max_tokens" to detect truncation; "end_turn" / "stop_sequence" mean the model finished cleanly. Frontend (ChatPage.tsx): - Extend Message with `stop_reason` and `stopped` flags. The `done` handler stores `stop_reason`; the AbortError catch (user clicks Stop) sets `stopped: true`. Both flags survive saveConversation/loadConversation via the untyped messages JSON column on the backend. - Render a Continue affordance below any message where `stop_reason === "max_tokens" || stopped`, hidden if a tool is still pending in the message (no orphan tool calls). - New `continueMessage(idx)` posts the conversation up to and including the partial assistant turn back to /chat/message. Anthropic's assistant-prefill behaviour means the model continues the same logical turn — no "Continue from where you stopped" nudge needed. Streamed content appends into the SAME message bubble; cost is summed onto the existing `cost_gbp`. If continue itself truncates or is stopped, the affordance comes back (user-driven, not auto-loop). Acceptance criteria: - max_tokens → Continue button appears. - User clicks Stop mid-stream → partial preserved + Continue appears. - Continue resumes in-place, single bubble, summed cost. - Out of scope: indefinitely auto-continuing — kept user-triggered. Closes #44 Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/routes/chatbot.py | 4 +- frontend/src/app/ChatPage.tsx | 217 ++++++++++++++++++++++++++++++++-- 2 files changed, 212 insertions(+), 9 deletions(-) diff --git a/backend/routes/chatbot.py b/backend/routes/chatbot.py index fa1af54..7486429 100644 --- a/backend/routes/chatbot.py +++ b/backend/routes/chatbot.py @@ -297,6 +297,7 @@ async def generate_stream(): iteration += 1 tool_uses = [] assistant_content = "" + last_stop_reason: str | None = None logger.info(f"[CHAT] Iteration {iteration}: calling Anthropic, {len(conversation)} messages") # Stream from Anthropic with retry on transient errors @@ -352,6 +353,7 @@ async def generate_stream(): # Use final message for complete, parsed tool inputs final = await stream.get_final_message() + last_stop_reason = getattr(final, "stop_reason", None) for block in final.content: if block.type == "tool_use": if plan_mode: @@ -394,7 +396,7 @@ async def generate_stream(): ) except Exception as e: logger.warning(f"[CHAT] Failed to record usage: {e}") - yield f"data: {json.dumps({'type': 'done', 'content': assistant_content, 'session_id': session_id, 'model': model, 'usage': {'input_tokens': total_input_tokens, 'output_tokens': total_output_tokens, 'cache_creation_input_tokens': total_cache_creation_input_tokens, 'cache_read_input_tokens': total_cache_read_input_tokens}, 'cost_gbp': billing['cost_gbp'] if billing else None, 'balance': billing['balance'] if billing else None})}\n\n" + yield f"data: {json.dumps({'type': 'done', 'content': assistant_content, 'session_id': session_id, 'model': model, 'stop_reason': last_stop_reason, 'usage': {'input_tokens': total_input_tokens, 'output_tokens': total_output_tokens, 'cache_creation_input_tokens': total_cache_creation_input_tokens, 'cache_read_input_tokens': total_cache_read_input_tokens}, 'cost_gbp': billing['cost_gbp'] if billing else None, 'balance': billing['balance'] if billing else None})}\n\n" break # Detect infinite loops diff --git a/frontend/src/app/ChatPage.tsx b/frontend/src/app/ChatPage.tsx index b650092..f836891 100644 --- a/frontend/src/app/ChatPage.tsx +++ b/frontend/src/app/ChatPage.tsx @@ -115,6 +115,10 @@ interface Message { events?: StreamEvent[]; isComplete?: boolean; cost_gbp?: number; + /** Anthropic stop_reason on the final iteration. "max_tokens" means truncated. */ + stop_reason?: string; + /** True when the user clicked Stop and the stream was aborted mid-flight. */ + stopped?: boolean; } interface BalanceSummary { @@ -234,7 +238,18 @@ export default function ChatPage() { try { const data = conversationCache.current.get(conv.id) || await apiRequest("GET", `conversations/${conv.id}`); if (!data?.messages?.length) { console.error("No messages in conversation", data); return; } - const loaded: Message[] = data.messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content || "", isComplete: true, events: m.events })); + const loaded: Message[] = data.messages.map((m) => { + const raw = m as { role: string; content: string; events?: StreamEvent[]; stop_reason?: string; stopped?: boolean; cost_gbp?: number }; + return { + role: raw.role as "user" | "assistant", + content: raw.content || "", + isComplete: true, + events: raw.events, + stop_reason: raw.stop_reason, + stopped: raw.stopped, + cost_gbp: raw.cost_gbp, + }; + }); const collapsed = new Set(loaded.map((m, i) => (m.role === "assistant" && m.events?.some((e) => e.type === "tool") ? i : -1)).filter((i) => i >= 0)); sessionId.current = data.session_id; setActiveConversationId(data.id); @@ -287,10 +302,14 @@ export default function ChatPage() { } catch (e) { console.error("Title generation failed", e); } const apiMessages = msgs.map((m) => { - if (m.role === "assistant" && m.isComplete && m.events?.length) { - return { role: m.role, content: m.content, events: m.events }; + const base: Record = { role: m.role, content: m.content }; + if (m.role === "assistant") { + if (m.isComplete && m.events?.length) base.events = m.events; + if (m.cost_gbp !== undefined) base.cost_gbp = m.cost_gbp; + if (m.stop_reason) base.stop_reason = m.stop_reason; + if (m.stopped) base.stopped = true; } - return { role: m.role, content: m.content }; + return base; }); try { @@ -482,12 +501,13 @@ export default function ChatPage() { updateMessage(); if (data.session_id) sessionId.current = data.session_id; const msgCost = typeof data.cost_gbp === "number" ? data.cost_gbp : undefined; + const stopReason = typeof data.stop_reason === "string" ? data.stop_reason : undefined; const hasTools = events.some((e) => e.type === "tool"); if (hasTools) { setMessages((prev) => { const newMsgs = [...prev]; const lastIdx = newMsgs.length - 1; - if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, cost_gbp: msgCost }; + if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, cost_gbp: msgCost, stop_reason: stopReason }; setCollapsedWorking((c) => new Set(c).add(lastIdx)); return newMsgs; }); @@ -495,7 +515,7 @@ export default function ChatPage() { setMessages((prev) => { const newMsgs = [...prev]; const lastIdx = newMsgs.length - 1; - if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], cost_gbp: msgCost }; + if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], cost_gbp: msgCost, stop_reason: stopReason }; return newMsgs; }); } @@ -521,14 +541,15 @@ export default function ChatPage() { } } catch (error) { if (error instanceof DOMException && error.name === "AbortError") { - // User stopped the stream — flush what we have + // User stopped the stream — flush what we have, mark `stopped` + // so the UI shows a Continue affordance. if (drainTimer) { clearInterval(drainTimer); drainTimer = null; } displayedText = currentText; updateMessage(); setMessages((prev) => { const newMsgs = [...prev]; const lastIdx = newMsgs.length - 1; - if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true }; + if (newMsgs[lastIdx]?.role === "assistant") newMsgs[lastIdx] = { ...newMsgs[lastIdx], isComplete: true, stopped: true }; return newMsgs; }); } else { @@ -545,6 +566,169 @@ export default function ChatPage() { const stopStreaming = () => { abortRef.current?.abort(); }; + /** Resume a previously truncated or stopped assistant turn. + * + * The conversation up to and including the partial assistant message is + * sent to /chat/message; the model continues from there via Anthropic's + * assistant-prefill behaviour (no extra user nudge needed). New text and + * events are appended into the SAME message bubble so the user sees one + * continuous answer. + */ + const continueMessage = async (idx: number) => { + if (isStreaming) return; + const target = messages[idx]; + if (!target || target.role !== "assistant" || !target.isComplete) return; + // Refuse if a tool is still pending in this message — re-triggering + // would orphan the partial tool call. + if (target.events?.some((e) => e.type === "tool" && e.data.status === "pending")) return; + + const priorMessages = messages.slice(0, idx + 1); + const apiMessages = priorMessages.map((msg) => { + let content = msg.content; + if (msg.role === "assistant" && msg.events) { + const toolResults = msg.events.filter((e): e is { type: "tool"; data: ToolData } => e.type === "tool" && !!e.data.result_summary).map((e) => `[Tool: ${e.data.tool_name}] ${e.data.result_summary}`).join("\n\n"); + if (toolResults) content += "\n\n---\nTool results:\n" + toolResults; + } + return { role: msg.role, content }; + }); + + // Optimistic: clear truncation/stopped flags and mark in-flight. + setMessages((prev) => prev.map((m, i) => i === idx ? { ...m, isComplete: false, stop_reason: undefined, stopped: undefined } : m)); + setIsStreaming(true); + setIsWaiting(true); + + const controller = new AbortController(); + abortRef.current = controller; + + let appendedText = ""; + const newEvents: StreamEvent[] = []; + const toolsMap = new Map(); + const baseContent = target.content; + const baseEvents = target.events ? [...target.events] : []; + const baseCost = target.cost_gbp || 0; + + const flushTarget = () => { + setMessages((prev) => prev.map((m, i) => i === idx ? { + ...m, + content: baseContent + appendedText, + events: [...baseEvents, ...newEvents], + } : m)); + }; + + try { + const headers: Record = { "Content-Type": "application/json" }; + if (user?.id) headers["X-User-Id"] = user.id; + const response = await fetch(getBackendEndpoint("chat/message"), { + method: "POST", + headers, + body: JSON.stringify({ messages: apiMessages, session_id: sessionId.current, user_id: user?.id || null, plan_mode: false }), + signal: controller.signal, + }); + if (response.status === 402) { + const err = await response.json().catch(() => ({ error: "No credit remaining" })); + setMessages((prev) => [...prev, { role: "assistant", content: err.error || "No credit remaining. Please top up to continue.", isComplete: true }]); + return; + } + if (response.status === 429) { + const seconds = parseInt(response.headers.get("retry-after") || "60", 10); + setMessages((prev) => [...prev, { role: "assistant", content: `You're sending messages a bit fast — please wait ~${seconds}s and try again.`, isComplete: true }]); + return; + } + if (!response.ok) throw new Error("Request failed"); + const reader = response.body?.getReader(); + const decoder = new TextDecoder(); + if (!reader) throw new Error("No body"); + + let buffer = ""; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + try { + const data = JSON.parse(line.slice(6)); + if (data.type === "chunk") { + setIsWaiting(false); + const lastEvent = newEvents[newEvents.length - 1]; + if (lastEvent?.type === "text" && !lastEvent.thinking) lastEvent.content += data.content; + else newEvents.push({ type: "text", content: data.content }); + appendedText += data.content; + flushTarget(); + } else if (data.type === "tool_start") { + setIsWaiting(false); + const toolData: ToolData = { tool_name: data.tool_name, tool_id: data.tool_id, status: "pending" }; + toolsMap.set(data.tool_id, toolData); + newEvents.push({ type: "tool", data: toolData }); + flushTarget(); + } else if (data.type === "tool_use") { + const existing = toolsMap.get(data.tool_id); + if (existing) existing.input = data.tool_input; + else { const td: ToolData = { tool_name: data.tool_name, tool_id: data.tool_id, status: "pending", input: data.tool_input }; toolsMap.set(data.tool_id, td); newEvents.push({ type: "tool", data: td }); } + flushTarget(); + } else if (data.type === "tool_result") { + const tool = toolsMap.get(data.tool_id); + if (tool) { tool.status = data.status; tool.result_summary = data.result_summary; } + flushTarget(); + setIsWaiting(true); + } else if (data.type === "done") { + setIsWaiting(false); + const newCost = typeof data.cost_gbp === "number" ? data.cost_gbp : 0; + const stopReason = typeof data.stop_reason === "string" ? data.stop_reason : undefined; + const finalContent = baseContent + appendedText; + const finalEvents = [...baseEvents, ...newEvents]; + setMessages((prev) => prev.map((m, i) => i === idx ? { + ...m, + isComplete: true, + content: finalContent, + events: finalEvents, + cost_gbp: baseCost + newCost, + stop_reason: stopReason, + stopped: false, + } : m)); + if (data.balance) setBalance(data.balance); else fetchBalance(); + if (sessionId.current) { + const savedMessages = messages.map((m, i) => i === idx ? { ...m, isComplete: true, content: finalContent, events: finalEvents } : m); + saveConversation(savedMessages, sessionId.current); + } + } else if (data.type === "error") { + const errorText = `Error: ${data.content || "Something went wrong"}`; + const lastEvent = newEvents[newEvents.length - 1]; + if (lastEvent?.type === "text") lastEvent.content += "\n\n" + errorText; + else newEvents.push({ type: "text", content: errorText }); + appendedText += errorText; + flushTarget(); + } + } catch {} + } + } + } catch (error) { + if (error instanceof DOMException && error.name === "AbortError") { + setMessages((prev) => prev.map((m, i) => i === idx ? { + ...m, + isComplete: true, + content: baseContent + appendedText, + events: [...baseEvents, ...newEvents], + stopped: true, + } : m)); + } else { + const errorText = `Continuation failed: ${error instanceof Error ? error.message : "Unknown error"}`; + setMessages((prev) => prev.map((m, i) => i === idx ? { + ...m, + isComplete: true, + content: baseContent + appendedText + "\n\n" + errorText, + events: [...baseEvents, ...newEvents], + } : m)); + } + } finally { + abortRef.current = null; + setIsStreaming(false); + setIsWaiting(false); + } + }; + const handleKeyDown = (e: React.KeyboardEvent) => { if (e.key === "Enter" && !e.shiftKey) { e.preventDefault(); sendMessage(); } }; @@ -928,6 +1112,23 @@ export default function ChatPage() { {msg.cost_gbp < 0.01 ? `${(msg.cost_gbp * 100).toFixed(2)}p` : `£${msg.cost_gbp.toFixed(3)}`} )} + {msg.isComplete && !isStreaming && (msg.stop_reason === "max_tokens" || msg.stopped) && !msg.events?.some((e) => e.type === "tool" && e.data.status === "pending") && ( +
+ + + {msg.stop_reason === "max_tokens" ? "Truncated at max length" : "Stopped"} + +
+ )} )}