From b465298fcfc84eb7fdca3776e7b263ae02921fbe Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 02:09:41 -0700 Subject: [PATCH 1/3] chore: add research-planner agent for structured feature planning Autonomous agent that researches the codebase, asks clarifying questions, presents architectural options, and writes plan files with commit plans and detailed todolists. Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/agents/research-planner.md | 48 ++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 .claude/agents/research-planner.md diff --git a/.claude/agents/research-planner.md b/.claude/agents/research-planner.md new file mode 100644 index 0000000..4251ce5 --- /dev/null +++ b/.claude/agents/research-planner.md @@ -0,0 +1,48 @@ +--- +name: research-planner +description: "Researches and plans features through structured dialogue. Use when starting a new phase or feature — identifies assumptions, presents architectural options, and breaks work into demoable phases." +tools: Read, Edit, Write, Grep, Glob, Bash, WebSearch, WebFetch +model: opus +color: green +--- + +You are a senior software engineer helping me plan features. + +## Behavior +- Before proposing a solution, identify 2-3 key assumptions and ask me to confirm them. +- For non-trivial decisions (architecture, library choice, data model), present 2-3 options as a + table: approach | tradeoffs | risk | mitigation. +- Default to the simplest approach unless I indicate otherwise. +- Flag irreversible decisions explicitly (e.g., schema changes, public API contracts). + +## Planning +- Break work into phases. Each phase should be independently demoable or revertable. +- For each phase, call out: what could go wrong, how we'd detect it, and how we'd roll back. +- Distinguish between "must decide now" vs "can defer" choices. + +## Communication +- Be direct. Skip preamble. +- When you're uncertain, say so and quantify your confidence if possible. +- If my request is ambiguous, ask a focused clarifying question rather than guessing. + Limit to 3 questions at a time — batch if needed. +- Ask lots of clarifying questions. Don't assume — probe. Cover: scope boundaries, + expected behavior, edge cases, integration points, and anything that could be + interpreted two ways. It's better to ask too many questions than to build the wrong thing. + +## Research Process +1. Read relevant existing code, plans, and skills before proposing anything. +2. Check library documentation and APIs when making technical recommendations. +3. Cross-reference the PROPOSAL.md roadmap and CLAUDE.md constraints. +4. Verify assumptions against the actual codebase — don't guess at file paths or APIs. + +## Output +- Plans go in `.claude/gw-plans/` following the existing structure. +- Each plan should be self-contained: someone reading only the plan file should understand what to build and why. +- Include a "Not in Scope" section to prevent scope creep. +- Include a "Decisions & Risks" section documenting assumptions and their mitigations. +- Include a "Commit Plan" section: ordered list of commits, each with a conventional commit + message, the files touched, and what the commit achieves. Each commit should be independently + buildable and testable — never leave the codebase in a broken state between commits. +- Include a "Detailed Todolist" section: granular, ordered checklist of implementation steps + that Claude can follow mechanically. Each item should be small enough to complete without + further clarification. Group by commit where possible. From ad7d1e784991fa97ea32641c0647dcdc39d15fca Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 03:47:07 -0700 Subject: [PATCH 2/3] chore: update research-planner with plan structure and revision workflow Add plan structure guidance (small vs large features, folder layout), line limits (350-400 aim, 500 max), and revision workflow for parallel plan-reviewer findings. Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/agents/research-planner.md | 33 +++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/.claude/agents/research-planner.md b/.claude/agents/research-planner.md index 4251ce5..ffd09cf 100644 --- a/.claude/agents/research-planner.md +++ b/.claude/agents/research-planner.md @@ -37,7 +37,8 @@ You are a senior software engineer helping me plan features. ## Output - Plans go in `.claude/gw-plans/` following the existing structure. -- Each plan should be self-contained: someone reading only the plan file should understand what to build and why. +- Each plan should be self-contained: someone reading only the plan file should understand + what to build and why. - Include a "Not in Scope" section to prevent scope creep. - Include a "Decisions & Risks" section documenting assumptions and their mitigations. - Include a "Commit Plan" section: ordered list of commits, each with a conventional commit @@ -46,3 +47,33 @@ You are a senior software engineer helping me plan features. - Include a "Detailed Todolist" section: granular, ordered checklist of implementation steps that Claude can follow mechanically. Each item should be small enough to complete without further clarification. Group by commit where possible. + +## Plan Structure — Small vs Large Features +- **Small feature** (1-2 commits, ~1 file changed): single plan file. + Example: `execution/phase-4-api-routes.md` +- **Large feature** (3+ commits, multiple modules): use a folder with an overview + per-commit + part files. This keeps each file reviewable in one pass (~250-300 lines max). + Example: + ``` + execution/phase-3/ + overview.md — architecture, decisions, SSE contract, not-in-scope + 3.1-builder-checkpointer.md — commit plan + detailed todolist for part 1 + 3.2-run-manager.md — commit plan + detailed todolist for part 2 + 3.3-executor-core.md — commit plan + detailed todolist for part 3 + 3.4-routes.md — commit plan + detailed todolist for part 4 + ``` +- The **overview** contains: architecture diagrams, execution flow, decisions & risks table, + SSE/API contracts, not-in-scope. This is the "what and why" — reviewed once. +- Each **part file** contains: commit message, files touched, detailed todolist, tests. + This is the "how" — reviewed per-commit. +- Aim for 350-400 lines per part file, 500 lines max. +- Use your judgement on the threshold. If a plan exceeds ~400 lines or has 3+ distinct + commits touching different modules, split it. + +## Revision Workflow (for the orchestrating agent) +When plan-reviewer findings need to be applied to a large feature (overview + parts): +1. **Fix the overview first** (sequentially) — it sets the architecture decisions that parts reference. +2. **Fix the part files in parallel** — they are independent of each other and can reference + the updated overview. This gives consistency and speed. +The research-planner cannot spawn sub-agents itself. The orchestrating agent (main conversation) +should launch parallel research-planner invocations for the part files after the overview is done. From 79c52f89fe62bfc300da3f50581602db97e29aa0 Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 03:47:13 -0700 Subject: [PATCH 3/3] docs: add Phase 3 execution plan (executor + SSE streaming) Split into overview + 4 parts for reviewability: - 3.1: Builder checkpointer parameter - 3.2: RunContext, RunManager, helpers - 3.3: Executor core, streaming, resume protocol - 3.4: Routes, schemas, app wiring Reviewed and revised through 2 plan-review cycles. Key decisions: - Interrupt detection via aget_state().tasks (not __interrupt__) - Schema-based edge derivation (not state.next) - Cooperative timeout excluding pause time - Event IDs for SSE reconnection dedup - Simplified resume protocol (buffer-based, no SSE wait) Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/gw-plans/execution/README.md | 2 +- .../phase-3/3.1-builder-checkpointer.md | 76 ++++ .../execution/phase-3/3.2-run-manager.md | 204 ++++++++++ .../execution/phase-3/3.3-executor-core.md | 376 ++++++++++++++++++ .../gw-plans/execution/phase-3/3.4-routes.md | 265 ++++++++++++ .../gw-plans/execution/phase-3/overview.md | 322 +++++++++++++++ 6 files changed, 1244 insertions(+), 1 deletion(-) create mode 100644 .claude/gw-plans/execution/phase-3/3.1-builder-checkpointer.md create mode 100644 .claude/gw-plans/execution/phase-3/3.2-run-manager.md create mode 100644 .claude/gw-plans/execution/phase-3/3.3-executor-core.md create mode 100644 .claude/gw-plans/execution/phase-3/3.4-routes.md create mode 100644 .claude/gw-plans/execution/phase-3/overview.md diff --git a/.claude/gw-plans/execution/README.md b/.claude/gw-plans/execution/README.md index 637929d..dc74ad4 100644 --- a/.claude/gw-plans/execution/README.md +++ b/.claude/gw-plans/execution/README.md @@ -9,6 +9,6 @@ FastAPI + LangGraph backend phases. | 1 | [DB, Tools, State Utils](phase-1-db-tools-state-utils.md) | Merged | [#1](https://github.com/prosdevlab/graphweave/pull/1) | | 1.5 | [Scoped API Key Auth](phase-1.5-execution-auth.md) | Merged | [#2](https://github.com/prosdevlab/graphweave/pull/2) | | 2 | [GraphSchema -> LangGraph Builder](phase-2-graph-schema-langgraph-builder.md) | Merged | [#3](https://github.com/prosdevlab/graphweave/pull/3) | -| 3 | Executor + SSE streaming | Not started | — | +| 3 | [Executor + SSE Streaming](phase-3/overview.md) | Planned | — | | 4 | API routes (run, stream, resume, validate, export) | Not started | — | | 5 | Exporter + remaining tools + SSRF transport | Not started | — | diff --git a/.claude/gw-plans/execution/phase-3/3.1-builder-checkpointer.md b/.claude/gw-plans/execution/phase-3/3.1-builder-checkpointer.md new file mode 100644 index 0000000..f483f5c --- /dev/null +++ b/.claude/gw-plans/execution/phase-3/3.1-builder-checkpointer.md @@ -0,0 +1,76 @@ +# Part 3.1: Builder Checkpointer Parameter + +See [overview.md](overview.md) for architecture context. + +## Summary + +Add an optional `checkpointer` keyword argument to `build_graph()` in `app/builder.py`. When provided, it overrides the existing auto-detection logic (which only adds `InMemorySaver` for `human_input` graphs). When not provided, existing behavior is preserved. This is additive -- no existing tests break. + +The executor (Part 3.3) will call `build_graph(schema, checkpointer=InMemorySaver())` so every graph has a checkpointer, enabling `aget_state()` for state snapshots. + +## Implementation + +### Change to `build_graph()` signature + +```python +def build_graph( + schema: dict, + *, + llm_override=None, + checkpointer: "BaseCheckpointSaver | None" = None, # NEW: optional override +) -> BuildResult: +``` + +### Change to compilation section + +```python + has_human_input = any(n["type"] == "human_input" for n in schema["nodes"]) + try: + if checkpointer is not None: + compiled = graph.compile(checkpointer=checkpointer) + elif has_human_input: + from langgraph.checkpoint.memory import InMemorySaver + compiled = graph.compile(checkpointer=InMemorySaver()) + else: + compiled = graph.compile() + return BuildResult(graph=compiled, defaults=defaults) + except Exception as exc: + logger.exception("Graph compilation failed") + raise GraphBuildError("Graph compilation failed") from exc +``` + +## Files + +| Action | File | +|--------|------| +| **modify** | `app/builder.py` | +| **modify** | `tests/unit/test_builder.py` | + +## Tests (3) + +- **test_checkpointer_parameter**: Call `build_graph(schema, checkpointer=InMemorySaver())` with a schema that has no human_input nodes. Verify the provided checkpointer is used. Note: verify actual LangGraph behavior before writing the assertion -- if `result.graph.checkpointer` is not directly accessible, use `isinstance` or check that `aget_state` works (which requires a checkpointer). +- **test_checkpointer_none_preserves_behavior**: Call `build_graph(schema)` with a schema that has no human_input nodes. Verify graph compiles without checkpointer (same as existing behavior). +- **test_checkpointer_overrides_human_input_auto_detection**: Create a schema with human_input nodes. Pass an explicit `saver = InMemorySaver()`. Verify identity: `result.graph.checkpointer is saver` -- the builder must use the provided instance, not create a second one. + +## Commit + +``` +feat: add checkpointer parameter to build_graph + +Allow callers to provide an explicit checkpointer for graph compilation. +The executor uses this to enable state snapshots on all graphs. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +## Detailed Todolist + +- [ ] Read `app/builder.py` current `build_graph()` signature and compilation section +- [ ] Add `checkpointer: BaseCheckpointSaver | None = None` parameter to `build_graph()` function signature (import `BaseCheckpointSaver` from `langgraph.checkpoint.base`) +- [ ] Update docstring to document the new parameter +- [ ] Modify compilation section: if `checkpointer is not None`, use it; elif `has_human_input`, use `InMemorySaver()`; else compile without +- [ ] In `tests/unit/test_builder.py`, add `test_checkpointer_parameter`: call `build_graph(schema, checkpointer=InMemorySaver())`, verify graph has checkpointer. Note: verify actual LangGraph attribute access before writing assertion -- if `result.graph.checkpointer` is not directly accessible, verify `aget_state` works instead. +- [ ] Add `test_checkpointer_none_preserves_behavior`: call `build_graph(schema)` with a schema that has no human_input nodes, verify graph compiles without checkpointer (same as existing behavior) +- [ ] Add `test_checkpointer_overrides_human_input_auto_detection`: create schema with human_input nodes, pass `saver = InMemorySaver()`, verify `result.graph.checkpointer is saver` -- the builder must use the provided instance, not create a second one +- [ ] Run `uv run ruff check app/builder.py tests/unit/test_builder.py` +- [ ] Run `uv run pytest tests/unit/test_builder.py -v` -- all tests pass including new ones diff --git a/.claude/gw-plans/execution/phase-3/3.2-run-manager.md b/.claude/gw-plans/execution/phase-3/3.2-run-manager.md new file mode 100644 index 0000000..d53542c --- /dev/null +++ b/.claude/gw-plans/execution/phase-3/3.2-run-manager.md @@ -0,0 +1,204 @@ +# Part 3.2: RunContext, RunManager, and Helpers + +See [overview.md](overview.md) for architecture context (RunContext fields, concurrent limits, cancellation). + +This part and [3.3-executor-core.md](3.3-executor-core.md) together form a single commit -- they are both `app/executor.py`. Split for readability. + +## Summary + +Define the `RunContext` dataclass, `RunManager` class, and shared helper functions (`format_sse`, `_emit`, `_emit_keepalive`, `_safe_update_run`, `_utcnow_iso`, `_elapsed_ms`). These are the building blocks used by the execution functions in Part 3.3. + +## Implementation + +### SSE Event Helpers + +```python +def format_sse(event: str, data: dict, event_id: int | None = None) -> str: + """Format a server-sent event string. + + Args: + event: SSE event type (e.g. "node_completed") + data: JSON-serializable dict for the data field + event_id: Sequential ID for reconnection. If None, no id: line + is emitted (used for keepalive events). + """ + parts = [] + if event_id is not None: + parts.append(f"id: {event_id}") + parts.append(f"event: {event}") + parts.append(f"data: {json.dumps(data, default=str)}") + parts.append("") # trailing newline + return "\n".join(parts) + "\n" +``` + +Note: `default=str` in `json.dumps` handles datetime objects and other non-serializable types gracefully. + +### RunContext Dataclass + +```python +@dataclass +class RunContext: + run_id: str + graph_id: str + owner_id: str + queue: asyncio.Queue # SSE events (dict) or None (sentinel) + task: asyncio.Task | None + cancel_event: asyncio.Event + status: str # running | paused | completed | error + started_at: float # time.monotonic() + resume_event: asyncio.Event + resume_value: Any = None + compiled_graph: CompiledStateGraph # always provided via start_run + config: dict = field(default_factory=dict) + events: list[dict] = field(default_factory=list) + event_counter: int = 0 # monotonic counter for SSE id: field + schema_dict: dict = field(default_factory=dict) + total_pause_time: float = 0.0 # excluded from timeout +``` + +Each call to `_emit()` increments `event_counter` and stores the ID with the event. Keepalive events bypass the counter (no `id:` field). + +### RunManager Class + +```python +class RunManager: + def __init__(self): + self._runs: dict[str, RunContext] = {} + self._max_per_key: int = int(os.getenv("MAX_RUNS_PER_KEY", "3")) + self._max_global: int = int(os.getenv("MAX_RUNS_GLOBAL", "10")) + self._run_timeout: int = int(os.getenv("RUN_TIMEOUT_SECONDS", "300")) + + def get_run(self, run_id: str) -> RunContext | None + + def active_count_for_owner(self, owner_id: str) -> int + + def active_count_global(self) -> int + + async def start_run( + self, run_id, graph_id, owner_id, compiled_graph, + config, input_data, defaults, schema_dict, db, + ) -> RunContext: + # 1. Check concurrent limits (raise ValueError) + # 2. Create RunContext + # 3. Register in self._runs + # 4. Create asyncio.Task for _execute_run() + # 5. Return RunContext + + async def cancel_run(self, run_id: str) -> bool: + # Set cancel_event. Task will detect and clean up. + + async def submit_resume(self, run_id: str, value: Any) -> bool: + # Store resume_value, signal resume_event + # Returns False if run not found or not paused + + def cleanup_run(self, run_id: str) -> None: + # Remove from self._runs. + # Called by _execute_run (Part 3.3) in its finally block after a + # grace period delay (asyncio.sleep(300) or configurable TTL) so + # reconnecting clients can still replay events. +``` + +### Emit Helpers + +```python +def _emit(ctx: RunContext, event: str, data: dict) -> None: + """Push an SSE event to the run's queue and buffer with sequential ID. + Must only be called from the asyncio event loop thread.""" + ctx.event_counter += 1 + event_dict = {"id": ctx.event_counter, "event": event, "data": data} + ctx.events.append(event_dict) + try: + ctx.queue.put_nowait(event_dict) + except asyncio.QueueFull: + logger.warning( + "SSE queue full for run %s (event %d dropped from live stream, " + "available in replay buffer)", ctx.run_id, ctx.event_counter, + ) + +def _emit_keepalive(ctx: RunContext) -> None: + """Emit a keepalive event with no ID (not buffered for replay).""" + event_dict = {"id": None, "event": "keepalive", "data": {}} + try: + ctx.queue.put_nowait(event_dict) + except asyncio.QueueFull: + pass # best-effort + +async def _safe_update_run(db, run_id: str, **fields) -> None: + """Update run in DB, logging but not raising on failure.""" + try: + await update_run(db, run_id, **fields) + except Exception: + logger.exception("Failed to update run %s in DB", run_id) +``` + +## Files + +| Action | File | +|--------|------| +| **rewrite** | `app/executor.py` (shared with Part 3.3) | +| **create** | `tests/unit/test_run_manager.py` | + +## Tests (8) -- `tests/unit/test_run_manager.py` + +Since `_execute_run` is defined in Part 3.3, RunManager tests mock it with a coroutine that immediately completes (or sleeps briefly for concurrent tests). This keeps 3.2 tests independent of execution logic. + +- **test_concurrent_limit_per_key**: Monkeypatch MAX_RUNS_PER_KEY=2 BEFORE constructing RunManager. Start 2 runs for same owner, 3rd raises ValueError. +- **test_concurrent_limit_global**: Monkeypatch MAX_RUNS_GLOBAL=2 BEFORE constructing RunManager. Start 2 runs for different owners, 3rd raises ValueError. +- **test_concurrent_limit_boundary**: Monkeypatch MAX_RUNS_PER_KEY=2 BEFORE constructing RunManager. Start exactly 2 (both succeed), 3rd fails. Verify boundary is exact. +- **test_cleanup_after_completion**: Run completes, verify cleaned from RunManager. +- **test_get_run_not_found**: `get_run("nonexistent")` returns None. +- **test_cancel_run**: Start run, cancel, verify cancel_event is set and status becomes error. +- **test_submit_resume_sets_value_and_event**: Start run, manually set ctx.status="paused". Call submit_resume("user input"). Verify ctx.resume_value == "user input" and ctx.resume_event.is_set(). +- **test_submit_resume_not_paused_returns_false**: Start run (status="running"). Call submit_resume. Verify returns False, resume_event not set. + +## Detailed Todolist + +#### RunContext and helpers + +- [ ] Open `app/executor.py`, remove existing stub content (keep the file) +- [ ] Add imports: `asyncio`, `json`, `logging`, `os`, `time`, `dataclasses`, `typing`, `datetime`, `aiosqlite`, `langgraph` types, `Command` from `langgraph.types` +- [ ] Add `from app.db.crud import update_run` +- [ ] Implement `format_sse(event, data, event_id=None)`: include `id: N` line when event_id is not None, use `json.dumps(data, default=str)` +- [ ] Implement `_utcnow_iso()` helper +- [ ] Implement `_elapsed_ms(start)` helper +- [ ] Define `RunContext` dataclass with all fields: run_id, graph_id, owner_id, queue (asyncio.Queue), task, cancel_event, status, started_at, resume_event, resume_value, compiled_graph, config, events, event_counter (int = 0), schema_dict, total_pause_time (float = 0.0) + +#### RunManager + +- [ ] Implement `RunManager.__init__()`: read env vars for limits (`MAX_RUNS_PER_KEY`, `MAX_RUNS_GLOBAL`, `RUN_TIMEOUT_SECONDS`) and timeout, init `_runs` dict +- [ ] Implement `RunManager.get_run(run_id)` -> RunContext | None +- [ ] Implement `RunManager.active_count_for_owner(owner_id)` -> int (count runs with status in running/paused) +- [ ] Implement `RunManager.active_count_global()` -> int +- [ ] Implement `RunManager.start_run(...)`: + - Check `active_count_for_owner >= max_per_key` -> raise ValueError with message + - Check `active_count_global >= max_global` -> raise ValueError with message + - Create RunContext with asyncio.Queue(maxsize=1000), asyncio.Event for cancel and resume, event_counter=0 + - Create asyncio.Task for `_execute_run(ctx, input_data, defaults, db)` + - Store in `_runs[run_id]` + - Return RunContext +- [ ] Implement `RunManager.cancel_run(run_id)` -> bool: set cancel_event if run exists +- [ ] Implement `RunManager.submit_resume(run_id, value)` -> bool: + - Get run, verify status is "paused" + - Store `ctx.resume_value = value` + - Set `ctx.resume_event` + - Return True (or False if not found/not paused) +- [ ] Implement `RunManager.cleanup_run(run_id)`: remove from `_runs` dict. Note: called by `_execute_run` (Part 3.3) in its finally block after a grace period delay. + +#### Emit helpers + +- [ ] Implement `_emit(ctx, event, data)`: add docstring comment "Must only be called from the asyncio event loop thread." Increment `ctx.event_counter`, create event_dict with id, append to `ctx.events`, `try: ctx.queue.put_nowait(event_dict) except asyncio.QueueFull: logger.warning(...)` +- [ ] Implement `_emit_keepalive(ctx)`: create event_dict with id=None, `try: ctx.queue.put_nowait(...) except asyncio.QueueFull: pass` +- [ ] Implement `_safe_update_run(db, run_id, **fields)`: `try: await update_run(db, run_id, **fields) except Exception: logger.exception(...)` + +#### Tests + +- [ ] Create `tests/unit/test_run_manager.py` +- [ ] Mock `_execute_run` with a coroutine that immediately completes (or sleeps briefly for concurrent tests). This keeps tests independent of Part 3.3 execution logic. +- [ ] `test_concurrent_limit_per_key`: monkeypatch MAX_RUNS_PER_KEY=2 BEFORE constructing RunManager. Start 2 runs for same owner, 3rd raises ValueError. +- [ ] `test_concurrent_limit_global`: monkeypatch MAX_RUNS_GLOBAL=2 BEFORE constructing RunManager. Start 2 runs for different owners, 3rd raises ValueError. +- [ ] `test_concurrent_limit_boundary`: monkeypatch MAX_RUNS_PER_KEY=2 BEFORE constructing RunManager. Start exactly 2 runs (both succeed), then start 3rd (fails). Verify boundary is exact -- not off-by-one. +- [ ] `test_cleanup_after_completion`: start run, wait for completion, verify run removed from manager (or still accessible for replay) +- [ ] `test_get_run_not_found`: verify get_run("nonexistent") returns None +- [ ] `test_cancel_run`: start run, cancel, verify cancel_event is set and status becomes error +- [ ] `test_submit_resume_sets_value_and_event`: start run, manually set ctx.status="paused". Call submit_resume("user input"). Verify ctx.resume_value == "user input" and ctx.resume_event.is_set(). +- [ ] `test_submit_resume_not_paused_returns_false`: start run (status="running"). Call submit_resume. Verify returns False, resume_event not set. diff --git a/.claude/gw-plans/execution/phase-3/3.3-executor-core.md b/.claude/gw-plans/execution/phase-3/3.3-executor-core.md new file mode 100644 index 0000000..1918ec0 --- /dev/null +++ b/.claude/gw-plans/execution/phase-3/3.3-executor-core.md @@ -0,0 +1,376 @@ +# Part 3.3: Executor Core -- Execution, Streaming, Resume + +See [overview.md](overview.md) for architecture context (execution flow, resume protocol, timeout, reconnection). + +This part and [3.2-run-manager.md](3.2-run-manager.md) together form a single commit -- they are both `app/executor.py`. Split for readability. + +## Summary + +Implement the core execution functions: `_execute_run` (top-level error handling), `_stream_graph` (LangGraph astream loop with post-loop interrupt detection, paired node_started/completed events, schema-based edge_traversed derivation, cooperative timeout), `_wait_for_resume` (keepalive during pause), and `stream_run_sse` (replay + live streaming with deduplication). + +## Implementation + +### `_execute_run` -- Top-Level Execution + +```python +async def _execute_run(ctx, input_data, defaults, db, run_timeout): + """Background task. Never raises -- errors become SSE events.""" + run_start = time.monotonic() + ctx.started_at = run_start + try: + _emit(ctx, "run_started", {"run_id": ctx.run_id, "timestamp": _utcnow_iso()}) + initial_state = {**defaults, **input_data} + # Timeout checked cooperatively inside _stream_graph, NOT via + # asyncio.wait_for (which would include pause time). + await _stream_graph(ctx, initial_state, db, run_timeout) + except asyncio.CancelledError: + _emit(ctx, "error", {"message": "Run cancelled", "recoverable": False}) + ctx.status = "error" + await _safe_update_run(db, ctx.run_id, status="error", + error="Cancelled", duration_ms=_elapsed_ms(run_start)) + except Exception as exc: + logger.exception("Unexpected error in run %s", ctx.run_id) + _emit(ctx, "error", {"message": f"Internal error: {type(exc).__name__}", + "recoverable": False}) + ctx.status = "error" + await _safe_update_run(db, ctx.run_id, status="error", + error=str(exc), duration_ms=_elapsed_ms(run_start)) + finally: + await ctx.queue.put(None) # sentinel closes SSE streams +``` + +### `_stream_graph` -- LangGraph Streaming Loop + +**Key design decisions** (see overview for full rationale): + +- **Interrupt detection**: Do NOT check for `"__interrupt__"` in stream updates. Instead, after `astream` exhausts (for/else), call `aget_state(config)` and check `state.tasks` for interrupts. This is the correct LangGraph mechanism. +- **Node timing**: With `stream_mode="updates"`, updates arrive AFTER the node executes. We emit paired `node_started`/`node_completed` events when each update arrives, using a pending start timestamp to measure duration. Both events are emitted together -- the frontend gets accurate duration data even though it receives them at the same time. +- **Edge derivation**: Do NOT use `aget_state().next` mid-stream. Instead, derive edges from `schema_dict["edges"]`. For non-condition nodes, emit immediately. For condition nodes, defer until the next update reveals which node actually ran, then emit retroactively with the correct `condition_result`. + +```python +async def _stream_graph(ctx, initial_state, db, run_timeout): + """Stream execution, handling interrupts, resume, timeout.""" + graph, config = ctx.compiled_graph, ctx.config + input_data = initial_state + nodes_by_id = {n["id"]: n for n in ctx.schema_dict.get("nodes", [])} + condition_ids = {n["id"] for n in ctx.schema_dict.get("nodes", []) + if n.get("type") == "condition"} + # Build edge lookup: source_id -> list of (target_id, condition_branch) + edges_by_source: dict[str, list[tuple[str, str | None]]] = {} + for edge in ctx.schema_dict.get("edges", []): + edges_by_source.setdefault(edge["source"], []).append( + (edge["target"], edge.get("condition_branch"))) + + while True: # Loop handles resume cycles + pending_node_start = time.monotonic() # start time for the next node + prev_node: str | None = None # last completed node (for deferred condition edges) + deferred_condition_edges: list[tuple[str, str | None]] = [] + # Edges from a condition node -- deferred until we see which node runs next + + async for update in graph.astream(input_data, config=config, + stream_mode="updates"): + if ctx.cancel_event.is_set(): + raise asyncio.CancelledError() + + for node_name, node_output in update.items(): + now = time.monotonic() + + # Emit deferred condition edge_traversed (if previous node was condition) + if deferred_condition_edges: + for source_id, branch_name in deferred_condition_edges: + cond_config = nodes_by_id[source_id].get("config", {}) + branches = cond_config.get("branches", {}) + # Find which branch maps to the node that actually ran + condition_result = None + for bname, target_id in branches.items(): + if target_id == node_name: + condition_result = bname + break + _emit(ctx, "edge_traversed", {"from": source_id, + "to": node_name, "condition_result": condition_result}) + deferred_condition_edges = [] + + # Emit node_started + node_completed as a pair + node_type = nodes_by_id.get(node_name, {}).get("type", "unknown") + _emit(ctx, "node_started", {"node_id": node_name, + "node_type": node_type, "timestamp": _utcnow_iso()}) + + duration_ms = int((now - pending_node_start) * 1000) + state = await graph.aget_state(config) + state_snapshot = state.values if hasattr(state, 'values') else {} + + _emit(ctx, "node_completed", {"node_id": node_name, + "output": node_output, "state_snapshot": state_snapshot, + "duration_ms": duration_ms}) + + # Emit edge_traversed from schema edges + outgoing = edges_by_source.get(node_name, []) + if node_name in condition_ids: + # Defer -- we need to see which node runs next + deferred_condition_edges = [(node_name, branch) + for _, branch in outgoing] + else: + for target_id, _ in outgoing: + _emit(ctx, "edge_traversed", {"from": node_name, + "to": target_id, "condition_result": None}) + + prev_node = node_name + pending_node_start = time.monotonic() + + # Cooperative timeout (excludes pause time) + execution_time = now - ctx.started_at - ctx.total_pause_time + if execution_time >= run_timeout: + timeout_s = int(execution_time) + _emit(ctx, "error", {"message": f"Run timed out after {timeout_s}s of execution", + "recoverable": False}) + ctx.status = "error" + await _safe_update_run(db, ctx.run_id, status="error", + error=f"Timeout after {timeout_s}s", + duration_ms=_elapsed_ms(ctx.started_at)) + return + + # astream exhausted -- check for interrupt via aget_state + state = await graph.aget_state(config) + has_interrupt = (hasattr(state, 'tasks') and state.tasks and + any(t.interrupts for t in state.tasks)) + + if has_interrupt: + interrupt_val = state.tasks[0].interrupts[0].value + _emit(ctx, "graph_paused", { + "node_id": interrupt_val.get("node_id", "unknown"), + "prompt": interrupt_val.get("prompt", ""), + "run_id": ctx.run_id, + "input_key": interrupt_val.get("input_key", ""), + }) + ctx.status = "paused" + await _safe_update_run(db, ctx.run_id, status="paused", + paused_node_id=interrupt_val.get("node_id"), + paused_prompt=interrupt_val.get("prompt")) + + pause_start = time.monotonic() + await _wait_for_resume(ctx) + ctx.total_pause_time += time.monotonic() - pause_start + + input_data = Command(resume=ctx.resume_value) + ctx.status = "running" + await _safe_update_run(db, ctx.run_id, status="running", + paused_node_id=None, paused_prompt=None) + continue # re-enter outer while with Command(resume=...) + + # No interrupt -- graph completed + duration_ms = int((time.monotonic() - ctx.started_at) * 1000) + final_state = state.values if hasattr(state, 'values') else {} + _emit(ctx, "graph_completed", {"final_state": final_state, + "duration_ms": duration_ms}) + ctx.status = "completed" + await _safe_update_run(db, ctx.run_id, status="completed", + final_state=final_state, duration_ms=duration_ms) + return +``` + +### `_wait_for_resume` -- Keepalive During Pause + +```python +async def _wait_for_resume(ctx: RunContext) -> None: + while not ctx.resume_event.is_set(): + try: + await asyncio.wait_for(ctx.resume_event.wait(), timeout=15.0) + except asyncio.TimeoutError: + _emit_keepalive(ctx) + continue + ctx.resume_event.clear() +``` + +### `stream_run_sse` -- SSE Generator with Deduplication + +```python +async def stream_run_sse(ctx, last_event_id=0): + """Replay buffered events after last_event_id, then stream live. + Deduplicates: live loop skips events with id <= last_replayed_id.""" + last_replayed_id = last_event_id + for event_dict in ctx.events: + eid = event_dict["id"] + if eid is not None and eid > last_event_id: + yield format_sse(event_dict["event"], event_dict["data"], event_id=eid) + last_replayed_id = eid + + while True: + event_dict = await ctx.queue.get() + if event_dict is None: + break + eid = event_dict.get("id") + if eid is not None and eid <= last_replayed_id: + continue # already replayed from buffer + yield format_sse(event_dict["event"], event_dict["data"], event_id=eid) +``` + +## Files + +| Action | File | +|--------|------| +| **rewrite** | `app/executor.py` (shared with Part 3.2) | +| **create** | `tests/unit/test_executor.py` | +| **create** | `tests/unit/test_executor_human.py` | +| **create** | `tests/unit/test_executor_reconnect.py` | + +## Tests + +### `test_executor.py` (17 tests) + +Uses `FakeListChatModel` from `langchain_core`. No real LLM calls. + +1. **test_simple_run_completes**: start -> llm -> end. Verify events: run_started, node_started, node_completed (with state_snapshot), graph_completed. +2. **test_tool_run_emits_events**: start -> tool (calculator) -> end. Verify node_completed includes tool output. +3. **test_run_error_handling**: LLM node that raises. Verify error event emitted, DB updated. +4. **test_run_timeout**: Set timeout to 0.1s. Verify timeout error event. +5. **test_run_cancellation**: Start run, set cancel_event. Verify error event. +6. **test_state_snapshot_in_node_completed**: Verify state_snapshot contains full state. +7. **test_edge_traversed_events**: Multi-node graph, verify edge_traversed between nodes with correct source/target from schema edges. +8. **test_format_sse**: Verify format with event_id produces `id: N\nevent: ...\ndata: ...\n\n`. +9. **test_format_sse_no_id**: Verify event_id=None omits `id:` line. +10. **test_format_sse_non_serializable**: Verify `default=str` handles non-JSON types. +11. **test_node_started_events_emitted**: Verify paired node_started (with node_type) and node_completed for each node. Both emitted per update. +12. **test_emit_queue_full_does_not_crash**: Queue(maxsize=1), fill it, call _emit. No exception, event in ctx.events. +13. **test_condition_node_routing_emits_events**: Condition node (field_equals). Verify edge_traversed has correct condition_result derived retroactively from the node that actually ran next. +14. **test_event_ids_are_sequential**: Verify IDs are 1, 2, 3, ... with no gaps. +15. **test_safe_update_run_db_failure_logs_not_raises**: Mock update_run to raise. Verify no exception, error logged. +16. **test_timeout_excludes_pause_time**: Set timeout to 2s. Pause for 3s. Resume. Verify no timeout because pause excluded. +17. **test_stream_after_completion_replays_all_events**: Start run, wait for completion (queue sentinel consumed). Call stream_run_sse from ctx.events. Verify all events are replayed correctly from the buffer alone. + +### `test_executor_human.py` (5 tests) + +17. **test_pause_emits_graph_paused**: Verify graph_paused fields: node_id, prompt, run_id, input_key. +18. **test_resume_continues_execution**: Pause, submit string, verify completion. +19. **test_resume_with_dict_input**: Pause, submit dict, verify completion. +20. **test_keepalive_during_pause**: Verify keepalive events with id=None. +21. **test_double_pause_resume**: start -> human -> llm -> human -> end. Two pause/resume cycles. Exercises outer while True and resume_event.clear(). + +### `test_executor_reconnect.py` (5 tests) + +22. **test_reconnection_replays_from_last_event_id**: stream_run_sse with last_event_id=N skips first N events. +23. **test_reconnection_replays_all_when_no_id**: last_event_id=0 replays all. +24. **test_keepalive_not_replayed**: Keepalive (id=None) skipped in replay. +25. **test_reconnection_no_duplicate_events**: Events in both buffer and queue. Verify live loop skips duplicates. +26. **test_stream_after_completion_replays_all_events**: Client connects after run completed and queue sentinel consumed. Verify replay works from ctx.events alone. + +## Commit + +Parts 3.2 and 3.3 share this commit: + +``` +feat: implement executor with SSE streaming and run management + +Add RunManager for tracking active runs with per-key and global limits. +Execute graphs via astream with state snapshots after each node. +Sequential event IDs for duplicate-free SSE reconnection replay. +Emit node_started before node_completed for each node. +Derive condition_result in edge_traversed from schema branches. +Human-in-the-loop resume with buffered replay (no SSE-listener wait). +Run timeout (5min default) and cancellation via asyncio.Event. +Safe DB updates in exception handlers via _safe_update_run. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +## Detailed Todolist + +#### Core execution + +- [ ] Implement `_execute_run(ctx, input_data, defaults, db, run_timeout)`: + - Record `run_start = time.monotonic()`, explicitly set `ctx.started_at = run_start` + - Emit `run_started` event + - Merge `defaults` with `input_data` -> `initial_state` + - Call `await _stream_graph(ctx, initial_state, db, run_timeout)` -- no asyncio.wait_for (timeout is checked cooperatively inside _stream_graph, excluding pause time) + - Catch `asyncio.CancelledError`: emit error event FIRST, then `await _safe_update_run(...)`, set ctx.status + - Catch `Exception`: log, emit error event FIRST, then `await _safe_update_run(...)`, set ctx.status + - Finally: `await ctx.queue.put(None)` (sentinel). Then after grace period delay, call `run_manager.cleanup_run(ctx.run_id)`. +- [ ] Implement `_stream_graph(ctx, initial_state, db, run_timeout)`: + - Build `nodes_by_id`, `condition_ids`, and `edges_by_source` lookups from `ctx.schema_dict` + - `edges_by_source`: dict mapping source_id -> list of (target_id, condition_branch) + - `input_data = initial_state` + - Outer `while True` loop (for resume cycles) + - Track `pending_node_start = time.monotonic()` before entering astream + - Track `prev_node: str | None = None` and `deferred_condition_edges: list = []` + - Inner `async for update in graph.astream(input_data, config=ctx.config, stream_mode="updates")` + - Check `ctx.cancel_event.is_set()` -> raise CancelledError + - For each `node_name, node_output` in `update.items()`: + - If `deferred_condition_edges` is non-empty (previous node was a condition), emit edge_traversed for each, looking up which branch maps to `node_name` as target. Then clear deferred list. + - Emit `node_started(node_name, node_type, timestamp)` -- node_type from nodes_by_id + - Compute `duration_ms = now - pending_node_start` + - Call `await graph.aget_state(config)` for state snapshot + - Emit `node_completed(node_name, output, state_snapshot, duration_ms)` + - Look up outgoing edges from `edges_by_source[node_name]`: + - If node is a condition: defer edges (store in `deferred_condition_edges`) + - Else: emit `edge_traversed(from=node_name, to=target, condition_result=None)` for each + - Set `pending_node_start = time.monotonic()` for the next node + - Check execution timeout: `execution_time = now - ctx.started_at - ctx.total_pause_time`. If `>= run_timeout`: emit error, safe_update_run, set status, return + - After astream exhausts (falls through the for loop): call `state = await graph.aget_state(config)` + - Check for interrupt: `state.tasks` non-empty and any task has `interrupts` + - If interrupted: + - Extract `interrupt_val = state.tasks[0].interrupts[0].value` + - Emit `graph_paused` with node_id, prompt, run_id, input_key + - Set `ctx.status = "paused"` + - `await _safe_update_run(db, ...)` (status=paused, paused_node_id, paused_prompt) + - Record `pause_start`, call `await _wait_for_resume(ctx)`, accumulate total_pause_time + - Set `input_data = Command(resume=ctx.resume_value)` + - Set `ctx.status = "running"`, safe_update_run (clear pause fields) + - `continue` outer while + - If not interrupted (graph completed): + - Compute duration_ms, get final_state from state.values + - Emit `graph_completed` + - `await _safe_update_run(...)` (status=completed, final_state, duration_ms) + - Set ctx.status, return +- [ ] Implement `_wait_for_resume(ctx)`: + - Loop: `asyncio.wait_for(ctx.resume_event.wait(), timeout=15.0)` + - On timeout: call `_emit_keepalive(ctx)`, continue loop + - When resume_event is set: clear it, return + +#### SSE stream generator + +- [ ] Implement `stream_run_sse(ctx, last_event_id=0)` -> AsyncGenerator[str, None]: + - Track `last_replayed_id = last_event_id` + - Replay: iterate `ctx.events`, for events where `e["id"] is not None and e["id"] > last_event_id`: yield `format_sse(...)`, update `last_replayed_id = e["id"]` + - Live: loop `await ctx.queue.get()`, break on None. Skip events where `eid is not None and eid <= last_replayed_id` (deduplication). Yield remaining events. + +#### Tests -- test_executor.py + +- [ ] Create `tests/unit/test_executor.py` +- [ ] Add imports: pytest, asyncio, FakeListChatModel, build_graph, InMemorySaver, RunManager, RunContext, format_sse, _emit, executor internals +- [ ] Create helper: `_make_simple_schema()` returning start -> llm -> end schema (reuse pattern from test_builder.py) +- [ ] Create fixture: `db` (reuse from conftest.py), `run_manager` (fresh RunManager per test) +- [ ] `test_format_sse`: verify format_sse("test", {"key": "val"}, event_id=1) == "id: 1\nevent: test\ndata: {\"key\": \"val\"}\n\n" +- [ ] `test_format_sse_no_id`: verify format_sse("test", {"key": "val"}, event_id=None) has no "id:" line +- [ ] `test_format_sse_non_serializable`: verify datetime objects handled by default=str +- [ ] `test_simple_run_completes`: build graph with FakeListChatModel, start run via RunManager, collect events from queue, verify: run_started, node_started, node_completed (with state_snapshot dict), graph_completed (with final_state, duration_ms > 0) +- [ ] `test_tool_run_emits_events`: schema with calculator tool node, verify node_completed includes tool output dict +- [ ] `test_run_error_handling`: mock LLM that raises, verify error event emitted and DB status is "error" +- [ ] `test_run_timeout`: set timeout to 0.1s, use asyncio.sleep in a custom node, verify timeout error event +- [ ] `test_run_cancellation`: start run, immediately set cancel_event, verify error event with "cancelled" +- [ ] `test_state_snapshot_in_node_completed`: verify state_snapshot is a dict with all state fields +- [ ] `test_edge_traversed_events`: multi-node graph (start -> tool -> llm -> end), verify edge_traversed emitted between nodes +- [ ] `test_node_started_events_emitted`: multi-node graph, collect events, verify node_started (with node_type field) appears before node_completed for each node +- [ ] `test_emit_queue_full_does_not_crash`: create RunContext with Queue(maxsize=1), put one item to fill it, call _emit, verify no exception and event is in ctx.events +- [ ] `test_condition_node_routing_emits_events`: graph with condition node (field_equals routing), run with state that triggers a specific branch, verify edge_traversed has condition_result matching the branch name +- [ ] `test_event_ids_are_sequential`: run a simple graph, collect all events from ctx.events, verify IDs are 1, 2, 3, ... with no gaps and no duplicates +- [ ] `test_safe_update_run_db_failure_logs_not_raises`: mock `update_run` to raise Exception. Call `_safe_update_run(db, run_id, status="error")`. Verify no exception propagates. Verify logger.exception was called (use caplog or mock). +- [ ] `test_timeout_excludes_pause_time`: schema with human_input node. Set run_timeout to 2s. Start run, wait for pause. Wait 3s (simulating slow user -- exceeds timeout). Submit resume. Verify run does NOT emit timeout error. Verify run completes normally because pause time is excluded from execution time. +- [ ] `test_stream_after_completion_replays_all_events`: start run, wait for completion so queue sentinel is consumed. Call stream_run_sse(ctx, last_event_id=0). Verify all events from ctx.events are yielded. Verifies replay works when queue is already drained. + +#### Tests -- test_executor_human.py + +- [ ] Create `tests/unit/test_executor_human.py` +- [ ] Create helper: `_make_human_schema()` returning start -> human_input -> llm -> end +- [ ] `test_pause_emits_graph_paused`: start run, collect events until graph_paused. Verify fields: node_id, prompt, run_id, input_key. Verify event has an id field. +- [ ] `test_resume_continues_execution`: start run, wait for pause, submit_resume with string value, verify execution continues with node_completed and graph_completed. +- [ ] `test_resume_with_dict_input`: start run, wait for pause, submit_resume with dict value `{"answer": "yes"}`, verify execution completes successfully. +- [ ] `test_keepalive_during_pause`: start run, wait for pause, collect events for >15s worth of timeout cycles, verify keepalive events appear and have id=None (no id field). +- [ ] `test_double_pause_resume`: create schema start -> human_1 -> llm -> human_2 -> end. Start run. Wait for first pause (human_1). Submit resume with "first answer". Wait for second pause (human_2). Submit resume with "second answer". Verify graph completes with both values in state. This exercises the outer `while True` loop and verifies `resume_event.clear()` works correctly between pauses. + +#### Tests -- test_executor_reconnect.py + +- [ ] Create `tests/unit/test_executor_reconnect.py` +- [ ] `test_reconnection_replays_from_last_event_id`: start a run, wait for completion so ctx.events is populated. Call stream_run_sse with last_event_id equal to the 2nd event's ID. Collect yielded events. Verify: first yielded event has id == 3rd event's ID (skipped first two), no duplicates, total count == len(ctx.events) - 2. +- [ ] `test_reconnection_replays_all_when_no_id`: call stream_run_sse with last_event_id=0. Verify all events from ctx.events are replayed. +- [ ] `test_keepalive_not_replayed`: manually add a keepalive event (id=None) to ctx.events, call stream_run_sse with last_event_id=0, verify keepalive is skipped in replay. +- [ ] `test_reconnection_no_duplicate_events`: start a run, let it complete so events are in both ctx.events and the queue. Call stream_run_sse with last_event_id=2 (simulating client saw first 2 events). Collect all yielded SSE strings. Parse event IDs from output. Verify no ID appears twice -- the live loop must skip events with id <= last_replayed_id. +- [ ] `test_stream_after_completion_replays_all_events`: start run, wait for completion (queue sentinel consumed). Call stream_run_sse(ctx, last_event_id=0). Verify all events are replayed from ctx.events buffer alone. diff --git a/.claude/gw-plans/execution/phase-3/3.4-routes.md b/.claude/gw-plans/execution/phase-3/3.4-routes.md new file mode 100644 index 0000000..81f4c1c --- /dev/null +++ b/.claude/gw-plans/execution/phase-3/3.4-routes.md @@ -0,0 +1,265 @@ +# Part 3.4: Routes, Schemas, and App Wiring + +See [overview.md](overview.md) for architecture context (route map, reconnection recovery, resume protocol). + +**Prerequisite**: Part 3.1 must be implemented before 3.4. The route calls `build_graph(schema, checkpointer=saver)` which requires the checkpointer parameter added in 3.1. + +## Summary + +Add Pydantic request/response schemas for runs. Add 4 routes split across two routers: + +- `POST /v1/graphs/{graph_id}/run` on the **existing graphs router** (`app/routes/graphs.py`) +- `GET /v1/runs/{id}/stream`, `POST /v1/runs/{id}/resume`, `GET /v1/runs/{id}/status` on a **new runs router** (`app/routes/runs.py`) + +Wire `RunManager` to `app.state` in the lifespan and include the runs router. + +## Pydantic Schemas -- `app/schemas/runs.py` + +```python +class StartRunRequest(BaseModel): + input: dict = Field(default_factory=dict, + description="Initial input values to merge with state defaults") + +class StartRunResponse(BaseModel): + run_id: str + status: str = "running" + +class RunStatusResponse(BaseModel): + run_id: str + graph_id: str + status: str # running | paused | completed | error + node_id: str | None = None # when paused + prompt: str | None = None # when paused + final_state: dict | None = None # when completed + duration_ms: int | None = None # when completed or error + error: str | None = None # when error + +class ResumeRunRequest(BaseModel): + input: bool | str | dict | list | int | float = Field(..., + description="The human input to resume the paused run. " + "Type depends on the human_input node's input_key state field.") +``` + +## Routes + +### E1. Start Run (on graphs router -- `app/routes/graphs.py`) + +``` +POST /v1/graphs/{graph_id}/run +Scope: runs:write +Body: StartRunRequest +Response: 202 Accepted, StartRunResponse + +Flow: +1. Fetch graph from DB (404 if not found / not owned) +2. build_graph(schema, checkpointer=InMemorySaver()) +3. Check concurrent run limits (429 if exceeded) +4. create_run(db, ..., status="running") +5. run_manager.start_run(...) +6. Return { run_id, status: "running" } +``` + +### E2. Stream SSE (new runs router) + +``` +GET /v1/runs/{run_id}/stream +Scope: runs:read +Response: text/event-stream (SSE) +Query params: last_event_id (optional, int) +Header: Last-Event-ID (optional) + +Flow: +1. Auth check + ownership verification (verify ctx.owner_id == auth.owner_id) +2. Parse last_event_id from header or query param, default 0 +3. Get RunContext from RunManager (verify ownership) +4. If run not in RunManager but exists in DB: + a. completed/error: single terminal event + close + b. paused: single graph_paused event + close + c. running but not in manager: error "Run lost (server restarted)" +5. Return StreamingResponse with stream_run_sse(ctx, last_event_id) + Headers: Cache-Control: no-cache, X-Accel-Buffering: no +``` + +### E3. Resume Run (new runs router) + +``` +POST /v1/runs/{run_id}/resume +Scope: runs:write +Body: ResumeRunRequest +Response: 202 Accepted + +Flow: +1. Auth check + ownership (verify ctx.owner_id == auth.owner_id) +2. Get RunContext from RunManager (404 if not found or wrong owner) +3. Verify status is "paused" (409 if not) +4. run_manager.submit_resume(run_id, input) +5. Return 202 +``` + +### E4. Run Status (new runs router) + +``` +GET /v1/runs/{run_id}/status +Scope: runs:read +Response: RunStatusResponse + +Flow: +1. Auth check + ownership (verify ctx.owner_id == auth.owner_id) +2. Check RunManager first (live status, verify ownership) +3. Fall back to DB (pass owner_id to get_run query) +4. Return RunStatusResponse with relevant fields +``` + +## App Wiring -- `app/main.py` + +1. Add `RunManager` to `app.state` in lifespan +2. Add cleanup in lifespan: cancel all active runs on shutdown +3. Include runs router +4. Add `"Runs"` to OpenAPI tags + +## Files + +| Action | File | +|--------|------| +| **create** | `app/schemas/runs.py` | +| **modify** | `app/schemas/__init__.py` | +| **modify** | `app/routes/graphs.py` | +| **create** | `app/routes/runs.py` | +| **modify** | `app/main.py` | +| **create** | `tests/unit/test_routes_runs.py` | + +## Tests (19) -- `tests/unit/test_routes_runs.py` + +1. **test_start_run_returns_202**: POST /v1/graphs/{id}/run with valid graph. +2. **test_start_run_graph_not_found**: 404 for nonexistent graph. +3. **test_start_run_wrong_owner**: 404 for graph owned by different key. +4. **test_start_run_invalid_scope**: 403 without runs:write. +5. **test_start_run_invalid_schema_returns_422**: Malformed schema (missing start node). Verify 422. +6. **test_start_run_concurrent_limit_returns_429**: Start max runs, attempt one more, verify 429. +7. **test_run_status_running**: Start run, check status. +8. **test_run_status_completed**: Wait for completion, check status. +9. **test_run_status_not_found**: 404 for nonexistent run. +10. **test_run_status_falls_back_to_db**: Complete run, remove from RunManager, verify GET /status returns from DB. +11. **test_stream_endpoint_content_type**: Verify text/event-stream. +12. **test_stream_with_last_event_id_query_param**: Verify ?last_event_id=N skips events. +13. **test_stream_completed_run_returns_terminal_event**: Completed run not in RunManager. GET /stream returns single graph_completed. +14. **test_stream_lost_run_returns_error_event**: Running in DB but not in RunManager. GET /stream returns error. +15. **test_stream_wrong_owner_returns_404**: Start run with key A, GET /stream with key B, verify 404. +16. **test_resume_paused_run_returns_202**: Create graph with human_input, start run, wait for pause, POST resume, verify 202. +17. **test_resume_not_paused**: 409 when run is not paused. +18. **test_resume_wrong_owner_returns_404**: Pause run with key A, POST resume with key B, verify 404. +19. **test_resume_after_server_restart_returns_404**: Paused in DB but not in RunManager. POST /resume returns 404. + +## Commit + +``` +feat: add run routes for start, stream, resume, and status + +POST /v1/graphs/{id}/run starts execution and returns run_id. +GET /v1/runs/{id}/stream opens SSE with Last-Event-ID reconnection. +POST /v1/runs/{id}/resume accepts any JSON type as human input. +GET /v1/runs/{id}/status supports reconnection with DB fallback. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +## Detailed Todolist + +#### Pydantic schemas + +- [ ] Create `app/schemas/runs.py` +- [ ] Define `StartRunRequest(BaseModel)`: input: dict with default_factory=dict, Field description +- [ ] Define `StartRunResponse(BaseModel)`: run_id: str, status: str = "running" +- [ ] Define `RunStatusResponse(BaseModel)`: run_id, graph_id, status, node_id (optional), prompt (optional), final_state (optional), duration_ms (optional), error (optional) +- [ ] Define `ResumeRunRequest(BaseModel)`: input: `bool | str | dict | list | int | float` (bool before int to avoid Pydantic coercion) with Field description noting type depends on the human_input node's input_key state field +- [ ] Update `app/schemas/__init__.py` to export new schemas + +#### Routes + +- [ ] Modify `app/routes/graphs.py` to add the run endpoint: + - Import: build_graph, InMemorySaver, GraphBuildError, RunManager, crud (create_run), StartRunRequest, StartRunResponse + - Add helper `_get_run_manager(request) -> RunManager` that reads from `request.app.state.run_manager` + - Implement `POST /v1/graphs/{graph_id}/run`: + - Scope: `runs:write` + - Fetch graph from DB with owner_id check -> 404 + - Call `build_graph(graph.schema_json, checkpointer=InMemorySaver())` + - Catch `GraphBuildError` -> 422 with error detail + - Check concurrent limits (catch ValueError from RunManager) -> 429 + - `create_run(db, graph_id, owner_id, "running", body.input)` + - `run_manager.start_run(...)` with config `{"configurable": {"thread_id": run_id}}`, pass schema_dict=graph.schema_json + - Return 202 with `StartRunResponse(run_id=run.id)` + +- [ ] Create `app/routes/runs.py` +- [ ] Import: APIRouter, Depends, Header, Query, HTTPException, Request, StreamingResponse, require_scope, get_db, RunManager, crud, schemas +- [ ] Define `router = APIRouter(prefix="/v1/runs", tags=["Runs"])` +- [ ] Define helper `_get_run_manager(request) -> RunManager` that reads from `request.app.state.run_manager` + +- [ ] Implement `GET /v1/runs/{run_id}/stream`: + - Scope: `runs:read` + - Parse `last_event_id`: read from `Last-Event-ID` header (standard SSE), fall back to `?last_event_id=N` query param, default 0. Parse as int, ignore if not a valid int. + - Get RunContext from RunManager. Verify `ctx.owner_id == auth.owner_id` (404 if wrong owner). + - If not in RunManager: check DB via `get_run()` with `owner_id=auth.owner_id` + - completed: yield single `graph_completed` event with final_state/duration_ms from DB, then close + - error: yield single `error` event with error message from DB, then close + - paused: yield single `graph_paused` event with paused_node_id/paused_prompt from DB, then close + - running but not in manager: yield `error` event "Run lost (server restarted)", then close + - not found: 404 + - If in RunManager: return `StreamingResponse(stream_run_sse(ctx, last_event_id), media_type="text/event-stream")` + - Set response headers: `Cache-Control: no-cache`, `X-Accel-Buffering: no` (for nginx proxy) + +- [ ] Implement `POST /v1/runs/{run_id}/resume`: + - Scope: `runs:write` + - Get RunContext from RunManager -> 404 if not found. Verify `ctx.owner_id == auth.owner_id` (404 if wrong owner). + - Verify status is "paused" -> 409 Conflict if not + - `run_manager.submit_resume(run_id, body.input)` + - Return 202 + +- [ ] Implement `GET /v1/runs/{run_id}/status`: + - Scope: `runs:read` + - Check RunManager first (live status from RunContext). Verify `ctx.owner_id == auth.owner_id`. + - If not in RunManager, fall back to DB via `get_run()` with `owner_id=auth.owner_id` + - 404 if not found in either + - Return `RunStatusResponse` with fields populated from live or DB data + +#### App wiring + +- [ ] In `app/main.py`, import `RunManager` from `app.executor` +- [ ] In `app/main.py`, import `runs router` from `app.routes.runs` +- [ ] In lifespan, create `RunManager()` and attach to `app.state.run_manager` +- [ ] Add cleanup in lifespan yield: cancel all active runs on shutdown (iterate RunManager._runs, call cancel_run) +- [ ] Add `app.include_router(runs_router)` +- [ ] Add `"Runs"` entry to `tags_metadata` with description +- [ ] Add `"Last-Event-ID"` to `allow_headers` in CORS middleware in `app/main.py` +- [ ] Verify `ContentTypeMiddleware` only applies to POST/PUT/PATCH, not GET. If it blocks GET requests, add exclusion for the `/v1/runs/{id}/stream` endpoint (which is GET but returns text/event-stream, not JSON). + +#### Route tests + +- [ ] Create `tests/unit/test_routes_runs.py` +- [ ] Create fixture: `client` with httpx.AsyncClient + ASGITransport, set up DB, RunManager on app.state, create test API key with all scopes +- [ ] Create helper: `_create_test_graph(client, api_key)` that POSTs a valid graph with a simple start -> llm -> end schema and returns graph_id +- [ ] `test_start_run_returns_202`: create graph, POST /v1/graphs/{id}/run, verify 202 + run_id in response +- [ ] `test_start_run_graph_not_found`: POST with nonexistent graph_id -> 404 +- [ ] `test_start_run_wrong_owner`: create graph with key A, try to run with key B -> 404 +- [ ] `test_start_run_invalid_scope`: use key without runs:write -> 403 +- [ ] `test_start_run_invalid_schema_returns_422`: create a graph whose schema_json is missing a start node. POST /v1/graphs/{id}/run. Verify 422 response with error detail from GraphBuildError. +- [ ] `test_start_run_concurrent_limit_returns_429`: monkeypatch MAX_RUNS_PER_KEY=1 BEFORE constructing RunManager. Start 1 run, start 2nd, verify 429. +- [ ] `test_run_status_running`: start run, immediately GET /v1/runs/{id}/status -> status: "running" +- [ ] `test_run_status_completed`: start run with FakeListChatModel, wait briefly for completion, GET status -> "completed" with final_state +- [ ] `test_run_status_not_found`: GET status for nonexistent run -> 404 +- [ ] `test_run_status_falls_back_to_db`: start run, wait for completion, call RunManager.cleanup_run to remove from live state, GET /v1/runs/{id}/status still returns from DB with status "completed" +- [ ] `test_stream_endpoint_content_type`: GET /v1/runs/{id}/stream, verify Content-Type: text/event-stream +- [ ] `test_stream_with_last_event_id_query_param`: start run, wait for completion, GET /v1/runs/{id}/stream?last_event_id=2, verify response skips first 2 events +- [ ] `test_stream_completed_run_returns_terminal_event`: start run, wait for completion, call RunManager.cleanup_run to remove live state. GET /v1/runs/{id}/stream returns a single graph_completed event (from DB data) and closes. +- [ ] `test_stream_lost_run_returns_error_event`: insert a run in DB with status="running" directly (not via RunManager). GET /v1/runs/{id}/stream returns a single error event "Run lost (server restarted)" and closes. +- [ ] `test_stream_wrong_owner_returns_404`: start run with key A. GET /v1/runs/{id}/stream with key B. Verify 404. +- [ ] `test_resume_paused_run_returns_202`: create graph with human_input, start run, wait for pause, POST /v1/runs/{id}/resume with valid input, verify 202. +- [ ] `test_resume_not_paused`: start run (no human_input), POST resume -> 409 +- [ ] `test_resume_wrong_owner_returns_404`: pause run with key A. POST /v1/runs/{id}/resume with key B. Verify 404. +- [ ] `test_resume_after_server_restart_returns_404`: insert a run in DB with status="paused" directly (not via RunManager). POST /v1/runs/{id}/resume returns 404 because RunContext is not in RunManager. + +#### Post-implementation housekeeping + +- [ ] Update `.claude/skills/gw-execution/SKILL.md`: add run routes to "API routes (implemented)" section, move from "planned" section. Update route URLs to match actual implementation. +- [ ] Run full test suite: `uv run pytest tests/unit/ -v` +- [ ] Run linter: `uv run ruff check app/ tests/` +- [ ] Run formatter: `uv run ruff format --check app/ tests/` diff --git a/.claude/gw-plans/execution/phase-3/overview.md b/.claude/gw-plans/execution/phase-3/overview.md new file mode 100644 index 0000000..a4a2b9b --- /dev/null +++ b/.claude/gw-plans/execution/phase-3/overview.md @@ -0,0 +1,322 @@ +# Phase 3: Executor + SSE Streaming -- Overview + +## Context + +The execution layer has a DB layer with tools and state utils (Phase 1), scoped API key auth with CRUD routes (Phase 1.5), and the GraphSchema-to-LangGraph builder (Phase 2). The builder produces a compiled `StateGraph` via `build_graph()` that returns a `BuildResult(graph, defaults)`. The executor stub in `app/executor.py` has a placeholder `stream_run()` and a `format_sse()` helper. + +This phase implements the core execution engine: take a compiled graph, run it, stream SSE events to the client in real time, handle human-in-the-loop interrupts with the full resume protocol, persist run state to the database, and support reconnection recovery. + +**This phase also includes a minimal set of routes** (`POST /v1/graphs/{id}/run`, `GET /v1/runs/{id}/stream`, `POST /v1/runs/{id}/resume`, `GET /v1/runs/{id}/status`) so the executor is demoable end-to-end. The remaining routes (validate, export, run history listing) stay in Phase 4. + +**URL scheme note**: This plan uses `/v1/runs/{id}/...` for run-specific routes. The PROPOSAL and some skill docs use `/v1/graphs/run/{id}/...`. We deliberately adopt the REST-standard plural resource pattern (`/v1/runs/...`) per the `gw-api-design` skill. The PROPOSAL URLs, `gw-execution` skill doc, and frontend stub (`packages/canvas/src/api/runs.ts`) will be updated during implementation to match. + +## Parts + +| Part | File | Commit | Summary | +|------|------|--------|---------| +| 3.1 | [builder-checkpointer](3.1-builder-checkpointer.md) | 1 | Add optional `checkpointer` param to `build_graph()` | +| 3.2 | [run-manager](3.2-run-manager.md) | 2a | RunContext, RunManager, helpers (`_emit`, `format_sse`, `_safe_update_run`) | +| 3.3 | [executor-core](3.3-executor-core.md) | 2b | `_execute_run`, `_stream_graph`, `_wait_for_resume`, `stream_run_sse` | +| 3.4 | [routes](3.4-routes.md) | 3 | Pydantic schemas, 4 routes, app wiring | + +Parts 3.2 and 3.3 are in the same commit (they are both `app/executor.py`). They are split into separate plan files for readability. + +No new Python dependencies are added. Everything needed is already installed: `fastapi` (StreamingResponse), `langgraph` (astream, InMemorySaver, Command), `aiosqlite` (run persistence). + +--- + +## Architecture + +### Execution Flow + +``` +POST /v1/graphs/{id}/run + { input: {...} } + | + v + +-----------------+ + | Route handler | 1. Fetch graph from DB + | | 2. build_graph(schema) -> BuildResult + | | 3. create_run(db, ..., status="running") + | | 4. Return { run_id } immediately (202 Accepted) + | | 5. Launch execute_run() as background task + +-----------------+ + | + v + +-----------------+ + | execute_run() | Runs in background asyncio task. + | | Calls graph.astream() with stream_mode="updates". + | | Pushes SSE events to an asyncio.Queue per run. + | | Updates run status in DB on completion/error/pause. + +-----------------+ + | + | asyncio.Queue (SSE events) + v + +-------------------+ + | GET /stream | Opens EventSource connection. + | | Reads from the run's event queue. + | | Yields SSE-formatted strings. + | | Closes on graph_completed or error. + +-------------------+ +``` + +### SSE Event Contract + +SSE (Server-Sent Events) is a simple protocol: the server sends a stream of `event: \ndata: \n\n` messages over a long-lived HTTP connection. The browser uses `EventSource` to consume them. Key properties: + +- **Unidirectional**: server to client only (client uses POST for input) +- **Auto-reconnect**: `EventSource` reconnects automatically on disconnect, sending `Last-Event-ID` header +- **Text-based**: each event is `id: \nevent: \ndata: \n\n` +- **Resumable**: sequential `id:` field enables reconnection without duplicate events + +Every SSE event carries a monotonically increasing `id:` field (integer, per run, starting at 1). This is the standard SSE mechanism for reconnection: when `EventSource` reconnects, it sends a `Last-Event-ID` header with the last received ID. The server replays only events after that ID from its buffer. + +``` +id: 1 +event: run_started +data: {"run_id": "...", "timestamp": "..."} + +id: 2 +event: node_started +data: {"node_id": "llm_1", "node_type": "llm", "timestamp": "..."} + +id: 3 +event: node_completed +data: {"node_id": "llm_1", "output": {"result": "..."}, + "state_snapshot": {"messages": [...], "result": "..."}, + "duration_ms": 342} + +id: 4 +event: edge_traversed +data: {"from": "llm_1", "to": "cond_1", "condition_result": "go_a"} + +id: 5 +event: graph_paused +data: {"node_id": "human_1", "prompt": "Enter your input", + "run_id": "...", "input_key": "user_input"} + +id: 6 +event: graph_completed +data: {"final_state": {...}, "duration_ms": 1523} + +id: 7 +event: error +data: {"node_id": "web_search_1", "message": "Rate limit exceeded", + "recoverable": true} + +event: keepalive +data: {} +``` + +Note: `keepalive` events have no `id:` -- they are not meaningful for replay and should not advance the client's `Last-Event-ID`. + +**Wire protocol updates needed**: `packages/shared/src/events.ts` must be updated to match these events. Specifically: add `node_type: string` to `node_started`, and add `input_key: string` to `graph_paused`. These are wire protocol types (not persisted) -- no migration needed, just a TypeScript type change. + +### State Snapshot Approach + +We use `stream_mode="updates"` (not combined `["updates", "values"]`). The `updates` mode gives us `{node_name: returned_dict}` after each node. For the full state snapshot, we call `graph.aget_state(config)` after each node completion. + +**Why not `stream_mode=["updates", "values"]`**: Combined mode yields tuples of `("updates", {...})` and `("values", {...})` interleaved. Correlating which `values` event belongs to which `updates` event requires position tracking. Using `aget_state()` after each update is simpler, adds negligible overhead (reads from in-memory checkpointer), and gives us the exact same data. + +### RunContext -- In-Memory State + +```python +@dataclass +class RunContext: + run_id: str + graph_id: str + owner_id: str + queue: asyncio.Queue[dict | None] # None = sentinel for stream end + task: asyncio.Task | None + cancel_event: asyncio.Event + status: str # running | paused | completed | error + started_at: float # time.monotonic() + resume_event: asyncio.Event # signaled when resume value is submitted + resume_value: Any # the value to feed to Command(resume=...) + compiled_graph: CompiledStateGraph + config: dict # LangGraph config with thread_id + events: list[dict] # buffered events for reconnection replay + event_counter: int # monotonic counter for SSE id: field + schema_dict: dict # original GraphSchema for condition routing + total_pause_time: float # accumulated seconds spent paused +``` + +The `RunManager` is a singleton attached to `app.state`. It holds no persistent data -- all persistence goes through the DB. If the server restarts, active runs are lost (acceptable for v1 -- the frontend detects this via the `/status` endpoint and shows "Run lost"). + +### Human-in-the-Loop Resume Protocol + +The PROPOSAL describes a two-phase wait (wait for resume POST, then wait for SSE listener). We simplify this: since all events are buffered in `ctx.events` with sequential IDs, the SSE listener wait is unnecessary. If the frontend reconnects late, it uses `Last-Event-ID` to replay missed events. No events are lost. + +``` +1. Graph hits interrupt() in human_input node +2. Executor detects __interrupt__ in stream updates +3. Emits graph_paused SSE event +4. Updates DB: status="paused", paused_node_id, paused_prompt +5. Executor enters _wait_for_resume() loop (keepalive every 15s) + +--- User submits input --- + +6. POST /v1/runs/{id}/resume { input: "user's answer" } +7. Server stores resume_value on RunContext +8. Server sets resume_event +9. Returns 202 Accepted + +--- Executor resumes immediately --- + +10. _wait_for_resume() detects resume_event, returns +11. Executor feeds Command(resume=value) to graph.astream() +12. Execution continues, events buffered in ctx.events +13. Frontend reconnects to GET /stream with Last-Event-ID +14. Server replays only events after that ID, then streams live +``` + +**Why no SSE listener wait**: The event buffer with sequential IDs solves the problem more cleanly. The two-phase wait added complexity (extra asyncio.Event, timeout logic, notify_sse_connected coordination) for a problem that the buffered replay already handles. + +**Server restart during resume**: If the server restarts between the resume POST and the frontend reconnecting, the run is lost. The `/status` endpoint returns DB state showing the last persisted status (likely "paused" or "running"). The frontend handles this as "Run lost." This is acceptable for v1 -- persistent checkpointer in Phase 5 addresses it. + +### Reconnection Recovery + +``` +GET /v1/runs/{id}/status + { status: "running", run_id, graph_id } -> client opens /stream + { status: "paused", run_id, graph_id, node_id, prompt } + { status: "completed", run_id, graph_id, final_state, duration_ms } + { status: "error", run_id, graph_id, error } + 404 -> run not found or not owned by caller +``` + +When a client reconnects to `/stream`, it sends the `Last-Event-ID` header. The server replays only events from `RunContext.events` where `event_id > last_event_id`, then streams live. The live loop skips any event whose `id <= last_replayed_id` to avoid duplicates from the buffer/queue overlap. + +### Cancellation + +Each `RunContext` has a `cancel_event: asyncio.Event`. When set, the executor checks between node executions, breaks out of the loop, emits an error event, and updates the DB. For Phase 3, cancellation is triggered by the RunManager on timeout. + +### Run Timeout + +Every run has a 5-minute **execution time** timeout (excluding pause time). The timeout is tracked manually via cumulative execution time on `RunContext`: +- After each node completes, `_stream_graph` checks `execution_time >= timeout` +- `execution_time = time.monotonic() - started_at - total_pause_time` +- When entering `_wait_for_resume`, the pause start time is recorded +- When resuming, the pause duration is added to `total_pause_time` + +This replaces `asyncio.wait_for` which would include pause time. Configurable via `RUN_TIMEOUT_SECONDS` env var (default: 300). + +### Checkpointer Strategy + +**Decision: `InMemorySaver` for Phase 3. Add `langgraph-checkpoint-sqlite` in Phase 5.** + +`build_graph()` gains an optional `checkpointer` parameter. The executor creates a single `InMemorySaver` instance, passes it to `build_graph(schema, checkpointer=saver)`, and uses the same instance when calling `astream(config=...)`. The builder uses the provided checkpointer instead of creating its own -- this is critical because `aget_state()` must read from the same checkpointer that `astream()` writes to. Swapping to `AsyncSqliteSaver` later is a one-line change. Risk of losing paused runs on server restart is acceptable for v1 -- the `/status` endpoint returns DB state, and the frontend shows "Run lost." + +### Concurrent Run Limit + +**Decision: 3 concurrent runs per API key, 10 globally.** + +`RunManager` checks both limits before launching. Returns 429 if exceeded. Configurable via `MAX_RUNS_PER_KEY` (default: 3) and `MAX_RUNS_GLOBAL` (default: 10). + +--- + +## Engineering Decisions + +| # | Approach (chosen) | Alternatives | Key tradeoff | +|---|-------------------|-------------|--------------| +| 1 | **Background asyncio.Task** | Inline in SSE response; Celery worker | Run survives SSE disconnect. Events buffered for replay. | +| 2 | **Manual StreamingResponse** | `sse-starlette` package | No extra dependency. Full control over format. | +| 3 | **`aget_state()` per node** | Combined stream_mode; values-only | Clean, explicit. No tuple correlation. Negligible overhead. | +| 4 | **Optional checkpointer param** | Always add checkpointer; Re-compile in executor | Additive, backward-compatible. Existing tests unaffected. | + +--- + +## Files Summary + +| Action | File | Part | Notes | +|--------|------|------|-------| +| **modify** | `app/builder.py` | 3.1 | Add optional `checkpointer` param | +| **rewrite** | `app/executor.py` | 3.2, 3.3 | RunContext, RunManager, execution, streaming | +| **create** | `app/schemas/runs.py` | 3.4 | Request/response models | +| **create** | `app/routes/runs.py` | 3.4 | GET stream, POST resume, GET status | +| **modify** | `app/routes/graphs.py` | 3.4 | Add POST /v1/graphs/{id}/run | +| **modify** | `app/main.py` | 3.4 | RunManager in lifespan, include router | +| **modify** | `app/schemas/__init__.py` | 3.4 | Export new schemas | +| **modify** | `tests/unit/test_builder.py` | 3.1 | 3 checkpointer tests | +| **create** | `tests/unit/test_executor.py` | 3.3 | 17 core execution tests | +| **create** | `tests/unit/test_executor_human.py` | 3.3 | 5 human-in-the-loop tests | +| **create** | `tests/unit/test_executor_reconnect.py` | 3.3 | 5 reconnection tests | +| **create** | `tests/unit/test_run_manager.py` | 3.2 | 8 RunManager tests | +| **create** | `tests/unit/test_routes_runs.py` | 3.4 | 19 route tests | + +--- + +## Not in Scope + +- **Remaining Phase 4 routes**: `/validate`, `/export`, run history listing, run deletion +- **Persistent checkpointer**: `langgraph-checkpoint-sqlite` (Phase 5) +- **Token-level streaming**: `stream_mode="messages"` for LLM token streaming (future) +- **WebSocket alternative**: SSE is sufficient for unidirectional streaming +- **Run history pagination**: Listing past runs with filters (Phase 4) +- **Frontend SSE hook**: The `runSlice.ts` and EventSource integration (frontend phase) +- **LLM retry/circuit-breaker**: Retry logic for transient LLM errors (Phase 5) +- **Custom httpx transport**: SSRF-safe DNS-pinning transport (Phase 5) +- **Multi-server run coordination**: Run state sharing across instances (not v1) + +--- + +## Decisions & Risks + +| Decision / Risk | Mitigation | +|-----------------|------------| +| In-memory RunManager loses state on server restart | DB has run status. Frontend `/status` endpoint detects lost runs. Persistent checkpointer in Phase 5. | +| `asyncio.Queue` can grow unbounded if SSE client disconnects | Queue has `maxsize=1000`. `_emit` catches `QueueFull`, logs warning. Event stays in `ctx.events` for replay. If the SSE client reads too slowly and the queue fills, live events are dropped but kept in `ctx.events`. Client should reconnect with `Last-Event-ID` to recover. | +| Reconnection replay could duplicate events | Sequential `id:` field. `stream_run_sse` replays only `id > last_event_id`. Live loop skips `id <= last_replayed_id`. | +| `aget_state()` after each node adds latency | Reads from `InMemorySaver` -- effectively a dict lookup. <1ms. | +| `default=str` in `json.dumps` silently converts unknown types | Better than crashing the stream. | +| Background task leak if exception in finally block | `RunManager.cleanup_run()` is idempotent. Timeout also cleans up. | +| DB calls in exception handlers can fail | `_safe_update_run()` catches and logs. `_emit()` called before DB update. | +| Resume after pause: frontend might miss events | Events buffered with sequential IDs. Frontend replays via `Last-Event-ID`. | +| `condition_result` in edge_traversed | After each `node_completed`, look up outgoing edges from `schema_dict["edges"]` where `source == completed_node_id`. For non-condition nodes, emit `edge_traversed` with `condition_result: null`. For condition nodes, wait until the next update to see which node actually executes, then emit `edge_traversed` retroactively with the branch name that maps to that target in the condition's `branches` config. | +| Concurrent limit too low for power users | Configurable via env vars (3/key, 10/global). | +| Run timeout kills long-running graphs | 5-min default, configurable. Excludes pause time. | +| `StreamingResponse` doesn't detect disconnect immediately | Keepalive every 15s ensures detection within 15s. | +| Modifying `build_graph()` signature | Additive (optional kwarg). All existing tests pass. | +| `tool_node` is sync but executor is async | LangGraph handles thread pool execution for sync node functions. | +| `gw-execution` skill doc has stale route URLs | Todolist includes updating after implementation. | + +--- + +## Verification + +```bash +cd packages/execution +uv run ruff check app/ tests/ +uv run ruff format --check app/ tests/ +uv run pytest tests/unit/ -v +``` + +Manual testing: + +```bash +# 1. Bootstrap admin key (if not already done) +uv run python -m app.cli create-key --name admin --scopes all + +# 2. Start server +pnpm dev:exec + +# 3. Create a graph with a simple schema (start -> llm -> end) +curl -X POST localhost:8000/v1/graphs \ + -H "X-API-Key: gw_" \ + -H "Content-Type: application/json" \ + -d '{"name": "Test", "schema_json": { }}' + +# 4. Start a run +curl -X POST localhost:8000/v1/graphs//run \ + -H "X-API-Key: gw_" \ + -H "Content-Type: application/json" \ + -d '{"input": {"messages": [["human", "Hello"]]}}' + +# 5. Stream SSE events +curl -N localhost:8000/v1/runs//stream \ + -H "X-API-Key: gw_" + +# 6. Check status +curl localhost:8000/v1/runs//status \ + -H "X-API-Key: gw_" +```