Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ Biome handles formatting and linting. No ESLint.
- Every tool response includes `{ success, recoverable }`. No silent failures.
- Migrations run on server startup inside transactions. Server refuses to
start if a migration fails.
- **Code review before PR.** Always run the `code-reviewer` agent (which
launches security-reviewer, logic-reviewer, and quality-reviewer in
parallel) on the branch diff before creating a pull request. Address
any CRITICAL or WARNING findings before merging.

---

Expand Down
5 changes: 5 additions & 0 deletions packages/execution/app/auth/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,8 @@ async def _check(auth: AuthContext = Depends(require_auth)) -> AuthContext:


require_admin = require_scope("admin")


def owner_filter(auth: AuthContext) -> str | None:
"""Return owner_id for CRUD filtering, or None for admin (sees all)."""
return None if auth.is_admin else auth.owner_id
56 changes: 12 additions & 44 deletions packages/execution/app/routes/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from langchain_core.language_models import FakeListChatModel
from langgraph.checkpoint.memory import InMemorySaver

from app.auth.deps import AuthContext, require_scope
from app.auth.deps import AuthContext, owner_filter, require_scope
from app.builder import GraphBuildError, build_graph, validate_schema
from app.db import crud
from app.db.connection import get_db
Expand All @@ -22,18 +22,13 @@
ValidateResponse,
)
from app.schemas.pagination import PaginatedResponse
from app.schemas.runs import RunListItem, StartRunRequest, StartRunResponse
from app.schemas.runs import StartRunRequest, StartRunResponse, run_to_list_item

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/v1/graphs", tags=["Graphs"])


def _owner_filter(auth: AuthContext) -> str | None:
"""Return owner_id for CRUD filtering, or None for admin (sees all)."""
return None if auth.is_admin else auth.owner_id


def _graph_response(graph) -> GraphResponse:
return GraphResponse(
id=graph.id,
Expand Down Expand Up @@ -80,7 +75,7 @@ async def list_graphs(
"""
graphs, total = await crud.list_graphs(
db,
owner_id=_owner_filter(auth),
owner_id=owner_filter(auth),
limit=limit,
offset=offset,
)
Expand Down Expand Up @@ -108,7 +103,7 @@ async def get_graph(

Returns 404 if the graph doesn't exist or belongs to another key.
"""
graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth))
graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth))
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")
return _graph_response(graph)
Expand All @@ -132,7 +127,7 @@ async def update_graph(
graph_id,
body.name,
body.schema_json,
owner_id=_owner_filter(auth),
owner_id=owner_filter(auth),
)
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")
Expand All @@ -151,7 +146,7 @@ async def delete_graph(
db=Depends(get_db),
) -> Response:
"""Delete a graph by ID. Returns 204 on success with no body."""
deleted = await crud.delete_graph(db, graph_id, owner_id=_owner_filter(auth))
deleted = await crud.delete_graph(db, graph_id, owner_id=owner_filter(auth))
if not deleted:
raise HTTPException(status_code=404, detail="Graph not found")
return Response(status_code=204)
Expand All @@ -175,27 +170,12 @@ async def validate_graph(
db=Depends(get_db),
) -> ValidateResponse | JSONResponse:
"""Validate a graph's schema without executing it."""
graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth))
graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth))
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")

try:
validate_schema(graph.schema_json)
except GraphBuildError as exc:
return JSONResponse(
status_code=422,
content=ValidateResponse(
valid=False,
errors=[
SchemaValidationError(
message=str(exc),
node_ref=getattr(exc, "node_ref", None),
)
],
).model_dump(),
)

