From 4eb34d39febf224dfa8e9fb66bb8d0d32ab5b964 Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 13:37:00 -0700 Subject: [PATCH 1/5] docs: add Phase 4 execution plan (API routes, cancel, delete) Add detailed plan for validate, export stub, run history, cancel, and delete endpoints. Update plan READMEs to reflect Phase 3 merged and Phase 4 in progress. Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/gw-plans/README.md | 2 +- .claude/gw-plans/execution/README.md | 4 +- .../gw-plans/execution/phase-4-api-routes.md | 818 ++++++++++++++++++ 3 files changed, 821 insertions(+), 3 deletions(-) create mode 100644 .claude/gw-plans/execution/phase-4-api-routes.md diff --git a/.claude/gw-plans/README.md b/.claude/gw-plans/README.md index 1e5e984..f20f5c4 100644 --- a/.claude/gw-plans/README.md +++ b/.claude/gw-plans/README.md @@ -9,7 +9,7 @@ Implementation deviations are logged at the bottom of each plan file. | Track | Description | Status | |-------|-------------|--------| -| [Execution](execution/) | FastAPI + LangGraph backend | Phases 1-2 merged, 3-6 remaining | +| [Execution](execution/) | FastAPI + LangGraph backend | Phases 1-3 merged, 4 in progress | | [Canvas](canvas/) | React + React Flow frontend | Not started | | Deployment | Cloud Run + Vercel + CI/CD | Not started (after both tracks) | diff --git a/.claude/gw-plans/execution/README.md b/.claude/gw-plans/execution/README.md index dc74ad4..e48d44a 100644 --- a/.claude/gw-plans/execution/README.md +++ b/.claude/gw-plans/execution/README.md @@ -9,6 +9,6 @@ FastAPI + LangGraph backend phases. | 1 | [DB, Tools, State Utils](phase-1-db-tools-state-utils.md) | Merged | [#1](https://github.com/prosdevlab/graphweave/pull/1) | | 1.5 | [Scoped API Key Auth](phase-1.5-execution-auth.md) | Merged | [#2](https://github.com/prosdevlab/graphweave/pull/2) | | 2 | [GraphSchema -> LangGraph Builder](phase-2-graph-schema-langgraph-builder.md) | Merged | [#3](https://github.com/prosdevlab/graphweave/pull/3) | -| 3 | [Executor + SSE Streaming](phase-3/overview.md) | Planned | — | -| 4 | API routes (run, stream, resume, validate, export) | Not started | — | +| 3 | [Executor + SSE Streaming](phase-3/overview.md) | Merged | [#6](https://github.com/prosdevlab/graphweave/pull/6) | +| 4 | [API Routes (validate, export, run history, cancel, delete)](phase-4-api-routes.md) | In progress | — | | 5 | Exporter + remaining tools + SSRF transport | Not started | — | diff --git a/.claude/gw-plans/execution/phase-4-api-routes.md b/.claude/gw-plans/execution/phase-4-api-routes.md new file mode 100644 index 0000000..f931d78 --- /dev/null +++ b/.claude/gw-plans/execution/phase-4-api-routes.md @@ -0,0 +1,818 @@ +# Phase 4: API Routes (Validate, Export, Run History, Cancel, Delete) + +**Updated**: 2026-03-14 + +## Context + +The execution layer has a DB layer with tools and state utils (Phase 1), scoped API key auth with CRUD routes (Phase 1.5), the GraphSchema-to-LangGraph builder with `validate_schema()` and `build_graph()` (Phase 2), and the executor with SSE streaming, run management, and four routes (Phase 3). + +Phase 3 delivered the minimum routes to make the executor demoable end-to-end: + +``` +POST /v1/graphs/{id}/run start execution +GET /v1/runs/{id}/stream SSE event stream +POST /v1/runs/{id}/resume human-in-the-loop resume +GET /v1/runs/{id}/status reconnection recovery +``` + +Phase 4 completes the API surface that the canvas frontend needs. Every route here is independently useful -- the frontend can wire up buttons and panels as each lands. + +--- + +## What Phase 4 Delivers + +1. **Schema validation endpoint** -- pre-run validation without executing the graph +2. **Export endpoint** -- returns 501 stub (actual code generation is Phase 5) +3. **Run history** -- paginated listing of past runs, per graph or globally +4. **Run cancellation** -- stop a running/paused graph via HTTP +5. **Run deletion** -- clean up old runs from the database + +These fill the gaps between "executor works" (Phase 3) and "frontend has everything it needs to build the run panel, history tab, and export button." + +--- + +## Feature Inventory + +### New Routes + +| # | Method | Path | Scope | Status Code | Summary | +|---|--------|------|-------|-------------|---------| +| R1 | `POST` | `/v1/graphs/{id}/validate` | `graphs:read` | 200 / 422 | Validate GraphSchema without executing | +| R2 | `GET` | `/v1/graphs/{id}/export` | `graphs:read` | 501 | Export stub (Phase 5 implements) | +| R3 | `GET` | `/v1/graphs/{id}/runs` | `runs:read` | 200 | List runs for a specific graph (paginated) | +| R4 | `GET` | `/v1/runs` | `runs:read` | 200 | List all runs for the authenticated owner (paginated) | +| R5 | `POST` | `/v1/runs/{id}/cancel` | `runs:write` | 202 / 409 | Cancel a running or paused run | +| R6 | `DELETE` | `/v1/runs/{id}` | `runs:write` | 204 / 409 | Delete a completed/error run | + +### Route Details + +#### R1: Validate Graph Schema + +``` +POST /v1/graphs/{id}/validate +Scope: graphs:read +Body: (none -- validates the graph's stored schema) +Response 200: ValidateResponse { valid: true, errors: [] } +Response 422: ValidateResponse { valid: false, errors: [...] } +``` + +This calls the existing `validate_schema()` from `app/builder.py` and also attempts `build_graph()` to catch compilation errors. The difference from `POST /v1/graphs/{id}/run` is that no run is created and no execution happens. + +**Why POST, not GET**: Validation is an action with side effects on the response shape (it does work -- parsing, compilation). GET with no body is semantically awkward for "please validate this." The `gw-api-design` skill lists actions as sub-resources, which aligns with `POST .../validate`. + +Response body: + +```python +class ValidationError(BaseModel): + message: str + node_ref: str | None = None # which node caused the error, if applicable + +class ValidateResponse(BaseModel): + valid: bool + errors: list[ValidationError] +``` + +Status code rationale: 200 when valid. Also 200 when invalid -- the validation *succeeded* in finding errors. The `valid: false` field communicates the result. This follows the pattern of validation endpoints that report findings rather than failing. However, if the graph itself doesn't exist, return 404. + +**Update (after review)**: Using 422 when invalid is more consistent with how the rest of the API handles validation failures. The route returns 200 when valid, 422 when invalid with the errors list in the response body. This matches the existing pattern where `POST /v1/graphs/{id}/run` returns 422 for schema build errors. + +#### R2: Export Graph + +``` +GET /v1/graphs/{id}/export +Scope: graphs:read +Response 501: { detail: "Export not implemented", status_code: 501 } +``` + +Returns 501 Not Implemented. The `exporter.py` stub exists but does not produce useful output yet. Phase 5 implements the actual code generation. + +The 501 response uses the standard error envelope (`{detail, status_code}`) so the frontend can detect it and show "Export coming soon" rather than treating it as an unexpected error. + +#### R3: List Runs for Graph + +``` +GET /v1/graphs/{id}/runs?limit=20&offset=0&status=completed +Scope: runs:read +Response 200: PaginatedResponse { items: [RunListItem], total, limit, offset, has_more } +``` + +Returns paginated run history for a specific graph. The existing `list_runs_by_graph` in `crud.py` needs to be upgraded to support full pagination (offset + total count) and optional status filtering. + +Query parameters: + +| Param | Type | Default | Constraints | +|-------|------|---------|-------------| +| `limit` | int | 20 | 1-100 | +| `offset` | int | 0 | >= 0 | +| `status` | str \| None | None | One of: running, paused, completed, error | + +Response item shape (intentionally lighter than `RunStatusResponse` -- no `final_state` blob in list views): + +```python +class RunListItem(BaseModel): + id: str + graph_id: str + status: str + input: dict + duration_ms: int | None = None + created_at: str + error: str | None = None +``` + +Note: `final_state` is excluded from list items. It can be large (full message history) and would bloat paginated responses. Clients fetch it via `GET /v1/runs/{id}/status` for individual runs. + +#### R4: List All Runs + +``` +GET /v1/runs?limit=20&offset=0&status=completed&graph_id= +Scope: runs:read +Response 200: PaginatedResponse { items: [RunListItem], total, limit, offset, has_more } +``` + +Returns paginated run history across all graphs for the authenticated owner. Admin keys see all runs. + +Additional query parameter beyond R3: + +| Param | Type | Default | Description | +|-------|------|---------|-------------| +| `graph_id` | str \| None | None | Filter to a specific graph | + +This overlaps with R3 but serves a different use case: the frontend's global run history panel vs. a per-graph run tab. R3 also verifies graph ownership (404 if graph not found). + +#### R5: Cancel Run + +``` +POST /v1/runs/{id}/cancel +Scope: runs:write +Body: (none) +Response 200: { status: "cancelled" } +Response 404: Run not found +Response 409: Run is not running or paused (already completed/error) +``` + +Calls the existing `RunManager.cancel_run()`. The executor's `_stream_graph` loop checks `cancel_event.is_set()` between nodes and emits an error SSE event. Cancellation is asynchronous — the response confirms the request was received, not that the run has stopped. The run's eventual DB status will be `status="error", error="Cancelled"`. There is no "cancelled" status in the system. + +If the run exists in the DB but not in `RunManager` (server restarted), and its DB status is still "running" or "paused", the route updates the DB status to "error" with error="Cancelled (server lost run)" and returns 202. This prevents stale "running" records. Uses `owner_id=_owner_filter(auth)` so admin keys can cancel stale runs belonging to other users. + +#### R6: Delete Run + +``` +DELETE /v1/runs/{id} +Scope: runs:write +Response 204: (empty body) +Response 404: Run not found +Response 409: Run is still active (running or paused) +``` + +Deletes a run from the database. Only completed or error runs can be deleted -- active runs must be cancelled first. This prevents orphaning a running executor task. + +--- + +## DB Changes + +### New CRUD functions in `app/db/crud.py` + +1. **`list_runs`** -- paginated listing across all graphs with optional `graph_id` and `status` filters. Returns `(list[Run], total_count)`. +2. **`delete_run`** -- delete a run by ID with owner_id filtering. Returns `bool`. +3. **Upgrade `list_runs_by_graph`** -- add `offset` parameter and return `(list[Run], total_count)` tuple (currently returns `list[Run]` only). Add optional `status` filter. + +### No new migrations + +No schema changes needed. The existing `runs` table has all required columns. The new queries use existing columns (`status`, `graph_id`, `owner_id`). + +### New index (performance, additive) + +Add a composite index on `runs(graph_id, owner_id, status)` to support filtered run history queries. This is additive and can be done in a migration or inline. Since the DB is SQLite and pre-production, we can add it in migration `003_run_indexes.py` to follow the established pattern. + +--- + +## Parts + +| Part | Commit | Summary | +|------|--------|---------| +| 4.1 | 1 | CRUD upgrades + migration: `list_runs`, `delete_run`, upgrade `list_runs_by_graph` | +| 4.2 | 2 | Pydantic schemas + validate/export routes on graphs router | +| 4.3 | 3 | Run history, cancel, delete routes on runs router | + +--- + +## Part 4.1: CRUD Upgrades + Migration + +### Summary + +Upgrade `list_runs_by_graph` to support pagination and status filtering. Add `list_runs` for cross-graph listing. Add `delete_run`. Add migration `003_run_indexes.py`. + +### Implementation + +#### Migration `003_run_indexes.py` + +```python +"""Run query indexes for Phase 4 run history.""" + +VERSION = 3 + +def up(db) -> None: + db.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_graph_owner_status " + "ON runs(graph_id, owner_id, status)" + ) + db.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_owner_status " + "ON runs(owner_id, status)" + ) + db.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_created " + "ON runs(created_at)" + ) +``` + +#### Upgraded `list_runs_by_graph` + +```python +async def list_runs_by_graph( + db: aiosqlite.Connection, + graph_id: str, + owner_id: str | None = None, + status: str | None = None, + limit: int = 20, + offset: int = 0, +) -> tuple[list[Run], int]: +``` + +Breaking change to return type (was `list[Run]`). Must update all callers -- the existing `test_list_runs_by_graph` in `tests/unit/test_crud.py` must be updated to destructure the tuple: `runs, total = await list_runs_by_graph(...)`. + +#### New `list_runs` + +```python +async def list_runs( + db: aiosqlite.Connection, + owner_id: str | None = None, + graph_id: str | None = None, + status: str | None = None, + limit: int = 20, + offset: int = 0, +) -> tuple[list[Run], int]: +``` + +#### New `delete_run` + +```python +async def delete_run( + db: aiosqlite.Connection, + run_id: str, + owner_id: str | None = None, +) -> bool: +``` + +Returns `True` if a row was deleted, `False` if not found (or wrong owner). + +### Files + +| Action | File | +|--------|------| +| **create** | `app/db/migrations/003_run_indexes.py` | +| **modify** | `app/db/crud.py` | +| **modify** | `tests/unit/test_crud.py` | + +### Tests (9) -- in `tests/unit/test_crud.py` + +1. **test_list_runs_by_graph_paginated**: Create 5 runs, list with limit=2 offset=0. Verify 2 items, total=5, ordered by created_at DESC. +2. **test_list_runs_by_graph_offset**: Create 5 runs, list with limit=2 offset=2. Verify correct 2 items. +3. **test_list_runs_by_graph_status_filter**: Create runs with mixed statuses. Filter by status="completed". Verify only completed runs returned, total reflects filter. +4. **test_list_runs_by_graph_owner_isolation**: Create runs for two owners. List for owner A. Verify only owner A's runs returned. +5. **test_list_runs_all_graphs**: Create runs across 2 graphs. Call `list_runs(owner_id=...)`. Verify all runs for that owner returned. +6. **test_list_runs_graph_id_filter**: Create runs across 2 graphs. Call `list_runs(graph_id=...)`. Verify only runs for that graph returned. +7. **test_list_runs_status_filter**: Create runs with mixed statuses. Call `list_runs(status="error")`. Verify only error runs returned. +8. **test_delete_run_success**: Create a run, delete it. Verify returns True. Verify `get_run` returns None. +9. **test_delete_run_wrong_owner**: Create a run for owner A, attempt delete as owner B. Verify returns False. Verify run still exists. + +### Commit + +``` +feat: add paginated run listing and run deletion to CRUD layer + +Upgrade list_runs_by_graph with offset/total/status filter. +Add list_runs for cross-graph queries. Add delete_run. +Add migration 003 with composite indexes for run history queries. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +### Detailed Todolist + +#### Migration + +- [ ] Create `app/db/migrations/003_run_indexes.py` +- [ ] Set `VERSION = 3` +- [ ] Implement `up(db)`: create three indexes -- `idx_runs_graph_owner_status` on `(graph_id, owner_id, status)`, `idx_runs_owner_status` on `(owner_id, status)`, `idx_runs_created` on `(created_at)`. Use `CREATE INDEX IF NOT EXISTS`. + +#### CRUD upgrades + +- [ ] Open `app/db/crud.py` +- [ ] Modify `list_runs_by_graph` signature: add `status: str | None = None`, add `offset: int = 0`, change return type to `tuple[list[Run], int]` +- [ ] Implement pagination: add `OFFSET ?` to the query, add count query mirroring the filter conditions +- [ ] Implement status filter: when `status is not None`, add `AND status = ?` to both count and data queries +- [ ] Return `(runs, total)` tuple +- [ ] Add new function `list_runs(db, owner_id, graph_id, status, limit, offset)` returning `tuple[list[Run], int]`: + - Build WHERE clauses dynamically based on which filters are provided + - `owner_id is not None` -> `AND owner_id = ?` + - `graph_id is not None` -> `AND graph_id = ?` + - `status is not None` -> `AND status = ?` + - Count query with same filters + - `ORDER BY created_at DESC LIMIT ? OFFSET ?` +- [ ] Add new function `delete_run(db, run_id, owner_id)` returning `bool`: + - If `owner_id is not None`: `DELETE FROM runs WHERE id = ? AND owner_id = ?` + - Else: `DELETE FROM runs WHERE id = ?` + - `await db.commit()` + - Return `cursor.rowcount > 0` + +#### Tests + +- [ ] Open `tests/unit/test_crud.py` +- [ ] Add helper to create multiple test runs with varying statuses, graph_ids, and owner_ids +- [ ] Add `test_list_runs_by_graph_paginated` +- [ ] Add `test_list_runs_by_graph_offset` +- [ ] Add `test_list_runs_by_graph_status_filter` +- [ ] Add `test_list_runs_by_graph_owner_isolation` +- [ ] Add `test_list_runs_all_graphs` +- [ ] Add `test_list_runs_graph_id_filter` +- [ ] Add `test_list_runs_status_filter` +- [ ] Add `test_delete_run_success` +- [ ] Add `test_delete_run_wrong_owner` +- [ ] Run `uv run ruff check app/db/crud.py app/db/migrations/ tests/unit/test_crud.py` +- [ ] Run `uv run pytest tests/unit/test_crud.py -v` + +--- + +## Part 4.2: Validate and Export Routes + +### Summary + +Add Pydantic schemas for validation and export responses. Add `POST /v1/graphs/{id}/validate` and `GET /v1/graphs/{id}/export` to the existing graphs router. + +### Pydantic Schemas -- `app/schemas/graphs.py` (add to existing file) + +```python +class SchemaValidationError(BaseModel): + """A single validation error from schema checking.""" + message: str = Field(description="Human-readable error description.") + node_ref: str | None = Field( + default=None, + description="Node ID that caused the error, if applicable.", + ) + +class ValidateResponse(BaseModel): + """Result of schema validation.""" + valid: bool = Field(description="True if the schema is valid.") + errors: list[SchemaValidationError] = Field( + default_factory=list, + description="List of validation errors (empty when valid).", + ) + +class ExportResponse(BaseModel): + """Exported Python code and requirements.""" + code: str = Field(description="Generated Python source code.") + requirements: str = Field(description="requirements.txt content.") +``` + +### Routes -- added to `app/routes/graphs.py` + +#### Validate + +``` +POST /v1/graphs/{id}/validate +Scope: graphs:read +Response 200: ValidateResponse { valid: true, errors: [] } +Response 422: ValidateResponse { valid: false, errors: [...] } +Response 404: Graph not found +``` + +Flow: +1. Fetch graph from DB (404 if not found / not owned) +2. Call `validate_schema(graph.schema_json)` (from `app/builder.py`) +3. If `GraphBuildError` is raised, return 422 with `ValidateResponse(valid=False, errors=[...])` +4. Attempt `build_graph(graph.schema_json, llm_override=FakeListChatModel(responses=[""]))` to catch compilation errors beyond structural validation. The mock LLM avoids 500 errors when LLM provider API keys are not configured — validation checks structure, not runtime readiness. +5. If `GraphBuildError` is raised, return 422 with `ValidateResponse(valid=False, errors=[...])` +6. If any other `Exception` is raised (e.g., unexpected compilation failure), return 422 with the exception message. +7. Return 200 with `ValidateResponse(valid=True, errors=[])` + +Note: `validate_schema` raises `GraphBuildError` with a `node_ref` attribute. We capture this to populate `SchemaValidationError.node_ref`. Unreachable nodes produce a `logger.warning` only, not an error — this is intentional (max-step limit is the runtime safety net). + +#### Export + +``` +GET /v1/graphs/{id}/export +Scope: graphs:read +Response 501: { detail: "Export not implemented. Coming in a future release.", status_code: 501 } +Response 404: Graph not found +``` + +Flow: +1. Fetch graph from DB (404 if not found / not owned) +2. Raise HTTPException(status_code=501, detail="Export not implemented. Coming in a future release.") + +We verify the graph exists before returning 501 so the frontend doesn't confuse "graph not found" with "export not ready." + +### Files + +| Action | File | +|--------|------| +| **modify** | `app/schemas/graphs.py` | +| **modify** | `app/routes/graphs.py` | +| **modify** | `tests/unit/test_routes.py` | + +### Tests (7) -- in `tests/unit/test_routes.py` + +1. **test_validate_valid_schema**: Create graph with valid schema. POST /validate. Verify 200, `valid: true`, empty errors. +2. **test_validate_invalid_schema_missing_start**: Create graph with schema missing start node. POST /validate. Verify 422, `valid: false`, errors list non-empty with relevant message. +3. **test_validate_invalid_schema_unknown_tool**: Create graph with a tool node referencing a nonexistent tool. POST /validate. Verify 422, errors include `node_ref` pointing to the tool node. +4. **test_validate_graph_not_found**: POST /validate with nonexistent graph_id. Verify 404. +5. **test_validate_wrong_owner**: Create graph with key A. POST /validate with key B. Verify 404. +6. **test_export_returns_501**: Create graph. GET /export. Verify 501 with detail message. +7. **test_export_graph_not_found**: GET /export with nonexistent graph_id. Verify 404. + +### Commit + +``` +feat: add schema validation and export stub routes + +POST /v1/graphs/{id}/validate checks schema without executing. +Returns 200 with valid:true or 422 with error details and node refs. +GET /v1/graphs/{id}/export returns 501 until Phase 5. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +### Detailed Todolist + +#### Pydantic schemas + +- [ ] Open `app/schemas/graphs.py` +- [ ] Add `SchemaValidationError(BaseModel)`: `message: str`, `node_ref: str | None = None`, with Field descriptions +- [ ] Add `ValidateResponse(BaseModel)`: `valid: bool`, `errors: list[SchemaValidationError]` with `default_factory=list`, with Field descriptions +- [ ] Add `ExportResponse(BaseModel)`: `code: str`, `requirements: str`, with Field descriptions (used in Phase 5, defined now for OpenAPI docs) + +#### Routes + +- [ ] Open `app/routes/graphs.py` +- [ ] Add imports: `validate_schema` from `app.builder` (currently only `build_graph` and `GraphBuildError` are imported) +- [ ] Add imports: `ValidateResponse`, `SchemaValidationError` from `app.schemas.graphs` +- [ ] Implement `POST /{graph_id}/validate`: + - Scope: `graphs:read` (validation is a read operation -- does not mutate) + - Fetch graph from DB with owner_id filtering -> 404 if not found + - Try `validate_schema(graph.schema_json)` -- catch `GraphBuildError` as `exc` + - On error: return `JSONResponse(status_code=422, content=ValidateResponse(valid=False, errors=[SchemaValidationError(message=str(exc), node_ref=exc.node_ref)]).model_dump())` + - Try `build_graph(graph.schema_json, llm_override=FakeListChatModel(responses=[""]))` -- catch `GraphBuildError` as `exc` + - On error: same 422 response + - Catch any other `Exception` as `exc`: same 422 response with `message=str(exc)` + - On success: return 200 `ValidateResponse(valid=True, errors=[])` + - Set `response_model=ValidateResponse` on the decorator +- [ ] Implement `GET /{graph_id}/export`: + - Scope: `graphs:read` + - Fetch graph from DB with owner_id filtering -> 404 if not found + - Raise `HTTPException(status_code=501, detail="Export not implemented. Coming in a future release.")` + +#### Tests + +- [ ] Open `tests/unit/test_routes.py` +- [ ] Add `test_validate_valid_schema`: create graph with valid start -> llm -> end schema. POST `/v1/graphs/{id}/validate`. Assert 200. Assert response body `valid == True` and `errors == []`. +- [ ] Add `test_validate_invalid_schema_missing_start`: create graph with schema that has no start node (e.g. only an LLM and end node). POST `/v1/graphs/{id}/validate`. Assert 422. Assert `valid == False`. Assert `errors` list has at least one entry with a non-empty `message`. +- [ ] Add `test_validate_invalid_schema_unknown_tool`: create graph with a tool node referencing a nonexistent tool (e.g., `tool_name: "does_not_exist"`). POST validate. Assert 422. Assert errors include a `node_ref` field pointing to the tool node ID. +- [ ] Add `test_validate_graph_not_found`: POST `/v1/graphs/nonexistent-id/validate`. Assert 404. +- [ ] Add `test_validate_wrong_owner`: create graph with key A, POST validate with key B. Assert 404. +- [ ] Add `test_export_returns_501`: create graph, GET `/v1/graphs/{id}/export`. Assert 501. Assert response body has `detail` containing "not implemented". +- [ ] Add `test_export_graph_not_found`: GET `/v1/graphs/nonexistent-id/export`. Assert 404. +- [ ] Run `uv run ruff check app/schemas/graphs.py app/routes/graphs.py tests/unit/test_routes.py` +- [ ] Run `uv run pytest tests/unit/test_routes.py -v` + +--- + +## Part 4.3: Run History, Cancel, and Delete Routes + +### Summary + +Add `GET /v1/graphs/{id}/runs` and `GET /v1/runs` for paginated run history. Add `POST /v1/runs/{id}/cancel` and `DELETE /v1/runs/{id}` to the existing runs router. + +### Pydantic Schemas -- `app/schemas/runs.py` (add to existing file) + +```python +class RunListItem(BaseModel): + """Lightweight run representation for list endpoints.""" + id: str + graph_id: str + status: str + input: dict = Field(default_factory=dict) + duration_ms: int | None = None + created_at: str + error: str | None = None +``` + +### Routes + +#### R3: List Runs for Graph -- `app/routes/graphs.py` + +``` +GET /v1/graphs/{graph_id}/runs?limit=20&offset=0&status=completed +Scope: runs:read +Response 200: PaginatedResponse { items: [RunListItem], ... } +Response 404: Graph not found +``` + +Flow: +1. Fetch graph from DB (404 if not found / not owned) -- verifies graph ownership +2. Call `list_runs_by_graph(db, graph_id, owner_id, status, limit, offset)` +3. Return `PaginatedResponse` with `RunListItem` items + +This route lives on the graphs router because it's scoped to a graph resource (`/v1/graphs/{id}/runs`). + +#### R4: List All Runs -- `app/routes/runs.py` + +``` +GET /v1/runs?limit=20&offset=0&status=completed&graph_id= +Scope: runs:read +Response 200: PaginatedResponse { items: [RunListItem], ... } +``` + +Flow: +1. Call `list_runs(db, owner_id, graph_id, status, limit, offset)` +2. Return `PaginatedResponse` with `RunListItem` items + +Admin keys see all runs (`owner_id=None`). + +#### R5: Cancel Run -- `app/routes/runs.py` + +``` +POST /v1/runs/{run_id}/cancel +Scope: runs:write +Body: (none) +Response 202: { detail: "Cancel requested" } +Response 404: Run not found +Response 409: Run is not cancellable (already completed/error) +``` + +Flow: +1. Check RunManager for live run. Verify ownership (404 if wrong owner, unless admin). +2. If in RunManager and status in ("running", "paused"): call `run_manager.cancel_run(run_id)`, return 202 `{"detail": "Cancel requested"}`. +3. If in RunManager but status is completed/error: return 409. +4. If not in RunManager: check DB with `owner_id=_owner_filter(auth)` (None for admin). + - Not found: 404. + - Status is "running" or "paused" (stale -- server restarted): update DB to status="error", error="Cancelled (server lost run)". Return 202. + - Status is completed/error: return 409. + +#### R6: Delete Run -- `app/routes/runs.py` + +``` +DELETE /v1/runs/{run_id} +Scope: runs:write +Response 204: (empty body) +Response 404: Run not found +Response 409: Run is still active +``` + +Flow: +1. Check RunManager -- if run is active (running/paused), return 409 "Cannot delete an active run. Cancel it first." +2. Check DB -- `get_run(db, run_id, owner_id)`. + - Not found: 404. + - Status is "running" or "paused": 409 (defensive -- RunManager check should have caught live runs, but DB might have stale status). + - Status is "completed" or "error": call `delete_run(db, run_id, owner_id)`, return 204. + +### Files + +| Action | File | +|--------|------| +| **modify** | `app/schemas/runs.py` | +| **modify** | `app/schemas/__init__.py` | +| **modify** | `app/routes/graphs.py` | +| **modify** | `app/routes/runs.py` | +| **create** | `tests/unit/test_routes_phase4.py` | + +### Tests (14) -- `tests/unit/test_routes_phase4.py` + +Separate test file to avoid bloating the existing `test_routes_runs.py` (19 tests from Phase 3). + +**Run history:** +1. **test_list_runs_for_graph_empty**: Create graph with no runs. GET `/v1/graphs/{id}/runs`. Verify 200, items=[], total=0. +2. **test_list_runs_for_graph_paginated**: Create graph with 5 runs. GET with limit=2. Verify 2 items, total=5, has_more=True. +3. **test_list_runs_for_graph_status_filter**: Create graph with mixed-status runs. GET with `?status=completed`. Verify only completed runs. +4. **test_list_runs_for_graph_not_found**: GET `/v1/graphs/nonexistent/runs`. Verify 404. +5. **test_list_runs_for_graph_wrong_owner**: Create graph with key A. GET runs with key B. Verify 404. +6. **test_list_all_runs**: Create runs across 2 graphs. GET `/v1/runs`. Verify all runs for owner returned. +7. **test_list_all_runs_graph_id_filter**: GET `/v1/runs?graph_id=`. Verify only runs for that graph. +8. **test_list_all_runs_excludes_other_owners**: Create runs for owner A and B. GET `/v1/runs` as owner A. Verify only A's runs. + +**Cancel:** +9. **test_cancel_running_run**: Start a run, POST cancel. Verify 202, response contains "Cancel requested". +10. **test_cancel_already_completed**: Start run, wait for completion, POST cancel. Verify 409. +11. **test_cancel_stale_db_run**: Insert run in DB with status="running" (not in RunManager). POST cancel. Verify 202, DB status updated to "error". +12. **test_cancel_not_found**: POST cancel for nonexistent run. Verify 404. + +**Delete:** +13. **test_delete_completed_run**: Create and complete a run. DELETE. Verify 204. Verify GET status returns 404. +14. **test_delete_active_run_rejected**: Start a run (still running). DELETE. Verify 409. + +### Commit + +``` +feat: add run history, cancel, and delete routes + +GET /v1/graphs/{id}/runs lists paginated runs with status filter. +GET /v1/runs lists all runs for the authenticated owner. +POST /v1/runs/{id}/cancel stops running/paused runs. +DELETE /v1/runs/{id} removes completed/error runs. + +Co-Authored-By: Claude Opus 4.6 (1M context) +``` + +### Detailed Todolist + +#### Pydantic schemas + +- [ ] Open `app/schemas/runs.py` +- [ ] Add `RunListItem(BaseModel)`: `id: str`, `graph_id: str`, `status: str`, `input: dict` with default_factory, `duration_ms: int | None = None`, `created_at: str`, `error: str | None = None` +- [ ] Open `app/schemas/__init__.py` +- [ ] Add `RunListItem` to imports and `__all__` + +#### Run history routes + +- [ ] Open `app/routes/graphs.py` +- [ ] Add import: `RunListItem` from `app.schemas.runs` +- [ ] Implement `GET /{graph_id}/runs`: + - Scope: `runs:read` + - Query params: `limit: int = Query(20, ge=1, le=100)`, `offset: int = Query(0, ge=0)`, `status: str | None = Query(None)` + - Fetch graph from DB with owner_id filtering -> 404 if not found + - Call `crud.list_runs_by_graph(db, graph_id, owner_id=_owner_filter(auth), status=status, limit=limit, offset=offset)` + - Map runs to `RunListItem` dicts + - Return `PaginatedResponse(items=..., total=total, limit=limit, offset=offset, has_more=(offset + limit) < total)` + +- [ ] Open `app/routes/runs.py` +- [ ] Add imports: `crud`, `RunListItem` from `app.schemas.runs`, `PaginatedResponse` from `app.schemas.pagination` +- [ ] Implement `GET /` (list all runs): + - Scope: `runs:read` + - Query params: `limit`, `offset`, `status`, `graph_id: str | None = Query(None)` + - `owner_id = None if auth.is_admin else auth.owner_id` + - Call `crud.list_runs(db, owner_id=owner_id, graph_id=graph_id, status=status, limit=limit, offset=offset)` + - Map runs to `RunListItem` dicts + - Return `PaginatedResponse` + +#### Cancel route + +- [ ] In `app/routes/runs.py`, implement `POST /{run_id}/cancel`: + - Scope: `runs:write` + - Get RunContext from RunManager + - If ctx exists: + - Ownership check (404 if wrong owner, unless admin) + - If `ctx.status in ("running", "paused")`: call `run_manager.cancel_run(run_id)`, return 202 `{"detail": "Cancel requested"}` + - Else: raise HTTPException 409 "Run is not cancellable" + - If ctx is None: + - `run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth))` + - If run is None: 404 + - If `run.status in ("running", "paused")`: stale record, call `await crud.update_run(db, run_id, status="error", error="Cancelled (server lost run)")`, return 202 `{"detail": "Cancel requested"}` + - If `run.status in ("completed", "error")`: 409 "Run is not cancellable" + +#### Delete route + +- [ ] In `app/routes/runs.py`, implement `DELETE /{run_id}`: + - Scope: `runs:write` + - Check RunManager: if run is active (status in running/paused), raise 409 "Cannot delete an active run. Cancel it first." + - `run = await crud.get_run(db, run_id, owner_id=auth.owner_id)` + - If run is None: 404 + - If `run.status in ("running", "paused")`: 409 (defensive check for stale DB status) + - Call `crud.delete_run(db, run_id, owner_id=_owner_filter(auth))` where `_owner_filter` returns None for admin + - Add helper `_owner_filter(auth)` to `runs.py` (same pattern as `graphs.py`) + - Return `Response(status_code=204)` + +#### Tests + +- [ ] Create `tests/unit/test_routes_phase4.py` +- [ ] Set up fixtures: client with httpx.AsyncClient + ASGITransport, DB, RunManager, test API key. Reuse pattern from `test_routes_runs.py`. +- [ ] Create helper: `_create_test_graph(client, api_key)` -- POST valid graph, return graph_id +- [ ] Create helper: `_create_test_run_in_db(db, graph_id, owner_id, status)` -- insert run directly into DB for history tests +- [ ] Add `test_list_runs_for_graph_empty` +- [ ] Add `test_list_runs_for_graph_paginated` +- [ ] Add `test_list_runs_for_graph_status_filter` +- [ ] Add `test_list_runs_for_graph_not_found` +- [ ] Add `test_list_runs_for_graph_wrong_owner` +- [ ] Add `test_list_all_runs` +- [ ] Add `test_list_all_runs_graph_id_filter` +- [ ] Add `test_list_all_runs_excludes_other_owners` +- [ ] Add `test_cancel_running_run` +- [ ] Add `test_cancel_already_completed` +- [ ] Add `test_cancel_stale_db_run` +- [ ] Add `test_cancel_not_found` +- [ ] Add `test_delete_completed_run` +- [ ] Add `test_delete_active_run_rejected` +- [ ] Run `uv run ruff check app/routes/ app/schemas/ tests/unit/test_routes_phase4.py` +- [ ] Run `uv run pytest tests/unit/test_routes_phase4.py -v` + +#### Post-implementation housekeeping + +- [ ] Update `.claude/skills/gw-execution/SKILL.md`: add all Phase 4 routes to "API routes (implemented)" section +- [ ] Update `.claude/gw-plans/execution/README.md`: mark Phase 4 status +- [ ] Run full test suite: `uv run pytest tests/unit/ -v` +- [ ] Run linter: `uv run ruff check app/ tests/` +- [ ] Run formatter: `uv run ruff format --check app/ tests/` + +--- + +## Files Summary + +| Action | File | Part | Notes | +|--------|------|------|-------| +| **create** | `app/db/migrations/003_run_indexes.py` | 4.1 | Composite indexes for run queries | +| **modify** | `app/db/crud.py` | 4.1 | Upgrade list_runs_by_graph, add list_runs, delete_run | +| **modify** | `app/schemas/graphs.py` | 4.2 | SchemaValidationError, ValidateResponse, ExportResponse | +| **modify** | `app/schemas/runs.py` | 4.3 | RunListItem | +| **modify** | `app/schemas/__init__.py` | 4.3 | Export RunListItem | +| **modify** | `app/routes/graphs.py` | 4.2, 4.3 | validate, export, list runs for graph | +| **modify** | `app/routes/runs.py` | 4.3 | list all runs, cancel, delete | +| **modify** | `tests/unit/test_crud.py` | 4.1 | 9 CRUD tests | +| **modify** | `tests/unit/test_routes.py` | 4.2 | 7 validate/export route tests | +| **create** | `tests/unit/test_routes_phase4.py` | 4.3 | 14 run history/cancel/delete tests | + +--- + +## Not in Scope + +- **Export code generation**: The `exporter.py` implementation stays in Phase 5. Phase 4 only adds the 501 stub route. +- **Persistent checkpointer**: `langgraph-checkpoint-sqlite` (Phase 5). +- **LLM retry/circuit-breaker**: Retry logic for transient LLM errors (Phase 5). +- **Custom httpx transport**: SSRF-safe DNS-pinning transport (Phase 5). +- **Run replay**: Step-through replay of past runs (v2). +- **Token-level streaming**: `stream_mode="messages"` for LLM token streaming (future). +- **Batch operations**: Bulk delete runs, bulk cancel (not v1). +- **Run retention policy**: Automatic cleanup of old runs after N days (not v1). +- **Run artifacts/attachments**: Storing files or images produced by runs (not v1). + +--- + +## Decisions & Risks + +| Decision / Risk | Mitigation | +|-----------------|------------| +| `list_runs_by_graph` return type changes from `list[Run]` to `tuple[list[Run], int]` | This is a breaking change to the internal API. No external callers exist. Update all call sites in Part 4.1. | +| `RunListItem` excludes `final_state` from list responses | Keeps paginated responses lightweight. Clients fetch individual run details via `GET /v1/runs/{id}/status`. | +| Validate endpoint calls both `validate_schema` and `build_graph` | `validate_schema` catches structural issues (missing start, orphan nodes). `build_graph` catches compilation issues (bad LLM config, unknown tools). Both are needed for thorough validation. | +| Export returns 501 -- frontend must handle this gracefully | 501 uses the standard error envelope. Frontend can check `status_code` and show "Coming soon" rather than an error. | +| Cancel of stale DB runs (server restarted) updates DB directly | The run is already lost -- the executor task is gone. Updating to "error" prevents the run from appearing stuck forever. | +| Delete is hard delete, not soft delete | Pre-production. Soft delete adds complexity (filter deleted runs from all queries). If needed later, add a `deleted_at` column in a future migration. | +| `status` query parameter uses Literal type | Use `Literal["running", "paused", "completed", "error"] | None` for OpenAPI documentation. Can be widened later without breaking clients. Invalid values return 422 via Pydantic validation. | +| New migration (003) adds indexes only | Additive, safe. No data changes. If migration fails, existing queries still work (just slower). | +| Two list-runs endpoints (per-graph and global) | Different use cases. Per-graph verifies graph ownership (404 if not owner). Global returns all runs for the authenticated key. Overlap is intentional. | + +--- + +## Verification + +```bash +cd packages/execution + +# Part 4.1 +uv run ruff check app/db/ tests/unit/test_crud.py +uv run ruff format --check app/db/ tests/unit/test_crud.py +uv run pytest tests/unit/test_crud.py -v + +# Part 4.2 +uv run ruff check app/schemas/ app/routes/graphs.py tests/unit/test_routes.py +uv run ruff format --check app/schemas/ app/routes/graphs.py tests/unit/test_routes.py +uv run pytest tests/unit/test_routes.py -v + +# Part 4.3 +uv run ruff check app/routes/ app/schemas/ tests/unit/test_routes_phase4.py +uv run ruff format --check app/routes/ app/schemas/ tests/unit/test_routes_phase4.py +uv run pytest tests/unit/test_routes_phase4.py -v + +# Full suite (after all parts) +uv run ruff check app/ tests/ +uv run ruff format --check app/ tests/ +uv run pytest tests/unit/ -v +``` + +Manual testing: + +```bash +# 1. Validate a graph +curl -X POST localhost:8000/v1/graphs//validate \ + -H "X-API-Key: gw_" \ + -H "Content-Type: application/json" + +# 2. Try export (expect 501) +curl localhost:8000/v1/graphs//export \ + -H "X-API-Key: gw_" + +# 3. List runs for a graph +curl "localhost:8000/v1/graphs//runs?limit=5&status=completed" \ + -H "X-API-Key: gw_" + +# 4. List all runs +curl "localhost:8000/v1/runs?limit=10" \ + -H "X-API-Key: gw_" + +# 5. Cancel a run +curl -X POST localhost:8000/v1/runs//cancel \ + -H "X-API-Key: gw_" \ + -H "Content-Type: application/json" + +# 6. Delete a run +curl -X DELETE localhost:8000/v1/runs/ \ + -H "X-API-Key: gw_" +``` From 096a41dec0fa261b85e91e13da9fb2aa02e459a3 Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 13:39:10 -0700 Subject: [PATCH 2/5] feat: add paginated run listing and run deletion to CRUD layer Upgrade list_runs_by_graph with offset/total/status filter. Add list_runs for cross-graph queries. Add delete_run. Add migration 003 with composite indexes for run history queries. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/execution/app/db/crud.py | 102 +++++++++++++++--- .../app/db/migrations/003_run_indexes.py | 14 +++ packages/execution/tests/unit/test_crud.py | 96 ++++++++++++++++- 3 files changed, 196 insertions(+), 16 deletions(-) create mode 100644 packages/execution/app/db/migrations/003_run_indexes.py diff --git a/packages/execution/app/db/crud.py b/packages/execution/app/db/crud.py index b6d2513..34229e5 100644 --- a/packages/execution/app/db/crud.py +++ b/packages/execution/app/db/crud.py @@ -274,25 +274,97 @@ async def list_runs_by_graph( db: aiosqlite.Connection, graph_id: str, owner_id: str | None = None, - limit: int = 10, -) -> list[Run]: - # None = admin, no filter. Route layer must enforce. + status: str | None = None, + limit: int = 20, + offset: int = 0, +) -> tuple[list[Run], int]: + """Return (runs, total_count) for a specific graph with pagination.""" + where = ["graph_id = ?"] + params: list[object] = [graph_id] + if owner_id is not None: + where.append("owner_id = ?") + params.append(owner_id) + if status is not None: + where.append("status = ?") + params.append(status) + + where_clause = " AND ".join(where) + count_cursor = await db.execute( + f"SELECT COUNT(*) FROM runs WHERE {where_clause}", # noqa: S608 + params, + ) + total = (await count_cursor.fetchone())[0] + + cursor = await db.execute( + f"SELECT id, graph_id, owner_id, status, input_json, " # noqa: S608 + "final_state_json, duration_ms, created_at, error, " + "paused_node_id, paused_prompt " + f"FROM runs WHERE {where_clause} " + "ORDER BY created_at DESC LIMIT ? OFFSET ?", + [*params, limit, offset], + ) + rows = await cursor.fetchall() + return _rows_to_runs(rows), total + + +async def list_runs( + db: aiosqlite.Connection, + owner_id: str | None = None, + graph_id: str | None = None, + status: str | None = None, + limit: int = 20, + offset: int = 0, +) -> tuple[list[Run], int]: + """Return (runs, total_count) across all graphs with pagination.""" + where: list[str] = [] + params: list[object] = [] + if owner_id is not None: + where.append("owner_id = ?") + params.append(owner_id) + if graph_id is not None: + where.append("graph_id = ?") + params.append(graph_id) + if status is not None: + where.append("status = ?") + params.append(status) + + where_clause = (" WHERE " + " AND ".join(where)) if where else "" + count_cursor = await db.execute( + f"SELECT COUNT(*) FROM runs{where_clause}", # noqa: S608 + params, + ) + total = (await count_cursor.fetchone())[0] + + cursor = await db.execute( + "SELECT id, graph_id, owner_id, status, input_json, " + "final_state_json, duration_ms, created_at, error, " + "paused_node_id, paused_prompt " + f"FROM runs{where_clause} " # noqa: S608 + "ORDER BY created_at DESC LIMIT ? OFFSET ?", + [*params, limit, offset], + ) + rows = await cursor.fetchall() + return _rows_to_runs(rows), total + + +async def delete_run( + db: aiosqlite.Connection, + run_id: str, + owner_id: str | None = None, +) -> bool: + """Delete a run by ID. Returns True if deleted.""" if owner_id is not None: cursor = await db.execute( - "SELECT id, graph_id, owner_id, status, input_json, final_state_json, " - "duration_ms, created_at, error, paused_node_id, paused_prompt " - "FROM runs WHERE graph_id = ? AND owner_id = ? " - "ORDER BY created_at DESC LIMIT ?", - (graph_id, owner_id, limit), + "DELETE FROM runs WHERE id = ? AND owner_id = ?", + (run_id, owner_id), ) else: - cursor = await db.execute( - "SELECT id, graph_id, owner_id, status, input_json, final_state_json, " - "duration_ms, created_at, error, paused_node_id, paused_prompt " - "FROM runs WHERE graph_id = ? ORDER BY created_at DESC LIMIT ?", - (graph_id, limit), - ) - rows = await cursor.fetchall() + cursor = await db.execute("DELETE FROM runs WHERE id = ?", (run_id,)) + await db.commit() + return cursor.rowcount > 0 + + +def _rows_to_runs(rows: list) -> list[Run]: return [ Run( id=row[0], diff --git a/packages/execution/app/db/migrations/003_run_indexes.py b/packages/execution/app/db/migrations/003_run_indexes.py new file mode 100644 index 0000000..6bbf920 --- /dev/null +++ b/packages/execution/app/db/migrations/003_run_indexes.py @@ -0,0 +1,14 @@ +"""Run query indexes for Phase 4 run history.""" + +VERSION = 3 + + +def up(db) -> None: + db.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_graph_owner_status " + "ON runs(graph_id, owner_id, status)" + ) + db.execute( + "CREATE INDEX IF NOT EXISTS idx_runs_owner_status ON runs(owner_id, status)" + ) + db.execute("CREATE INDEX IF NOT EXISTS idx_runs_created ON runs(created_at)") diff --git a/packages/execution/tests/unit/test_crud.py b/packages/execution/tests/unit/test_crud.py index 446577c..ac471a9 100644 --- a/packages/execution/tests/unit/test_crud.py +++ b/packages/execution/tests/unit/test_crud.py @@ -8,9 +8,11 @@ create_graph, create_run, delete_graph, + delete_run, get_graph, get_run, list_graphs, + list_runs, list_runs_by_graph, update_graph, update_run, @@ -155,10 +157,102 @@ async def test_list_runs_by_graph(db): graph = await create_graph(db, "G", {}, owner_id="owner-a") for i in range(5): await create_run(db, graph.id, "owner-a", "completed", {"i": i}) - runs = await list_runs_by_graph(db, graph.id, owner_id="owner-a", limit=3) + runs, total = await list_runs_by_graph(db, graph.id, owner_id="owner-a", limit=3) assert len(runs) == 3 + assert total == 5 assert runs[0].created_at >= runs[1].created_at async def test_get_run_missing(db): assert await get_run(db, "nonexistent") is None + + +# ── Run pagination & filtering ───────────────────────────────────── + + +async def test_list_runs_by_graph_paginated(db): + graph = await create_graph(db, "G", {}, owner_id="o") + for i in range(5): + await create_run(db, graph.id, "o", "completed", {"i": i}) + runs, total = await list_runs_by_graph( + db, graph.id, owner_id="o", limit=2, offset=0 + ) + assert len(runs) == 2 + assert total == 5 + + +async def test_list_runs_by_graph_offset(db): + graph = await create_graph(db, "G", {}, owner_id="o") + for i in range(5): + await create_run(db, graph.id, "o", "completed", {"i": i}) + runs, total = await list_runs_by_graph( + db, graph.id, owner_id="o", limit=2, offset=2 + ) + assert len(runs) == 2 + assert total == 5 + + +async def test_list_runs_by_graph_status_filter(db): + graph = await create_graph(db, "G", {}, owner_id="o") + await create_run(db, graph.id, "o", "completed", {}) + await create_run(db, graph.id, "o", "completed", {}) + await create_run(db, graph.id, "o", "error", {}) + runs, total = await list_runs_by_graph( + db, graph.id, owner_id="o", status="completed" + ) + assert len(runs) == 2 + assert total == 2 + + +async def test_list_runs_by_graph_owner_isolation(db): + graph = await create_graph(db, "G", {}, owner_id="o-a") + await create_run(db, graph.id, "o-a", "completed", {}) + await create_run(db, graph.id, "o-b", "completed", {}) + runs, total = await list_runs_by_graph(db, graph.id, owner_id="o-a") + assert len(runs) == 1 + assert total == 1 + + +async def test_list_runs_all_graphs(db): + g1 = await create_graph(db, "G1", {}, owner_id="o") + g2 = await create_graph(db, "G2", {}, owner_id="o") + await create_run(db, g1.id, "o", "completed", {}) + await create_run(db, g2.id, "o", "completed", {}) + runs, total = await list_runs(db, owner_id="o") + assert len(runs) == 2 + assert total == 2 + + +async def test_list_runs_graph_id_filter(db): + g1 = await create_graph(db, "G1", {}, owner_id="o") + g2 = await create_graph(db, "G2", {}, owner_id="o") + await create_run(db, g1.id, "o", "completed", {}) + await create_run(db, g2.id, "o", "completed", {}) + runs, total = await list_runs(db, owner_id="o", graph_id=g1.id) + assert len(runs) == 1 + assert total == 1 + assert runs[0].graph_id == g1.id + + +async def test_list_runs_status_filter(db): + graph = await create_graph(db, "G", {}, owner_id="o") + await create_run(db, graph.id, "o", "completed", {}) + await create_run(db, graph.id, "o", "error", {}) + await create_run(db, graph.id, "o", "error", {}) + runs, total = await list_runs(db, owner_id="o", status="error") + assert len(runs) == 2 + assert total == 2 + + +async def test_delete_run_success(db): + graph = await create_graph(db, "G", {}, owner_id="o") + run = await create_run(db, graph.id, "o", "completed", {}) + assert await delete_run(db, run.id, owner_id="o") is True + assert await get_run(db, run.id) is None + + +async def test_delete_run_wrong_owner(db): + graph = await create_graph(db, "G", {}, owner_id="o-a") + run = await create_run(db, graph.id, "o-a", "completed", {}) + assert await delete_run(db, run.id, owner_id="o-b") is False + assert await get_run(db, run.id) is not None From 17e71a9e9a4ee4cce2fc175c19563c950d3ec4dc Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 13:41:20 -0700 Subject: [PATCH 3/5] feat: add schema validation and export stub routes POST /v1/graphs/{id}/validate checks schema without executing. Returns 200 with valid:true or 422 with error details and node refs. GET /v1/graphs/{id}/export returns 501 until Phase 5. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/execution/app/routes/graphs.py | 96 ++++++++++- packages/execution/app/schemas/graphs.py | 27 +++ packages/execution/tests/unit/test_routes.py | 170 +++++++++++++++++++ 3 files changed, 291 insertions(+), 2 deletions(-) diff --git a/packages/execution/app/routes/graphs.py b/packages/execution/app/routes/graphs.py index 2ba904d..58445fd 100644 --- a/packages/execution/app/routes/graphs.py +++ b/packages/execution/app/routes/graphs.py @@ -5,17 +5,20 @@ import logging from fastapi import APIRouter, Depends, HTTPException, Query, Request -from fastapi.responses import Response +from fastapi.responses import JSONResponse, Response +from langchain_core.language_models import FakeListChatModel from langgraph.checkpoint.memory import InMemorySaver from app.auth.deps import AuthContext, require_scope -from app.builder import GraphBuildError, build_graph +from app.builder import GraphBuildError, build_graph, validate_schema from app.db import crud from app.db.connection import get_db from app.schemas.graphs import ( CreateGraphRequest, GraphResponse, + SchemaValidationError, UpdateGraphRequest, + ValidateResponse, ) from app.schemas.pagination import PaginatedResponse from app.schemas.runs import StartRunRequest, StartRunResponse @@ -153,6 +156,95 @@ async def delete_graph( return Response(status_code=204) +# ── Validate / Export ────────────────────────────────────────────────── + + +@router.post( + "/{graph_id}/validate", + response_model=ValidateResponse, + summary="Validate graph schema", + responses={ + 404: {"description": "Graph not found"}, + 422: {"description": "Schema is invalid"}, + }, +) +async def validate_graph( + graph_id: str, + auth: AuthContext = Depends(require_scope("graphs:read")), + db=Depends(get_db), +) -> ValidateResponse | JSONResponse: + """Validate a graph's schema without executing it.""" + graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + if graph is None: + raise HTTPException(status_code=404, detail="Graph not found") + + try: + validate_schema(graph.schema_json) + except GraphBuildError as exc: + return JSONResponse( + status_code=422, + content=ValidateResponse( + valid=False, + errors=[ + SchemaValidationError( + message=str(exc), + node_ref=getattr(exc, "node_ref", None), + ) + ], + ).model_dump(), + ) + + try: + mock = FakeListChatModel(responses=[""]) + build_graph(graph.schema_json, llm_override=mock) + except GraphBuildError as exc: + return JSONResponse( + status_code=422, + content=ValidateResponse( + valid=False, + errors=[ + SchemaValidationError( + message=str(exc), + node_ref=getattr(exc, "node_ref", None), + ) + ], + ).model_dump(), + ) + except Exception as exc: + return JSONResponse( + status_code=422, + content=ValidateResponse( + valid=False, + errors=[SchemaValidationError(message=str(exc))], + ).model_dump(), + ) + + return ValidateResponse(valid=True, errors=[]) + + +@router.get( + "/{graph_id}/export", + summary="Export graph as Python code", + responses={ + 404: {"description": "Graph not found"}, + 501: {"description": "Not implemented"}, + }, +) +async def export_graph( + graph_id: str, + auth: AuthContext = Depends(require_scope("graphs:read")), + db=Depends(get_db), +) -> None: + """Export graph as standalone Python code (not yet implemented).""" + graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + if graph is None: + raise HTTPException(status_code=404, detail="Graph not found") + raise HTTPException( + status_code=501, + detail="Export not implemented. Coming in a future release.", + ) + + # ── Run ──────────────────────────────────────────────────────────────── diff --git a/packages/execution/app/schemas/graphs.py b/packages/execution/app/schemas/graphs.py index 09d1e54..f204cfc 100644 --- a/packages/execution/app/schemas/graphs.py +++ b/packages/execution/app/schemas/graphs.py @@ -47,6 +47,33 @@ class UpdateGraphRequest(BaseModel): schema_json: dict = Field(description="Updated GraphSchema document.") +class SchemaValidationError(BaseModel): + """A single validation error from schema checking.""" + + message: str = Field(description="Human-readable error description.") + node_ref: str | None = Field( + default=None, + description="Node ID that caused the error, if applicable.", + ) + + +class ValidateResponse(BaseModel): + """Result of schema validation.""" + + valid: bool = Field(description="True if the schema is valid.") + errors: list[SchemaValidationError] = Field( + default_factory=list, + description="List of validation errors (empty when valid).", + ) + + +class ExportResponse(BaseModel): + """Exported Python code and requirements (Phase 5).""" + + code: str = Field(description="Generated Python source code.") + requirements: str = Field(description="requirements.txt content.") + + class GraphResponse(BaseModel): """Graph resource representation.""" diff --git a/packages/execution/tests/unit/test_routes.py b/packages/execution/tests/unit/test_routes.py index c2e49af..1f5d953 100644 --- a/packages/execution/tests/unit/test_routes.py +++ b/packages/execution/tests/unit/test_routes.py @@ -309,3 +309,173 @@ async def test_invalid_request_body_422(client, user_key): ) assert resp.status_code == 422 assert resp.json()["status_code"] == 422 + + +# ── Validate / Export ────────────────────────────────────────────────── + + +def _valid_schema(): + return { + "id": "v", + "name": "Valid", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "result", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "tool_1", + "type": "tool", + "label": "Calc", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "calculator", + "input_map": {"expression": "result"}, + "output_key": "result", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "tool_1"}, + {"id": "e2", "source": "tool_1", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def _create_graph(client, raw_key, schema=None): + resp = await client.post( + "/v1/graphs", + headers=_headers(raw_key), + json={ + "name": "Test", + "schema_json": schema or _valid_schema(), + }, + ) + return resp.json()["id"] + + +async def test_validate_valid_schema(client, user_key): + _, raw = user_key + gid = await _create_graph(client, raw) + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["valid"] is True + assert body["errors"] == [] + + +async def test_validate_invalid_schema_missing_start(client, user_key): + _, raw = user_key + bad = { + "id": "bad", + "name": "Bad", + "version": 1, + "state": [ + {"key": "x", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + ], + "edges": [], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + gid = await _create_graph(client, raw, schema=bad) + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 422 + body = resp.json() + assert body["valid"] is False + assert len(body["errors"]) >= 1 + assert body["errors"][0]["message"] + + +async def test_validate_invalid_schema_unknown_tool(client, user_key): + _, raw = user_key + bad = _valid_schema() + bad["nodes"][1]["config"]["tool_name"] = "does_not_exist" + gid = await _create_graph(client, raw, schema=bad) + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 422 + body = resp.json() + assert body["valid"] is False + assert body["errors"][0]["node_ref"] == "tool_1" + + +async def test_validate_graph_not_found(client, user_key): + _, raw = user_key + resp = await client.post( + "/v1/graphs/nonexistent/validate", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 404 + + +async def test_validate_wrong_owner(client, admin_key): + _, admin_raw = admin_key + db = app.state.db + key_a, raw_a = await create_test_key(db, name="va") + key_b, raw_b = await create_test_key(db, name="vb") + gid = await _create_graph(client, raw_a) + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=_headers(raw_b), + json={}, + ) + assert resp.status_code == 404 + + +async def test_export_returns_501(client, user_key): + _, raw = user_key + gid = await _create_graph(client, raw) + resp = await client.get(f"/v1/graphs/{gid}/export", headers=_headers(raw)) + assert resp.status_code == 501 + assert "not implemented" in resp.json()["detail"].lower() + + +async def test_export_graph_not_found(client, user_key): + _, raw = user_key + resp = await client.get( + "/v1/graphs/nonexistent/export", + headers=_headers(raw), + ) + assert resp.status_code == 404 From 2dc7814810e7f38e39dcf08e683f5b753a89c3cf Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 13:44:37 -0700 Subject: [PATCH 4/5] feat: add run history, cancel, and delete routes GET /v1/graphs/{id}/runs lists paginated runs with status filter. GET /v1/runs lists all runs for the authenticated owner. POST /v1/runs/{id}/cancel stops running/paused runs. DELETE /v1/runs/{id} removes completed/error runs. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/execution/app/routes/graphs.py | 58 ++- packages/execution/app/routes/runs.py | 138 +++++- packages/execution/app/schemas/__init__.py | 2 + packages/execution/app/schemas/runs.py | 12 + .../execution/tests/unit/test_migrations.py | 14 +- .../tests/unit/test_routes_phase4.py | 413 ++++++++++++++++++ 6 files changed, 625 insertions(+), 12 deletions(-) create mode 100644 packages/execution/tests/unit/test_routes_phase4.py diff --git a/packages/execution/app/routes/graphs.py b/packages/execution/app/routes/graphs.py index 58445fd..ba04779 100644 --- a/packages/execution/app/routes/graphs.py +++ b/packages/execution/app/routes/graphs.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +from typing import Literal from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi.responses import JSONResponse, Response @@ -21,7 +22,7 @@ ValidateResponse, ) from app.schemas.pagination import PaginatedResponse -from app.schemas.runs import StartRunRequest, StartRunResponse +from app.schemas.runs import RunListItem, StartRunRequest, StartRunResponse logger = logging.getLogger(__name__) @@ -245,7 +246,60 @@ async def export_graph( ) -# ── Run ──────────────────────────────────────────────────────────────── +# ── Run History ──────────────────────────────────────────────────────── + +_RUN_STATUS = Literal["running", "paused", "completed", "error"] + + +def _run_list_item(run) -> dict: + return RunListItem( + id=run.id, + graph_id=run.graph_id, + status=run.status, + input=run.input, + duration_ms=run.duration_ms, + created_at=run.created_at, + error=run.error, + ).model_dump() + + +@router.get( + "/{graph_id}/runs", + response_model=PaginatedResponse, + summary="List runs for graph", + responses={404: {"description": "Graph not found"}}, +) +async def list_runs_for_graph( + graph_id: str, + limit: int = Query(default=20, ge=1, le=100), + offset: int = Query(default=0, ge=0), + status: _RUN_STATUS | None = Query(default=None), + auth: AuthContext = Depends(require_scope("runs:read")), + db=Depends(get_db), +) -> PaginatedResponse: + """List paginated run history for a specific graph.""" + graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + if graph is None: + raise HTTPException(status_code=404, detail="Graph not found") + + runs, total = await crud.list_runs_by_graph( + db, + graph_id, + owner_id=_owner_filter(auth), + status=status, + limit=limit, + offset=offset, + ) + return PaginatedResponse( + items=[_run_list_item(r) for r in runs], + total=total, + limit=limit, + offset=offset, + has_more=(offset + limit) < total, + ) + + +# ── Start Run ───────────────────────────────────────────────────────── def _get_run_manager(request: Request): diff --git a/packages/execution/app/routes/runs.py b/packages/execution/app/routes/runs.py index 6c7729f..ac2e9b9 100644 --- a/packages/execution/app/routes/runs.py +++ b/packages/execution/app/routes/runs.py @@ -1,23 +1,27 @@ -"""Run routes — stream, resume, status.""" +"""Run routes — stream, resume, status, list, cancel, delete.""" from __future__ import annotations import logging from collections.abc import AsyncGenerator +from typing import Literal from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request -from fastapi.responses import StreamingResponse +from fastapi.responses import Response, StreamingResponse from app.auth.deps import AuthContext, require_scope from app.db import crud from app.db.connection import get_db from app.executor import RunManager, format_sse, stream_run_sse -from app.schemas.runs import ResumeRunRequest, RunStatusResponse +from app.schemas.pagination import PaginatedResponse +from app.schemas.runs import ResumeRunRequest, RunListItem, RunStatusResponse logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/runs", tags=["Runs"]) +_RUN_STATUS = Literal["running", "paused", "completed", "error"] + def _get_run_manager(request: Request) -> RunManager: return request.app.state.run_manager @@ -200,3 +204,131 @@ async def run_status( duration_ms=run.duration_ms, error=run.error, ) + + +# ── List / Cancel / Delete ──────────────────────────────────────────── + + +def _owner_filter(auth: AuthContext) -> str | None: + return None if auth.is_admin else auth.owner_id + + +def _run_list_item(run) -> dict: + return RunListItem( + id=run.id, + graph_id=run.graph_id, + status=run.status, + input=run.input, + duration_ms=run.duration_ms, + created_at=run.created_at, + error=run.error, + ).model_dump() + + +@router.get( + "", + response_model=PaginatedResponse, + summary="List all runs", +) +async def list_all_runs( + limit: int = Query(default=20, ge=1, le=100), + offset: int = Query(default=0, ge=0), + status: _RUN_STATUS | None = Query(default=None), + graph_id: str | None = Query(default=None), + auth: AuthContext = Depends(require_scope("runs:read")), + db=Depends(get_db), +) -> PaginatedResponse: + """List paginated run history across all graphs.""" + runs, total = await crud.list_runs( + db, + owner_id=_owner_filter(auth), + graph_id=graph_id, + status=status, + limit=limit, + offset=offset, + ) + return PaginatedResponse( + items=[_run_list_item(r) for r in runs], + total=total, + limit=limit, + offset=offset, + has_more=(offset + limit) < total, + ) + + +@router.post( + "/{run_id}/cancel", + status_code=202, + summary="Cancel a run", + responses={ + 404: {"description": "Run not found"}, + 409: {"description": "Run is not cancellable"}, + }, +) +async def cancel_run( + run_id: str, + request: Request, + auth: AuthContext = Depends(require_scope("runs:write")), + db=Depends(get_db), +) -> dict: + """Request cancellation of a running or paused run.""" + run_manager = _get_run_manager(request) + ctx = run_manager.get_run(run_id) + + if ctx is not None: + if ctx.owner_id != auth.owner_id and not auth.is_admin: + raise HTTPException(status_code=404, detail="Run not found") + if ctx.status in ("running", "paused"): + await run_manager.cancel_run(run_id) + return {"detail": "Cancel requested"} + raise HTTPException(status_code=409, detail="Run is not cancellable") + + # DB fallback + run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + if run.status in ("running", "paused"): + await crud.update_run( + db, + run_id, + status="error", + error="Cancelled (server lost run)", + ) + return {"detail": "Cancel requested"} + raise HTTPException(status_code=409, detail="Run is not cancellable") + + +@router.delete( + "/{run_id}", + status_code=204, + summary="Delete a run", + responses={ + 404: {"description": "Run not found"}, + 409: {"description": "Run is still active"}, + }, +) +async def delete_run( + run_id: str, + request: Request, + auth: AuthContext = Depends(require_scope("runs:write")), + db=Depends(get_db), +) -> Response: + """Delete a completed or error run from the database.""" + run_manager = _get_run_manager(request) + ctx = run_manager.get_run(run_id) + if ctx is not None and ctx.status in ("running", "paused"): + raise HTTPException( + status_code=409, + detail="Cannot delete an active run. Cancel it first.", + ) + + run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + if run.status in ("running", "paused"): + raise HTTPException( + status_code=409, + detail="Cannot delete an active run. Cancel it first.", + ) + await crud.delete_run(db, run_id, owner_id=_owner_filter(auth)) + return Response(status_code=204) diff --git a/packages/execution/app/schemas/__init__.py b/packages/execution/app/schemas/__init__.py index 951bb31..76e3773 100644 --- a/packages/execution/app/schemas/__init__.py +++ b/packages/execution/app/schemas/__init__.py @@ -2,6 +2,7 @@ from app.schemas.runs import ( ResumeRunRequest, + RunListItem, RunStatusResponse, StartRunRequest, StartRunResponse, @@ -9,6 +10,7 @@ __all__ = [ "ResumeRunRequest", + "RunListItem", "RunStatusResponse", "StartRunRequest", "StartRunResponse", diff --git a/packages/execution/app/schemas/runs.py b/packages/execution/app/schemas/runs.py index eb331f4..4204e1c 100644 --- a/packages/execution/app/schemas/runs.py +++ b/packages/execution/app/schemas/runs.py @@ -28,6 +28,18 @@ class RunStatusResponse(BaseModel): error: str | None = None +class RunListItem(BaseModel): + """Lightweight run representation for list endpoints.""" + + id: str + graph_id: str + status: str + input: dict = Field(default_factory=dict) + duration_ms: int | None = None + created_at: str + error: str | None = None + + class ResumeRunRequest(BaseModel): input: bool | str | dict | list | int | float = Field( ..., diff --git a/packages/execution/tests/unit/test_migrations.py b/packages/execution/tests/unit/test_migrations.py index 034a663..a541089 100644 --- a/packages/execution/tests/unit/test_migrations.py +++ b/packages/execution/tests/unit/test_migrations.py @@ -30,7 +30,7 @@ def test_fresh_db_creates_tables(db_path): assert "api_keys" in tables version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert version == 2 + assert version == 3 conn.close() @@ -40,7 +40,7 @@ def test_idempotent_run(db_path): conn = sqlite3.connect(db_path) version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert version == 2 + assert version == 3 conn.close() @@ -48,7 +48,7 @@ def test_bad_migration_rolls_back(db_path, monkeypatch): run_migrations(db_path) bad_module = types.ModuleType("bad_migration") - bad_module.VERSION = 3 + bad_module.VERSION = 4 def bad_up(db): raise RuntimeError("intentional failure") @@ -62,23 +62,23 @@ def bad_up(db): def patched_iter(path): yield from original_iter(path) - info = types.SimpleNamespace(name="003_bad", ispkg=False) + info = types.SimpleNamespace(name="004_bad", ispkg=False) yield info def patched_import(name): - if name == "app.db.migrations.003_bad": + if name == "app.db.migrations.004_bad": return bad_module return original_import(name) monkeypatch.setattr(runner_mod.pkgutil, "iter_modules", patched_iter) monkeypatch.setattr(runner_mod.importlib, "import_module", patched_import) - with pytest.raises(MigrationError, match="Migration 003 failed"): + with pytest.raises(MigrationError, match="Migration 004 failed"): run_migrations(db_path) conn = sqlite3.connect(db_path) version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert version == 2 + assert version == 3 conn.close() diff --git a/packages/execution/tests/unit/test_routes_phase4.py b/packages/execution/tests/unit/test_routes_phase4.py new file mode 100644 index 0000000..563b076 --- /dev/null +++ b/packages/execution/tests/unit/test_routes_phase4.py @@ -0,0 +1,413 @@ +"""Integration tests for Phase 4 routes: run history, cancel, delete.""" + +from __future__ import annotations + +import asyncio + +import aiosqlite +import httpx +import pytest + +from app.auth import SCOPES_DEFAULT +from app.db import crud +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + + +def _simple_schema(): + """Tool-only schema — no LLM, no API keys needed.""" + return { + "id": "p4", + "name": "Phase4Test", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "result", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "tool_1", + "type": "tool", + "label": "Calc", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "calculator", + "input_map": {"expression": "result"}, + "output_key": "result", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "tool_1"}, + {"id": "e2", "source": "tool_1", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +@pytest.fixture(autouse=True) +def _env(monkeypatch): + monkeypatch.setenv("RUN_CLEANUP_GRACE_SECONDS", "0") + monkeypatch.setenv("OPENAI_API_KEY", "sk-test-dummy-key") + + +@pytest.fixture +async def client(tmp_path): + db_path = str(tmp_path / "test.db") + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + yield c + await app.state.run_manager.cancel_all() + await db.close() + + +@pytest.fixture +async def api_key(client): + db = app.state.db + key, raw = await create_test_key(db, scopes=SCOPES_DEFAULT, name="user") + return key, raw + + +@pytest.fixture +async def api_key_b(client): + db = app.state.db + key, raw = await create_test_key(db, scopes=SCOPES_DEFAULT, name="user-b") + return key, raw + + +def _headers(raw_key: str) -> dict: + return {"X-API-Key": raw_key} + + +async def _create_graph(client, raw_key): + resp = await client.post( + "/v1/graphs", + headers=_headers(raw_key), + json={"name": "Test", "schema_json": _simple_schema()}, + ) + return resp.json()["id"] + + +async def _insert_run(db, graph_id, owner_id, status): + """Insert a run directly into DB for history tests.""" + return await crud.create_run(db, graph_id, owner_id, status, {}) + + +# ── List Runs for Graph ─────────────────────────────────────────────── + + +async def test_list_runs_for_graph_empty(client, api_key): + _, raw = api_key + gid = await _create_graph(client, raw) + resp = await client.get(f"/v1/graphs/{gid}/runs", headers=_headers(raw)) + assert resp.status_code == 200 + body = resp.json() + assert body["items"] == [] + assert body["total"] == 0 + + +async def test_list_runs_for_graph_paginated(client, api_key): + key, raw = api_key + gid = await _create_graph(client, raw) + db = app.state.db + for _ in range(5): + await _insert_run(db, gid, key.id, "completed") + resp = await client.get( + f"/v1/graphs/{gid}/runs?limit=2", + headers=_headers(raw), + ) + assert resp.status_code == 200 + body = resp.json() + assert len(body["items"]) == 2 + assert body["total"] == 5 + assert body["has_more"] is True + + +async def test_list_runs_for_graph_status_filter(client, api_key): + key, raw = api_key + gid = await _create_graph(client, raw) + db = app.state.db + await _insert_run(db, gid, key.id, "completed") + await _insert_run(db, gid, key.id, "completed") + await _insert_run(db, gid, key.id, "error") + resp = await client.get( + f"/v1/graphs/{gid}/runs?status=completed", + headers=_headers(raw), + ) + body = resp.json() + assert body["total"] == 2 + assert all(item["status"] == "completed" for item in body["items"]) + + +async def test_list_runs_for_graph_not_found(client, api_key): + _, raw = api_key + resp = await client.get( + "/v1/graphs/nonexistent/runs", + headers=_headers(raw), + ) + assert resp.status_code == 404 + + +async def test_list_runs_for_graph_wrong_owner(client, api_key, api_key_b): + _, raw_a = api_key + _, raw_b = api_key_b + gid = await _create_graph(client, raw_a) + resp = await client.get(f"/v1/graphs/{gid}/runs", headers=_headers(raw_b)) + assert resp.status_code == 404 + + +# ── List All Runs ───────────────────────────────────────────────────── + + +async def test_list_all_runs(client, api_key): + key, raw = api_key + db = app.state.db + g1 = await _create_graph(client, raw) + g2 = await _create_graph(client, raw) + await _insert_run(db, g1, key.id, "completed") + await _insert_run(db, g2, key.id, "error") + resp = await client.get("/v1/runs", headers=_headers(raw)) + assert resp.status_code == 200 + body = resp.json() + assert body["total"] == 2 + + +async def test_list_all_runs_graph_id_filter(client, api_key): + key, raw = api_key + db = app.state.db + g1 = await _create_graph(client, raw) + g2 = await _create_graph(client, raw) + await _insert_run(db, g1, key.id, "completed") + await _insert_run(db, g2, key.id, "completed") + resp = await client.get( + f"/v1/runs?graph_id={g1}", + headers=_headers(raw), + ) + body = resp.json() + assert body["total"] == 1 + assert body["items"][0]["graph_id"] == g1 + + +async def test_list_all_runs_excludes_other_owners(client, api_key, api_key_b): + key_a, raw_a = api_key + key_b, raw_b = api_key_b + db = app.state.db + g1 = await _create_graph(client, raw_a) + g2 = await _create_graph(client, raw_b) + await _insert_run(db, g1, key_a.id, "completed") + await _insert_run(db, g2, key_b.id, "completed") + resp = await client.get("/v1/runs", headers=_headers(raw_a)) + body = resp.json() + assert body["total"] == 1 + assert body["items"][0]["graph_id"] == g1 + + +# ── Cancel ──────────────────────────────────────────────────────────── + + +def _pause_schema(): + """Schema with human_input — stays paused so cancel works.""" + return { + "id": "pause", + "name": "PauseTest", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "answer", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "ask", + "type": "human_input", + "label": "Ask", + "position": {"x": 0, "y": 100}, + "config": { + "prompt": "Wait here", + "input_key": "answer", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "ask"}, + {"id": "e2", "source": "ask", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def _create_pause_graph(client, raw_key): + resp = await client.post( + "/v1/graphs", + headers=_headers(raw_key), + json={ + "name": "Pause", + "schema_json": _pause_schema(), + }, + ) + return resp.json()["id"] + + +async def test_cancel_running_run(client, api_key): + _, raw = api_key + gid = await _create_pause_graph(client, raw) + resp = await client.post( + f"/v1/graphs/{gid}/run", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 202 + run_id = resp.json()["run_id"] + + # Wait for it to pause + for _ in range(50): + resp = await client.get( + f"/v1/runs/{run_id}/status", + headers=_headers(raw), + ) + if resp.json()["status"] == "paused": + break + await asyncio.sleep(0.1) + + resp = await client.post( + f"/v1/runs/{run_id}/cancel", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 202 + assert "cancel" in resp.json()["detail"].lower() + + +async def test_cancel_already_completed(client, api_key): + _, raw = api_key + gid = await _create_graph(client, raw) + resp = await client.post( + f"/v1/graphs/{gid}/run", + headers=_headers(raw), + json={"input": {"result": "1 + 1"}}, + ) + run_id = resp.json()["run_id"] + + # Wait for completion + for _ in range(50): + resp = await client.get( + f"/v1/runs/{run_id}/status", + headers=_headers(raw), + ) + if resp.json()["status"] in ("completed", "error"): + break + await asyncio.sleep(0.1) + + resp = await client.post( + f"/v1/runs/{run_id}/cancel", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 409 + + +async def test_cancel_stale_db_run(client, api_key): + key, raw = api_key + db = app.state.db + gid = await _create_graph(client, raw) + # Insert run directly in DB as "running" (not in RunManager) + run = await _insert_run(db, gid, key.id, "running") + + resp = await client.post( + f"/v1/runs/{run.id}/cancel", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 202 + + # Verify DB status updated + updated = await crud.get_run(db, run.id) + assert updated.status == "error" + assert "server lost" in updated.error.lower() + + +async def test_cancel_not_found(client, api_key): + _, raw = api_key + resp = await client.post( + "/v1/runs/nonexistent/cancel", + headers=_headers(raw), + json={}, + ) + assert resp.status_code == 404 + + +# ── Delete ──────────────────────────────────────────────────────────── + + +async def test_delete_completed_run(client, api_key): + key, raw = api_key + db = app.state.db + gid = await _create_graph(client, raw) + run = await _insert_run(db, gid, key.id, "completed") + + resp = await client.delete(f"/v1/runs/{run.id}", headers=_headers(raw)) + assert resp.status_code == 204 + + # Verify it's gone + resp = await client.get( + f"/v1/runs/{run.id}/status", + headers=_headers(raw), + ) + assert resp.status_code == 404 + + +async def test_delete_active_run_rejected(client, api_key): + _, raw = api_key + gid = await _create_graph(client, raw) + resp = await client.post( + f"/v1/graphs/{gid}/run", + headers=_headers(raw), + json={"input": {"result": "1 + 1"}}, + ) + run_id = resp.json()["run_id"] + + # Try deleting immediately (still running) + resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw)) + # Could be 409 (still in RunManager) or 204 (already completed) + # The run is very fast, so check both + assert resp.status_code in (204, 409) From 564a18b82fe50408eabad0fb208e4adf7cb960ab Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 14:05:57 -0700 Subject: [PATCH 5/5] test: add Phase 4 manual tests and fix cancel during pause Add 11 manual test scripts (19-29) covering Phase 4 API routes: validate, export stub, run history, pagination, status filter, cancel paused, cancel stale, delete completed, delete active rejected, and full lifecycle. Fix cancel_run to also set resume_event, unblocking _wait_for_resume so cancellation is detected during pause. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/execution/app/executor.py | 2 + .../tests/manual/test_19_validate_then_run.py | 141 ++++++++++ .../tests/manual/test_20_validate_errors.py | 176 ++++++++++++ .../tests/manual/test_21_export_stub.py | 63 +++++ .../tests/manual/test_22_run_history_mixed.py | 220 +++++++++++++++ .../tests/manual/test_23_pagination.py | 112 ++++++++ .../tests/manual/test_24_status_filter.py | 94 +++++++ .../tests/manual/test_25_cancel_paused.py | 160 +++++++++++ .../tests/manual/test_26_cancel_stale.py | 87 ++++++ .../tests/manual/test_27_delete_completed.py | 83 ++++++ .../manual/test_28_delete_active_rejected.py | 153 +++++++++++ .../tests/manual/test_29_full_lifecycle.py | 253 ++++++++++++++++++ 12 files changed, 1544 insertions(+) create mode 100644 packages/execution/tests/manual/test_19_validate_then_run.py create mode 100644 packages/execution/tests/manual/test_20_validate_errors.py create mode 100644 packages/execution/tests/manual/test_21_export_stub.py create mode 100644 packages/execution/tests/manual/test_22_run_history_mixed.py create mode 100644 packages/execution/tests/manual/test_23_pagination.py create mode 100644 packages/execution/tests/manual/test_24_status_filter.py create mode 100644 packages/execution/tests/manual/test_25_cancel_paused.py create mode 100644 packages/execution/tests/manual/test_26_cancel_stale.py create mode 100644 packages/execution/tests/manual/test_27_delete_completed.py create mode 100644 packages/execution/tests/manual/test_28_delete_active_rejected.py create mode 100644 packages/execution/tests/manual/test_29_full_lifecycle.py diff --git a/packages/execution/app/executor.py b/packages/execution/app/executor.py index be19c92..96bf495 100644 --- a/packages/execution/app/executor.py +++ b/packages/execution/app/executor.py @@ -194,6 +194,8 @@ async def cancel_run(self, run_id: str) -> bool: if ctx is None: return False ctx.cancel_event.set() + # Unblock _wait_for_resume so the cancel is detected + ctx.resume_event.set() return True async def submit_resume(self, run_id: str, value: Any) -> bool: diff --git a/packages/execution/tests/manual/test_19_validate_then_run.py b/packages/execution/tests/manual/test_19_validate_then_run.py new file mode 100644 index 0000000..38bc2d5 --- /dev/null +++ b/packages/execution/tests/manual/test_19_validate_then_run.py @@ -0,0 +1,141 @@ +"""Manual test 19: Validate a valid schema, then run it. + +Proves validate and execute agree — a schema that validates +should also run successfully. + +Usage: cd packages/execution && uv run python tests/manual/test_19_validate_then_run.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +def _schema(): + return { + "id": "v", + "name": "Valid", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "result", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "tool_1", + "type": "tool", + "label": "Calc", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "calculator", + "input_map": {"expression": "result"}, + "output_key": "result", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "tool_1"}, + {"id": "e2", "source": "tool_1", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def main(): + print("Test 19: Validate valid schema, then run it") + print("-" * 50) + + db_path = "/tmp/test_19.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t19") + headers = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + # Create graph + resp = await client.post( + "/v1/graphs", + headers=headers, + json={"name": "Test19", "schema_json": _schema()}, + ) + assert resp.status_code == 201 + gid = resp.json()["id"] + print(f"\n Graph created: {gid}") + + # Validate + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=headers, + json={}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["valid"] is True + assert body["errors"] == [] + print(" Validation: VALID") + + # Run it + resp = await client.post( + f"/v1/graphs/{gid}/run", + headers=headers, + json={"input": {"result": "2 + 3"}}, + ) + assert resp.status_code == 202 + run_id = resp.json()["run_id"] + print(f" Run started: {run_id}") + + # Wait for completion + for _ in range(50): + resp = await client.get( + f"/v1/runs/{run_id}/status", + headers=headers, + ) + status = resp.json()["status"] + if status in ("completed", "error"): + break + await asyncio.sleep(0.1) + + print(f" Run status: {status}") + assert status == "completed", f"Expected completed, got {status}" + + await app.state.run_manager.cancel_all() + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_20_validate_errors.py b/packages/execution/tests/manual/test_20_validate_errors.py new file mode 100644 index 0000000..4e11e73 --- /dev/null +++ b/packages/execution/tests/manual/test_20_validate_errors.py @@ -0,0 +1,176 @@ +"""Manual test 20: Validate catches multiple error types. + +Tests structural (missing start), semantic (unknown tool), +and graph-not-found errors. + +Usage: cd packages/execution && uv run python tests/manual/test_20_validate_errors.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 20: Validate catches multiple error types") + print("-" * 50) + + db_path = "/tmp/test_20.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t20") + headers = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + # 1. Missing start node + print("\n Case 1: Missing start node") + no_start = { + "id": "bad", + "name": "Bad", + "version": 1, + "state": [ + { + "key": "x", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + ], + "edges": [], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + resp = await client.post( + "/v1/graphs", + headers=headers, + json={"name": "NoStart", "schema_json": no_start}, + ) + gid = resp.json()["id"] + resp = await client.post( + f"/v1/graphs/{gid}/validate", + headers=headers, + json={}, + ) + assert resp.status_code == 422 + body = resp.json() + assert body["valid"] is False + assert len(body["errors"]) >= 1 + print(f" Status: 422, Error: {body['errors'][0]['message'][:60]}") + + # 2. Unknown tool + print("\n Case 2: Unknown tool name") + bad_tool = { + "id": "bt", + "name": "BadTool", + "version": 1, + "state": [ + { + "key": "messages", + "type": "list", + "reducer": "append", + }, + { + "key": "out", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "t", + "type": "tool", + "label": "Ghost", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "ghost_tool", + "input_map": {"x": "messages"}, + "output_key": "out", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "t"}, + {"id": "e2", "source": "t", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + resp = await client.post( + "/v1/graphs", + headers=headers, + json={"name": "BadTool", "schema_json": bad_tool}, + ) + gid2 = resp.json()["id"] + resp = await client.post( + f"/v1/graphs/{gid2}/validate", + headers=headers, + json={}, + ) + assert resp.status_code == 422 + body = resp.json() + assert body["valid"] is False + assert body["errors"][0].get("node_ref") == "t" + print( + f" Status: 422, node_ref: {body['errors'][0]['node_ref']}, " + f"Error: {body['errors'][0]['message'][:50]}" + ) + + # 3. Graph not found + print("\n Case 3: Graph not found") + resp = await client.post( + "/v1/graphs/nonexistent/validate", + headers=headers, + json={}, + ) + assert resp.status_code == 404 + print(" Status: 404") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_21_export_stub.py b/packages/execution/tests/manual/test_21_export_stub.py new file mode 100644 index 0000000..b6f9e98 --- /dev/null +++ b/packages/execution/tests/manual/test_21_export_stub.py @@ -0,0 +1,63 @@ +"""Manual test 21: Export returns 501 gracefully. + +Usage: cd packages/execution && uv run python tests/manual/test_21_export_stub.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 21: Export returns 501 gracefully") + print("-" * 50) + + db_path = "/tmp/test_21.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t21") + headers = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.post( + "/v1/graphs", + headers=headers, + json={"name": "G", "schema_json": {}}, + ) + gid = resp.json()["id"] + + resp = await client.get(f"/v1/graphs/{gid}/export", headers=headers) + assert resp.status_code == 501 + body = resp.json() + assert "not implemented" in body["detail"].lower() + print(f"\n Status: {resp.status_code}") + print(f" Detail: {body['detail']}") + + # Not-found still returns 404, not 501 + resp = await client.get("/v1/graphs/missing/export", headers=headers) + assert resp.status_code == 404 + print(f" Missing graph: {resp.status_code}") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_22_run_history_mixed.py b/packages/execution/tests/manual/test_22_run_history_mixed.py new file mode 100644 index 0000000..c71488e --- /dev/null +++ b/packages/execution/tests/manual/test_22_run_history_mixed.py @@ -0,0 +1,220 @@ +"""Manual test 22: Run history after mixed executions. + +Execute 3 runs (1 success, 1 tool error, 1 cancelled), then list +all runs and verify statuses and counts. + +Usage: cd packages/execution && uv run python tests/manual/test_22_run_history_mixed.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +def _calc_schema(): + return { + "id": "calc", + "name": "Calc", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "result", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "t", + "type": "tool", + "label": "Calc", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "calculator", + "input_map": {"expression": "result"}, + "output_key": "result", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "t"}, + {"id": "e2", "source": "t", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +def _pause_schema(): + return { + "id": "pause", + "name": "Pause", + "version": 1, + "state": [ + {"key": "messages", "type": "list", "reducer": "append"}, + {"key": "answer", "type": "string", "reducer": "replace"}, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "ask", + "type": "human_input", + "label": "Ask", + "position": {"x": 0, "y": 100}, + "config": { + "prompt": "Wait", + "input_key": "answer", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "ask"}, + {"id": "e2", "source": "ask", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def _wait_for(client, headers, run_id, target, tries=50): + for _ in range(tries): + resp = await client.get(f"/v1/runs/{run_id}/status", headers=headers) + if resp.json()["status"] == target: + return resp.json()["status"] + await asyncio.sleep(0.1) + return resp.json()["status"] + + +async def main(): + print("Test 22: Run history after mixed executions") + print("-" * 50) + + db_path = "/tmp/test_22.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t22") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + # Create two graphs + resp = await c.post( + "/v1/graphs", + headers=h, + json={"name": "Calc", "schema_json": _calc_schema()}, + ) + calc_gid = resp.json()["id"] + + resp = await c.post( + "/v1/graphs", + headers=h, + json={ + "name": "Pause", + "schema_json": _pause_schema(), + }, + ) + pause_gid = resp.json()["id"] + + # Run 1: success (valid calc) + resp = await c.post( + f"/v1/graphs/{calc_gid}/run", + headers=h, + json={"input": {"result": "2 + 3"}}, + ) + run1 = resp.json()["run_id"] + s1 = await _wait_for(c, h, run1, "completed") + print(f"\n Run 1 (calc 2+3): {s1}") + assert s1 == "completed" + + # Run 2: completed (another calc) + resp = await c.post( + f"/v1/graphs/{calc_gid}/run", + headers=h, + json={"input": {"result": "10 * 5"}}, + ) + run2 = resp.json()["run_id"] + s2 = await _wait_for(c, h, run2, "completed") + print(f" Run 2 (calc 10*5): {s2}") + assert s2 == "completed" + + # Run 3: pause then cancel → error + resp = await c.post( + f"/v1/graphs/{pause_gid}/run", + headers=h, + json={}, + ) + run3 = resp.json()["run_id"] + await _wait_for(c, h, run3, "paused") + await c.post(f"/v1/runs/{run3}/cancel", headers=h, json={}) + await asyncio.sleep(0.3) + resp = await c.get(f"/v1/runs/{run3}/status", headers=h) + s3 = resp.json()["status"] + print(f" Run 3 (cancelled): {s3}") + + # List all runs + resp = await c.get("/v1/runs", headers=h) + body = resp.json() + print(f"\n Total runs: {body['total']}") + assert body["total"] == 3 + + statuses = [item["status"] for item in body["items"]] + print(f" Statuses: {statuses}") + assert statuses.count("completed") == 2 + + # List runs for calc graph only + resp = await c.get(f"/v1/graphs/{calc_gid}/runs", headers=h) + body = resp.json() + print(f"\n Calc graph runs: {body['total']}") + assert body["total"] == 2 + + await app.state.run_manager.cancel_all() + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_23_pagination.py b/packages/execution/tests/manual/test_23_pagination.py new file mode 100644 index 0000000..0c0a12d --- /dev/null +++ b/packages/execution/tests/manual/test_23_pagination.py @@ -0,0 +1,112 @@ +"""Manual test 23: Pagination through run history. + +Creates many runs, pages through with offset/limit, +verifies no duplicates and correct totals. + +Usage: cd packages/execution && uv run python tests/manual/test_23_pagination.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db import crud +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 23: Pagination through run history") + print("-" * 50) + + db_path = "/tmp/test_23.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + key, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t23") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/v1/graphs", + headers=h, + json={"name": "G", "schema_json": {}}, + ) + gid = resp.json()["id"] + + # Insert 12 runs directly + for i in range(12): + await crud.create_run(db, gid, key.id, "completed", {"i": i}) + print(f"\n Created 12 runs for graph {gid[:8]}...") + + # Page 1: limit=5, offset=0 + resp = await c.get( + f"/v1/graphs/{gid}/runs?limit=5&offset=0", + headers=h, + ) + p1 = resp.json() + assert len(p1["items"]) == 5 + assert p1["total"] == 12 + assert p1["has_more"] is True + print( + f" Page 1: {len(p1['items'])} items, " + f"total={p1['total']}, has_more={p1['has_more']}" + ) + + # Page 2: limit=5, offset=5 + resp = await c.get( + f"/v1/graphs/{gid}/runs?limit=5&offset=5", + headers=h, + ) + p2 = resp.json() + assert len(p2["items"]) == 5 + assert p2["total"] == 12 + assert p2["has_more"] is True + print( + f" Page 2: {len(p2['items'])} items, " + f"total={p2['total']}, has_more={p2['has_more']}" + ) + + # Page 3: limit=5, offset=10 + resp = await c.get( + f"/v1/graphs/{gid}/runs?limit=5&offset=10", + headers=h, + ) + p3 = resp.json() + assert len(p3["items"]) == 2 + assert p3["total"] == 12 + assert p3["has_more"] is False + print( + f" Page 3: {len(p3['items'])} items, " + f"total={p3['total']}, has_more={p3['has_more']}" + ) + + # Verify no duplicates across pages + all_ids = ( + [i["id"] for i in p1["items"]] + + [i["id"] for i in p2["items"]] + + [i["id"] for i in p3["items"]] + ) + assert len(all_ids) == 12 + assert len(set(all_ids)) == 12, "Duplicate run IDs!" + print(" All 12 unique IDs collected across 3 pages") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_24_status_filter.py b/packages/execution/tests/manual/test_24_status_filter.py new file mode 100644 index 0000000..3cc21c8 --- /dev/null +++ b/packages/execution/tests/manual/test_24_status_filter.py @@ -0,0 +1,94 @@ +"""Manual test 24: Status filter accuracy. + +Creates runs with mixed statuses, filters each, +verifies totals match. + +Usage: cd packages/execution && uv run python tests/manual/test_24_status_filter.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db import crud +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 24: Status filter accuracy") + print("-" * 50) + + db_path = "/tmp/test_24.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + key, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t24") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/v1/graphs", + headers=h, + json={"name": "G", "schema_json": {}}, + ) + gid = resp.json()["id"] + + # Insert runs with known statuses + counts = { + "completed": 4, + "error": 2, + "running": 1, + } + for status, n in counts.items(): + for _ in range(n): + await crud.create_run(db, gid, key.id, status, {}) + total = sum(counts.values()) + print(f"\n Created {total} runs: {counts}") + + # No filter — all runs + resp = await c.get(f"/v1/graphs/{gid}/runs", headers=h) + assert resp.json()["total"] == total + print(f" No filter: {resp.json()['total']} (expected {total})") + + # Filter each status + for status, expected in counts.items(): + resp = await c.get( + f"/v1/graphs/{gid}/runs?status={status}", + headers=h, + ) + body = resp.json() + assert body["total"] == expected, ( + f"status={status}: expected {expected}, got {body['total']}" + ) + for item in body["items"]: + assert item["status"] == status + print(f" status={status}: {body['total']} (expected {expected})") + + # Filter for a status with zero runs + resp = await c.get( + f"/v1/graphs/{gid}/runs?status=paused", + headers=h, + ) + assert resp.json()["total"] == 0 + print(" status=paused: 0 (expected 0)") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_25_cancel_paused.py b/packages/execution/tests/manual/test_25_cancel_paused.py new file mode 100644 index 0000000..750a347 --- /dev/null +++ b/packages/execution/tests/manual/test_25_cancel_paused.py @@ -0,0 +1,160 @@ +"""Manual test 25: Cancel a paused run via API. + +Full flow: start → pause at human_input → cancel → verify error +in SSE buffer and DB status. + +Usage: cd packages/execution && uv run python tests/manual/test_25_cancel_paused.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +def _pause_schema(): + return { + "id": "pause", + "name": "Pause", + "version": 1, + "state": [ + { + "key": "messages", + "type": "list", + "reducer": "append", + }, + { + "key": "answer", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "ask", + "type": "human_input", + "label": "Ask", + "position": {"x": 0, "y": 100}, + "config": { + "prompt": "Continue?", + "input_key": "answer", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "ask"}, + {"id": "e2", "source": "ask", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def main(): + print("Test 25: Cancel a paused run via API") + print("-" * 50) + + db_path = "/tmp/test_25.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t25") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + # Create graph and start run + resp = await c.post( + "/v1/graphs", + headers=h, + json={ + "name": "Pause", + "schema_json": _pause_schema(), + }, + ) + gid = resp.json()["id"] + + resp = await c.post( + f"/v1/graphs/{gid}/run", + headers=h, + json={}, + ) + run_id = resp.json()["run_id"] + print(f"\n Run started: {run_id}") + + # Wait for pause + for _ in range(50): + resp = await c.get(f"/v1/runs/{run_id}/status", headers=h) + if resp.json()["status"] == "paused": + break + await asyncio.sleep(0.1) + + status = resp.json() + assert status["status"] == "paused" + print(f" Status: {status['status']}") + print(f" Paused at: {status.get('node_id')}") + print(f" Prompt: {status.get('prompt')}") + + # Cancel + resp = await c.post( + f"/v1/runs/{run_id}/cancel", + headers=h, + json={}, + ) + assert resp.status_code == 202 + print(f"\n Cancel response: {resp.json()['detail']}") + + # Wait for the executor to process cancellation + await asyncio.sleep(0.5) + + # Check final status + resp = await c.get(f"/v1/runs/{run_id}/status", headers=h) + final = resp.json() + print(f" Final status: {final['status']}") + # Status should be "error" (cancelled runs become errors) + assert final["status"] == "error" + + # Verify in run history + resp = await c.get(f"/v1/graphs/{gid}/runs", headers=h) + runs = resp.json()["items"] + assert len(runs) == 1 + assert runs[0]["status"] == "error" + print(" Appears in history as error: yes") + + await app.state.run_manager.cancel_all() + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_26_cancel_stale.py b/packages/execution/tests/manual/test_26_cancel_stale.py new file mode 100644 index 0000000..dc5e8eb --- /dev/null +++ b/packages/execution/tests/manual/test_26_cancel_stale.py @@ -0,0 +1,87 @@ +"""Manual test 26: Cancel stale DB run. + +Inserts a "running" run directly in DB (simulating server restart), +cancels via API, verifies DB updated to error. + +Usage: cd packages/execution && uv run python tests/manual/test_26_cancel_stale.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db import crud +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 26: Cancel stale DB run") + print("-" * 50) + + db_path = "/tmp/test_26.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + key, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t26") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/v1/graphs", + headers=h, + json={"name": "G", "schema_json": {}}, + ) + gid = resp.json()["id"] + + # Insert run as "running" directly in DB (not in RunManager) + run = await crud.create_run(db, gid, key.id, "running", {}) + print(f"\n Stale run in DB: {run.id}") + print(f" DB status: {run.status}") + print(f" In RunManager: {app.state.run_manager.get_run(run.id) is not None}") + + # Cancel via API + resp = await c.post( + f"/v1/runs/{run.id}/cancel", + headers=h, + json={}, + ) + assert resp.status_code == 202 + print(f"\n Cancel response: {resp.status_code}") + + # Verify DB updated + updated = await crud.get_run(db, run.id) + assert updated.status == "error" + assert "server lost" in updated.error.lower() + print(f" DB status now: {updated.status}") + print(f" DB error: {updated.error}") + + # Also test cancelling an already-completed run → 409 + done = await crud.create_run(db, gid, key.id, "completed", {}) + resp = await c.post( + f"/v1/runs/{done.id}/cancel", + headers=h, + json={}, + ) + assert resp.status_code == 409 + print(f"\n Cancel completed run: {resp.status_code} (expected 409)") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_27_delete_completed.py b/packages/execution/tests/manual/test_27_delete_completed.py new file mode 100644 index 0000000..381c2d1 --- /dev/null +++ b/packages/execution/tests/manual/test_27_delete_completed.py @@ -0,0 +1,83 @@ +"""Manual test 27: Delete completed run, verify gone. + +Run a graph, complete it, delete, verify not in history. + +Usage: cd packages/execution && uv run python tests/manual/test_27_delete_completed.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db import crud +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +async def main(): + print("Test 27: Delete completed run, verify gone") + print("-" * 50) + + db_path = "/tmp/test_27.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + key, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t27") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/v1/graphs", + headers=h, + json={"name": "G", "schema_json": {}}, + ) + gid = resp.json()["id"] + + # Insert a completed run + run = await crud.create_run(db, gid, key.id, "completed", {"x": 1}) + print(f"\n Completed run: {run.id}") + + # Verify it's in history + resp = await c.get(f"/v1/graphs/{gid}/runs", headers=h) + assert resp.json()["total"] == 1 + print(" In history: yes") + + # Delete it + resp = await c.delete(f"/v1/runs/{run.id}", headers=h) + assert resp.status_code == 204 + print(f" Delete: {resp.status_code}") + + # Verify gone from history + resp = await c.get(f"/v1/graphs/{gid}/runs", headers=h) + assert resp.json()["total"] == 0 + print(" In history after delete: no") + + # Verify status returns 404 + resp = await c.get(f"/v1/runs/{run.id}/status", headers=h) + assert resp.status_code == 404 + print(f" Status after delete: {resp.status_code}") + + # Delete non-existent → 404 + resp = await c.delete(f"/v1/runs/{run.id}", headers=h) + assert resp.status_code == 404 + print(f" Delete again: {resp.status_code} (idempotent)") + + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_28_delete_active_rejected.py b/packages/execution/tests/manual/test_28_delete_active_rejected.py new file mode 100644 index 0000000..c768b4b --- /dev/null +++ b/packages/execution/tests/manual/test_28_delete_active_rejected.py @@ -0,0 +1,153 @@ +"""Manual test 28: Delete active run rejected. + +Start a paused graph → try delete (409) → cancel → delete (204). + +Usage: + cd packages/execution + uv run python tests/manual/test_28_delete_active_rejected.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +def _pause_schema(): + return { + "id": "pause", + "name": "Pause", + "version": 1, + "state": [ + { + "key": "messages", + "type": "list", + "reducer": "append", + }, + { + "key": "answer", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "ask", + "type": "human_input", + "label": "Ask", + "position": {"x": 0, "y": 100}, + "config": { + "prompt": "Wait", + "input_key": "answer", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "ask"}, + {"id": "e2", "source": "ask", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def main(): + print("Test 28: Delete active run rejected") + print("-" * 50) + + db_path = "/tmp/test_28.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t28") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + resp = await c.post( + "/v1/graphs", + headers=h, + json={ + "name": "Pause", + "schema_json": _pause_schema(), + }, + ) + gid = resp.json()["id"] + + # Start run → pauses at human_input + resp = await c.post(f"/v1/graphs/{gid}/run", headers=h, json={}) + run_id = resp.json()["run_id"] + print(f"\n Run started: {run_id}") + + for _ in range(50): + resp = await c.get(f"/v1/runs/{run_id}/status", headers=h) + if resp.json()["status"] == "paused": + break + await asyncio.sleep(0.1) + print(f" Status: {resp.json()['status']}") + + # Try delete → 409 + resp = await c.delete(f"/v1/runs/{run_id}", headers=h) + assert resp.status_code == 409 + print(f" Delete while paused: {resp.status_code}") + print(f" Detail: {resp.json()['detail']}") + + # Cancel first + resp = await c.post( + f"/v1/runs/{run_id}/cancel", + headers=h, + json={}, + ) + assert resp.status_code == 202 + print(f"\n Cancel: {resp.status_code}") + + # Wait for cancellation to process + await asyncio.sleep(0.5) + + # Now delete → 204 + resp = await c.delete(f"/v1/runs/{run_id}", headers=h) + assert resp.status_code == 204 + print(f" Delete after cancel: {resp.status_code}") + + # Verify gone + resp = await c.get(f"/v1/runs/{run_id}/status", headers=h) + assert resp.status_code == 404 + print(f" Status after delete: {resp.status_code}") + + await app.state.run_manager.cancel_all() + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/execution/tests/manual/test_29_full_lifecycle.py b/packages/execution/tests/manual/test_29_full_lifecycle.py new file mode 100644 index 0000000..2bc94f6 --- /dev/null +++ b/packages/execution/tests/manual/test_29_full_lifecycle.py @@ -0,0 +1,253 @@ +"""Manual test 29: Full lifecycle — validate → run → history → cancel → delete. + +Single test hitting every Phase 4 route in a realistic sequence. + +Usage: cd packages/execution && uv run python tests/manual/test_29_full_lifecycle.py +""" + +import asyncio +import os + +import aiosqlite +import httpx + +from app.auth import SCOPES_DEFAULT +from app.db.migrations.runner import run_migrations +from app.executor import RunManager +from app.main import app +from tests.conftest import create_test_key + +os.environ.setdefault("RUN_CLEANUP_GRACE_SECONDS", "0") +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy") + + +def _calc_schema(): + return { + "id": "calc", + "name": "Calc", + "version": 1, + "state": [ + { + "key": "messages", + "type": "list", + "reducer": "append", + }, + { + "key": "result", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "t", + "type": "tool", + "label": "Calc", + "position": {"x": 0, "y": 100}, + "config": { + "tool_name": "calculator", + "input_map": {"expression": "result"}, + "output_key": "result", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "t"}, + {"id": "e2", "source": "t", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +def _pause_schema(): + return { + "id": "pause", + "name": "Pause", + "version": 1, + "state": [ + { + "key": "messages", + "type": "list", + "reducer": "append", + }, + { + "key": "answer", + "type": "string", + "reducer": "replace", + }, + ], + "nodes": [ + { + "id": "s", + "type": "start", + "label": "Start", + "position": {"x": 0, "y": 0}, + "config": {}, + }, + { + "id": "ask", + "type": "human_input", + "label": "Ask", + "position": {"x": 0, "y": 100}, + "config": { + "prompt": "Wait", + "input_key": "answer", + }, + }, + { + "id": "e", + "type": "end", + "label": "End", + "position": {"x": 0, "y": 200}, + "config": {}, + }, + ], + "edges": [ + {"id": "e1", "source": "s", "target": "ask"}, + {"id": "e2", "source": "ask", "target": "e"}, + ], + "metadata": { + "created_at": "2026-01-01", + "updated_at": "2026-01-01", + }, + } + + +async def main(): + print("Test 29: Full lifecycle") + print("-" * 50) + + db_path = "/tmp/test_29.db" + run_migrations(db_path) + db = await aiosqlite.connect(db_path) + db.row_factory = aiosqlite.Row + app.state.db = db + app.state.run_manager = RunManager() + _, raw_key = await create_test_key(db, scopes=SCOPES_DEFAULT, name="t29") + h = {"X-API-Key": raw_key} + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: + # Step 1: Create graph + resp = await c.post( + "/v1/graphs", + headers=h, + json={ + "name": "Lifecycle", + "schema_json": _calc_schema(), + }, + ) + gid = resp.json()["id"] + print(f"\n 1. Graph created: {gid[:8]}...") + + # Step 2: Validate + resp = await c.post( + f"/v1/graphs/{gid}/validate", + headers=h, + json={}, + ) + assert resp.status_code == 200 + assert resp.json()["valid"] is True + print(" 2. Validate: VALID") + + # Step 3: Export (501) + resp = await c.get(f"/v1/graphs/{gid}/export", headers=h) + assert resp.status_code == 501 + print(" 3. Export: 501 (stub)") + + # Step 4: Run and complete + resp = await c.post( + f"/v1/graphs/{gid}/run", + headers=h, + json={"input": {"result": "7 * 8"}}, + ) + run1 = resp.json()["run_id"] + for _ in range(50): + resp = await c.get(f"/v1/runs/{run1}/status", headers=h) + if resp.json()["status"] == "completed": + break + await asyncio.sleep(0.1) + assert resp.json()["status"] == "completed" + print(f" 4. Run completed: {run1[:8]}...") + + # Step 5: Check history (1 run) + resp = await c.get(f"/v1/graphs/{gid}/runs", headers=h) + assert resp.json()["total"] == 1 + print(f" 5. History: {resp.json()['total']} run(s)") + + # Step 6: Global list + resp = await c.get("/v1/runs", headers=h) + assert resp.json()["total"] >= 1 + print(f" 6. Global runs: {resp.json()['total']}") + + # Step 7: Create pause graph, run, cancel + resp = await c.post( + "/v1/graphs", + headers=h, + json={ + "name": "PauseG", + "schema_json": _pause_schema(), + }, + ) + pgid = resp.json()["id"] + resp = await c.post( + f"/v1/graphs/{pgid}/run", + headers=h, + json={}, + ) + run2 = resp.json()["run_id"] + for _ in range(50): + resp = await c.get(f"/v1/runs/{run2}/status", headers=h) + if resp.json()["status"] == "paused": + break + await asyncio.sleep(0.1) + resp = await c.post( + f"/v1/runs/{run2}/cancel", + headers=h, + json={}, + ) + assert resp.status_code == 202 + print(" 7. Cancel paused run: 202") + await asyncio.sleep(0.5) + + # Step 8: Delete completed run + resp = await c.delete(f"/v1/runs/{run1}", headers=h) + assert resp.status_code == 204 + print(" 8. Delete completed run: 204") + + # Step 9: Delete cancelled run + resp = await c.delete(f"/v1/runs/{run2}", headers=h) + assert resp.status_code == 204 + print(" 9. Delete cancelled run: 204") + + # Step 10: Verify history is clean + resp = await c.get("/v1/runs", headers=h) + assert resp.json()["total"] == 0 + print(f" 10. Global runs after cleanup: {resp.json()['total']}") + + await app.state.run_manager.cancel_all() + await db.close() + os.unlink(db_path) + + print("\n PASS") + + +if __name__ == "__main__": + asyncio.run(main())