From 0b7174d5802402116b219d4ce33be61e37e1e68d Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 14:13:27 -0700 Subject: [PATCH 1/2] fix: address code review findings for Phase 4 - Add ownership check on delete endpoint's RunManager path (prevents cross-tenant run state leak) - Fix stream/status DB fallback to use _owner_filter(auth) for admin bypass consistency - Consolidate _run_list_item into shared run_to_list_item helper in schemas/runs.py (was duplicated in graphs.py and runs.py) - Merge duplicate validate exception blocks - Fix non-deterministic test_delete_active_run_rejected to use DB-inserted run instead of racing against fast execution Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/execution/app/routes/graphs.py | 31 ++--------------- packages/execution/app/routes/runs.py | 34 +++++++------------ packages/execution/app/schemas/__init__.py | 2 ++ packages/execution/app/schemas/runs.py | 13 +++++++ .../tests/unit/test_routes_phase4.py | 19 ++++------- 5 files changed, 37 insertions(+), 62 deletions(-) diff --git a/packages/execution/app/routes/graphs.py b/packages/execution/app/routes/graphs.py index ba04779..b46d86b 100644 --- a/packages/execution/app/routes/graphs.py +++ b/packages/execution/app/routes/graphs.py @@ -22,7 +22,7 @@ ValidateResponse, ) from app.schemas.pagination import PaginatedResponse -from app.schemas.runs import RunListItem, StartRunRequest, StartRunResponse +from app.schemas.runs import StartRunRequest, StartRunResponse, run_to_list_item logger = logging.getLogger(__name__) @@ -181,21 +181,6 @@ async def validate_graph( try: validate_schema(graph.schema_json) - except GraphBuildError as exc: - return JSONResponse( - status_code=422, - content=ValidateResponse( - valid=False, - errors=[ - SchemaValidationError( - message=str(exc), - node_ref=getattr(exc, "node_ref", None), - ) - ], - ).model_dump(), - ) - - try: mock = FakeListChatModel(responses=[""]) build_graph(graph.schema_json, llm_override=mock) except GraphBuildError as exc: @@ -251,18 +236,6 @@ async def export_graph( _RUN_STATUS = Literal["running", "paused", "completed", "error"] -def _run_list_item(run) -> dict: - return RunListItem( - id=run.id, - graph_id=run.graph_id, - status=run.status, - input=run.input, - duration_ms=run.duration_ms, - created_at=run.created_at, - error=run.error, - ).model_dump() - - @router.get( "/{graph_id}/runs", response_model=PaginatedResponse, @@ -291,7 +264,7 @@ async def list_runs_for_graph( offset=offset, ) return PaginatedResponse( - items=[_run_list_item(r) for r in runs], + items=[run_to_list_item(r) for r in runs], total=total, limit=limit, offset=offset, diff --git a/packages/execution/app/routes/runs.py b/packages/execution/app/routes/runs.py index ac2e9b9..8acd946 100644 --- a/packages/execution/app/routes/runs.py +++ b/packages/execution/app/routes/runs.py @@ -14,7 +14,7 @@ from app.db.connection import get_db from app.executor import RunManager, format_sse, stream_run_sse from app.schemas.pagination import PaginatedResponse -from app.schemas.runs import ResumeRunRequest, RunListItem, RunStatusResponse +from app.schemas.runs import ResumeRunRequest, RunStatusResponse, run_to_list_item logger = logging.getLogger(__name__) @@ -71,7 +71,7 @@ async def stream_run( ) # Not in RunManager — check DB - run = await crud.get_run(db, run_id, owner_id=auth.owner_id) + run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") @@ -190,7 +190,7 @@ async def run_status( ) # Fall back to DB - run = await crud.get_run(db, run_id, owner_id=auth.owner_id) + run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") @@ -210,21 +210,10 @@ async def run_status( def _owner_filter(auth: AuthContext) -> str | None: + """Return owner_id for DB filtering, or None for admin (sees all).""" return None if auth.is_admin else auth.owner_id -def _run_list_item(run) -> dict: - return RunListItem( - id=run.id, - graph_id=run.graph_id, - status=run.status, - input=run.input, - duration_ms=run.duration_ms, - created_at=run.created_at, - error=run.error, - ).model_dump() - - @router.get( "", response_model=PaginatedResponse, @@ -248,7 +237,7 @@ async def list_all_runs( offset=offset, ) return PaginatedResponse( - items=[_run_list_item(r) for r in runs], + items=[run_to_list_item(r) for r in runs], total=total, limit=limit, offset=offset, @@ -316,11 +305,14 @@ async def delete_run( """Delete a completed or error run from the database.""" run_manager = _get_run_manager(request) ctx = run_manager.get_run(run_id) - if ctx is not None and ctx.status in ("running", "paused"): - raise HTTPException( - status_code=409, - detail="Cannot delete an active run. Cancel it first.", - ) + if ctx is not None: + if ctx.owner_id != auth.owner_id and not auth.is_admin: + raise HTTPException(status_code=404, detail="Run not found") + if ctx.status in ("running", "paused"): + raise HTTPException( + status_code=409, + detail="Cannot delete an active run. Cancel it first.", + ) run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) if run is None: diff --git a/packages/execution/app/schemas/__init__.py b/packages/execution/app/schemas/__init__.py index 76e3773..c3151bc 100644 --- a/packages/execution/app/schemas/__init__.py +++ b/packages/execution/app/schemas/__init__.py @@ -6,6 +6,7 @@ RunStatusResponse, StartRunRequest, StartRunResponse, + run_to_list_item, ) __all__ = [ @@ -14,4 +15,5 @@ "RunStatusResponse", "StartRunRequest", "StartRunResponse", + "run_to_list_item", ] diff --git a/packages/execution/app/schemas/runs.py b/packages/execution/app/schemas/runs.py index 4204e1c..6a82a1c 100644 --- a/packages/execution/app/schemas/runs.py +++ b/packages/execution/app/schemas/runs.py @@ -40,6 +40,19 @@ class RunListItem(BaseModel): error: str | None = None +def run_to_list_item(run) -> dict: + """Convert a Run model to a RunListItem dict for list responses.""" + return RunListItem( + id=run.id, + graph_id=run.graph_id, + status=run.status, + input=run.input, + duration_ms=run.duration_ms, + created_at=run.created_at, + error=run.error, + ).model_dump() + + class ResumeRunRequest(BaseModel): input: bool | str | dict | list | int | float = Field( ..., diff --git a/packages/execution/tests/unit/test_routes_phase4.py b/packages/execution/tests/unit/test_routes_phase4.py index 563b076..f2dd8df 100644 --- a/packages/execution/tests/unit/test_routes_phase4.py +++ b/packages/execution/tests/unit/test_routes_phase4.py @@ -397,17 +397,12 @@ async def test_delete_completed_run(client, api_key): async def test_delete_active_run_rejected(client, api_key): - _, raw = api_key + key, raw = api_key + db = app.state.db gid = await _create_graph(client, raw) - resp = await client.post( - f"/v1/graphs/{gid}/run", - headers=_headers(raw), - json={"input": {"result": "1 + 1"}}, - ) - run_id = resp.json()["run_id"] + # Insert a run with "running" status directly in DB + run = await _insert_run(db, gid, key.id, "running") - # Try deleting immediately (still running) - resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw)) - # Could be 409 (still in RunManager) or 204 (already completed) - # The run is very fast, so check both - assert resp.status_code in (204, 409) + resp = await client.delete(f"/v1/runs/{run.id}", headers=_headers(raw)) + assert resp.status_code == 409 + assert "cancel it first" in resp.json()["detail"].lower() From 5da6d8217f1709ba4f0ba03a9006ecf4b743c2fc Mon Sep 17 00:00:00 2001 From: prosdev Date: Sat, 14 Mar 2026 14:16:55 -0700 Subject: [PATCH 2/2] fix: consolidate owner_filter and add review-before-PR rule - Move _owner_filter to shared owner_filter() in auth/deps.py, remove duplicates from graphs.py and runs.py - Add test_delete_live_run_in_manager_rejected verifying ownership check on RunManager path (different owner gets 404, not 409) - Add code-review-before-PR rule to CLAUDE.md non-negotiables Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 4 +++ packages/execution/app/auth/deps.py | 5 ++++ packages/execution/app/routes/graphs.py | 25 +++++++---------- packages/execution/app/routes/runs.py | 19 +++++-------- .../tests/unit/test_routes_phase4.py | 27 +++++++++++++++++++ 5 files changed, 53 insertions(+), 27 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 4db567a..6c00576 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -54,6 +54,10 @@ Biome handles formatting and linting. No ESLint. - Every tool response includes `{ success, recoverable }`. No silent failures. - Migrations run on server startup inside transactions. Server refuses to start if a migration fails. +- **Code review before PR.** Always run the `code-reviewer` agent (which + launches security-reviewer, logic-reviewer, and quality-reviewer in + parallel) on the branch diff before creating a pull request. Address + any CRITICAL or WARNING findings before merging. --- diff --git a/packages/execution/app/auth/deps.py b/packages/execution/app/auth/deps.py index 4b112a5..48d4531 100644 --- a/packages/execution/app/auth/deps.py +++ b/packages/execution/app/auth/deps.py @@ -63,3 +63,8 @@ async def _check(auth: AuthContext = Depends(require_auth)) -> AuthContext: require_admin = require_scope("admin") + + +def owner_filter(auth: AuthContext) -> str | None: + """Return owner_id for CRUD filtering, or None for admin (sees all).""" + return None if auth.is_admin else auth.owner_id diff --git a/packages/execution/app/routes/graphs.py b/packages/execution/app/routes/graphs.py index b46d86b..24b914c 100644 --- a/packages/execution/app/routes/graphs.py +++ b/packages/execution/app/routes/graphs.py @@ -10,7 +10,7 @@ from langchain_core.language_models import FakeListChatModel from langgraph.checkpoint.memory import InMemorySaver -from app.auth.deps import AuthContext, require_scope +from app.auth.deps import AuthContext, owner_filter, require_scope from app.builder import GraphBuildError, build_graph, validate_schema from app.db import crud from app.db.connection import get_db @@ -29,11 +29,6 @@ router = APIRouter(prefix="/v1/graphs", tags=["Graphs"]) -def _owner_filter(auth: AuthContext) -> str | None: - """Return owner_id for CRUD filtering, or None for admin (sees all).""" - return None if auth.is_admin else auth.owner_id - - def _graph_response(graph) -> GraphResponse: return GraphResponse( id=graph.id, @@ -80,7 +75,7 @@ async def list_graphs( """ graphs, total = await crud.list_graphs( db, - owner_id=_owner_filter(auth), + owner_id=owner_filter(auth), limit=limit, offset=offset, ) @@ -108,7 +103,7 @@ async def get_graph( Returns 404 if the graph doesn't exist or belongs to another key. """ - graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth)) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") return _graph_response(graph) @@ -132,7 +127,7 @@ async def update_graph( graph_id, body.name, body.schema_json, - owner_id=_owner_filter(auth), + owner_id=owner_filter(auth), ) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") @@ -151,7 +146,7 @@ async def delete_graph( db=Depends(get_db), ) -> Response: """Delete a graph by ID. Returns 204 on success with no body.""" - deleted = await crud.delete_graph(db, graph_id, owner_id=_owner_filter(auth)) + deleted = await crud.delete_graph(db, graph_id, owner_id=owner_filter(auth)) if not deleted: raise HTTPException(status_code=404, detail="Graph not found") return Response(status_code=204) @@ -175,7 +170,7 @@ async def validate_graph( db=Depends(get_db), ) -> ValidateResponse | JSONResponse: """Validate a graph's schema without executing it.""" - graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth)) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") @@ -222,7 +217,7 @@ async def export_graph( db=Depends(get_db), ) -> None: """Export graph as standalone Python code (not yet implemented).""" - graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth)) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") raise HTTPException( @@ -251,14 +246,14 @@ async def list_runs_for_graph( db=Depends(get_db), ) -> PaginatedResponse: """List paginated run history for a specific graph.""" - graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth)) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") runs, total = await crud.list_runs_by_graph( db, graph_id, - owner_id=_owner_filter(auth), + owner_id=owner_filter(auth), status=status, limit=limit, offset=offset, @@ -298,7 +293,7 @@ async def start_run( db=Depends(get_db), ) -> StartRunResponse: """Start a new graph execution run.""" - graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth)) + graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth)) if graph is None: raise HTTPException(status_code=404, detail="Graph not found") diff --git a/packages/execution/app/routes/runs.py b/packages/execution/app/routes/runs.py index 8acd946..fb603eb 100644 --- a/packages/execution/app/routes/runs.py +++ b/packages/execution/app/routes/runs.py @@ -9,7 +9,7 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request from fastapi.responses import Response, StreamingResponse -from app.auth.deps import AuthContext, require_scope +from app.auth.deps import AuthContext, owner_filter, require_scope from app.db import crud from app.db.connection import get_db from app.executor import RunManager, format_sse, stream_run_sse @@ -71,7 +71,7 @@ async def stream_run( ) # Not in RunManager — check DB - run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + run = await crud.get_run(db, run_id, owner_id=owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") @@ -190,7 +190,7 @@ async def run_status( ) # Fall back to DB - run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + run = await crud.get_run(db, run_id, owner_id=owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") @@ -209,11 +209,6 @@ async def run_status( # ── List / Cancel / Delete ──────────────────────────────────────────── -def _owner_filter(auth: AuthContext) -> str | None: - """Return owner_id for DB filtering, or None for admin (sees all).""" - return None if auth.is_admin else auth.owner_id - - @router.get( "", response_model=PaginatedResponse, @@ -230,7 +225,7 @@ async def list_all_runs( """List paginated run history across all graphs.""" runs, total = await crud.list_runs( db, - owner_id=_owner_filter(auth), + owner_id=owner_filter(auth), graph_id=graph_id, status=status, limit=limit, @@ -273,7 +268,7 @@ async def cancel_run( raise HTTPException(status_code=409, detail="Run is not cancellable") # DB fallback - run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + run = await crud.get_run(db, run_id, owner_id=owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") if run.status in ("running", "paused"): @@ -314,7 +309,7 @@ async def delete_run( detail="Cannot delete an active run. Cancel it first.", ) - run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth)) + run = await crud.get_run(db, run_id, owner_id=owner_filter(auth)) if run is None: raise HTTPException(status_code=404, detail="Run not found") if run.status in ("running", "paused"): @@ -322,5 +317,5 @@ async def delete_run( status_code=409, detail="Cannot delete an active run. Cancel it first.", ) - await crud.delete_run(db, run_id, owner_id=_owner_filter(auth)) + await crud.delete_run(db, run_id, owner_id=owner_filter(auth)) return Response(status_code=204) diff --git a/packages/execution/tests/unit/test_routes_phase4.py b/packages/execution/tests/unit/test_routes_phase4.py index f2dd8df..8df3738 100644 --- a/packages/execution/tests/unit/test_routes_phase4.py +++ b/packages/execution/tests/unit/test_routes_phase4.py @@ -406,3 +406,30 @@ async def test_delete_active_run_rejected(client, api_key): resp = await client.delete(f"/v1/runs/{run.id}", headers=_headers(raw)) assert resp.status_code == 409 assert "cancel it first" in resp.json()["detail"].lower() + + +async def test_delete_live_run_in_manager_rejected(client, api_key): + """Delete a run that's live in RunManager — tests ownership check.""" + _, raw = api_key + gid = await _create_pause_graph(client, raw) + resp = await client.post(f"/v1/graphs/{gid}/run", headers=_headers(raw), json={}) + run_id = resp.json()["run_id"] + + # Wait for pause (run is live in RunManager) + for _ in range(50): + resp = await client.get( + f"/v1/runs/{run_id}/status", + headers=_headers(raw), + ) + if resp.json()["status"] == "paused": + break + await asyncio.sleep(0.1) + + # Owner can see 409 (active run) + resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw)) + assert resp.status_code == 409 + + # Different owner gets 404 (not 409 — no info leak) + _, raw_b = await create_test_key(app.state.db, name="other") + resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw_b)) + assert resp.status_code == 404