try:
mock = FakeListChatModel(responses=[""])
build_graph(graph.schema_json, llm_override=mock)
except GraphBuildError as exc:
Expand Down Expand Up @@ -237,7 +217,7 @@ async def export_graph(
db=Depends(get_db),
) -> None:
"""Export graph as standalone Python code (not yet implemented)."""
graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth))
graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth))
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")
raise HTTPException(
Expand All @@ -251,18 +231,6 @@ async def export_graph(
_RUN_STATUS = Literal["running", "paused", "completed", "error"]


def _run_list_item(run) -> dict:
return RunListItem(
id=run.id,
graph_id=run.graph_id,
status=run.status,
input=run.input,
duration_ms=run.duration_ms,
created_at=run.created_at,
error=run.error,
).model_dump()


@router.get(
"/{graph_id}/runs",
response_model=PaginatedResponse,
Expand All @@ -278,20 +246,20 @@ async def list_runs_for_graph(
db=Depends(get_db),
) -> PaginatedResponse:
"""List paginated run history for a specific graph."""
graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth))
graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth))
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")

runs, total = await crud.list_runs_by_graph(
db,
graph_id,
owner_id=_owner_filter(auth),
owner_id=owner_filter(auth),
status=status,
limit=limit,
offset=offset,
)
return PaginatedResponse(
items=[_run_list_item(r) for r in runs],
items=[run_to_list_item(r) for r in runs],
total=total,
limit=limit,
offset=offset,
Expand Down Expand Up @@ -325,7 +293,7 @@ async def start_run(
db=Depends(get_db),
) -> StartRunResponse:
"""Start a new graph execution run."""
graph = await crud.get_graph(db, graph_id, owner_id=_owner_filter(auth))
graph = await crud.get_graph(db, graph_id, owner_id=owner_filter(auth))
if graph is None:
raise HTTPException(status_code=404, detail="Graph not found")

Expand Down
47 changes: 17 additions & 30 deletions packages/execution/app/routes/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request
from fastapi.responses import Response, StreamingResponse

from app.auth.deps import AuthContext, require_scope
from app.auth.deps import AuthContext, owner_filter, require_scope
from app.db import crud
from app.db.connection import get_db
from app.executor import RunManager, format_sse, stream_run_sse
from app.schemas.pagination import PaginatedResponse
from app.schemas.runs import ResumeRunRequest, RunListItem, RunStatusResponse
from app.schemas.runs import ResumeRunRequest, RunStatusResponse, run_to_list_item

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,7 +71,7 @@ async def stream_run(
)

# Not in RunManager — check DB
run = await crud.get_run(db, run_id, owner_id=auth.owner_id)
run = await crud.get_run(db, run_id, owner_id=owner_filter(auth))
if run is None:
raise HTTPException(status_code=404, detail="Run not found")

Expand Down Expand Up @@ -190,7 +190,7 @@ async def run_status(
)

# Fall back to DB
run = await crud.get_run(db, run_id, owner_id=auth.owner_id)
run = await crud.get_run(db, run_id, owner_id=owner_filter(auth))
if run is None:
raise HTTPException(status_code=404, detail="Run not found")

Expand All @@ -209,22 +209,6 @@ async def run_status(
# ── List / Cancel / Delete ────────────────────────────────────────────


def _owner_filter(auth: AuthContext) -> str | None:
return None if auth.is_admin else auth.owner_id


def _run_list_item(run) -> dict:
return RunListItem(
id=run.id,
graph_id=run.graph_id,
status=run.status,
input=run.input,
duration_ms=run.duration_ms,
created_at=run.created_at,
error=run.error,
).model_dump()


@router.get(
"",
response_model=PaginatedResponse,
Expand All @@ -241,14 +225,14 @@ async def list_all_runs(
"""List paginated run history across all graphs."""
runs, total = await crud.list_runs(
db,
owner_id=_owner_filter(auth),
owner_id=owner_filter(auth),
graph_id=graph_id,
status=status,
limit=limit,
offset=offset,
)
return PaginatedResponse(
items=[_run_list_item(r) for r in runs],
items=[run_to_list_item(r) for r in runs],
total=total,
limit=limit,
offset=offset,
Expand Down Expand Up @@ -284,7 +268,7 @@ async def cancel_run(
raise HTTPException(status_code=409, detail="Run is not cancellable")

# DB fallback
run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth))
run = await crud.get_run(db, run_id, owner_id=owner_filter(auth))
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
if run.status in ("running", "paused"):
Expand Down Expand Up @@ -316,19 +300,22 @@ async def delete_run(
"""Delete a completed or error run from the database."""
run_manager = _get_run_manager(request)
ctx = run_manager.get_run(run_id)
if ctx is not None and ctx.status in ("running", "paused"):
raise HTTPException(
status_code=409,
detail="Cannot delete an active run. Cancel it first.",
)
if ctx is not None:
if ctx.owner_id != auth.owner_id and not auth.is_admin:
raise HTTPException(status_code=404, detail="Run not found")
if ctx.status in ("running", "paused"):
raise HTTPException(
status_code=409,
detail="Cannot delete an active run. Cancel it first.",
)

run = await crud.get_run(db, run_id, owner_id=_owner_filter(auth))
run = await crud.get_run(db, run_id, owner_id=owner_filter(auth))
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
if run.status in ("running", "paused"):
raise HTTPException(
status_code=409,
detail="Cannot delete an active run. Cancel it first.",
)
await crud.delete_run(db, run_id, owner_id=_owner_filter(auth))
await crud.delete_run(db, run_id, owner_id=owner_filter(auth))
return Response(status_code=204)
2 changes: 2 additions & 0 deletions packages/execution/app/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
RunStatusResponse,
StartRunRequest,
StartRunResponse,
run_to_list_item,
)

__all__ = [
Expand All @@ -14,4 +15,5 @@
"RunStatusResponse",
"StartRunRequest",
"StartRunResponse",
"run_to_list_item",
]
13 changes: 13 additions & 0 deletions packages/execution/app/schemas/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ class RunListItem(BaseModel):
error: str | None = None


def run_to_list_item(run) -> dict:
"""Convert a Run model to a RunListItem dict for list responses."""
return RunListItem(
id=run.id,
graph_id=run.graph_id,
status=run.status,
input=run.input,
duration_ms=run.duration_ms,
created_at=run.created_at,
error=run.error,
).model_dump()


class ResumeRunRequest(BaseModel):
input: bool | str | dict | list | int | float = Field(
...,
Expand Down
42 changes: 32 additions & 10 deletions packages/execution/tests/unit/test_routes_phase4.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,17 +397,39 @@ async def test_delete_completed_run(client, api_key):


async def test_delete_active_run_rejected(client, api_key):
_, raw = api_key
key, raw = api_key
db = app.state.db
gid = await _create_graph(client, raw)
resp = await client.post(
f"/v1/graphs/{gid}/run",
headers=_headers(raw),
json={"input": {"result": "1 + 1"}},
)
# Insert a run with "running" status directly in DB
run = await _insert_run(db, gid, key.id, "running")

resp = await client.delete(f"/v1/runs/{run.id}", headers=_headers(raw))
assert resp.status_code == 409
assert "cancel it first" in resp.json()["detail"].lower()


async def test_delete_live_run_in_manager_rejected(client, api_key):
"""Delete a run that's live in RunManager — tests ownership check."""
_, raw = api_key
gid = await _create_pause_graph(client, raw)
resp = await client.post(f"/v1/graphs/{gid}/run", headers=_headers(raw), json={})
run_id = resp.json()["run_id"]

# Try deleting immediately (still running)
# Wait for pause (run is live in RunManager)
for _ in range(50):
resp = await client.get(
f"/v1/runs/{run_id}/status",
headers=_headers(raw),
)
if resp.json()["status"] == "paused":
break
await asyncio.sleep(0.1)

# Owner can see 409 (active run)
resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw))
# Could be 409 (still in RunManager) or 204 (already completed)
# The run is very fast, so check both
assert resp.status_code in (204, 409)
assert resp.status_code == 409

# Different owner gets 404 (not 409 — no info leak)
_, raw_b = await create_test_key(app.state.db, name="other")
resp = await client.delete(f"/v1/runs/{run_id}", headers=_headers(raw_b))
assert resp.status_code == 404
Loading