Conversation
Hiiii @marius-mather, just updated the workflow submission to use the workflows and app_users tables in the DB. I've also modified the related tests. The workflows entity was created manually via ECS (as this will be managed by admin), and only includes the BindCraft tool at the moment.
Pull request overview
This pull request implements a database-driven workflow launch configuration system. The main changes include:
Changes:
- Refactored workflow launch to fetch pipeline URL and revision from a `workflows` database table instead of requiring the frontend to provide them
- Made `datasetId` required for all workflow launches and removed the conditional dataset handling logic
- Moved job management endpoints from `/api/workflows/jobs` to `/api/jobs` with a dedicated router
- Implemented auto-provisioning of users in the authentication flow with profile enrichment from Auth0 claims
- Changed the `work_dir` column type from `BigInteger` to `Text` in the database model
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| app/routes/workflows.py | Refactored launch endpoint to query workflow config from DB, added /me/sync endpoint, persists WorkflowRun record |
| app/routes/workflow/jobs.py | Updated router paths to mount at root instead of /jobs prefix, added tags |
| app/routes/dependencies.py | Implemented auto-user provisioning with profile enrichment and race condition handling |
| app/auth/validator.py | Added verify_access_token_claims and fetch_userinfo_claims functions |
| app/services/bindflow_executor.py | Changed dataset_id from optional to required parameter, made pipeline/revision keyword-only args |
| app/schemas/workflows.py | Replaced pipeline field with tool field, made datasetId required |
| app/db/models/core.py | Changed work_dir from BigInteger to Text |
| app/main.py | Added workflow_jobs_router at /api/jobs prefix |
| tests/* | Updated all tests to reflect schema and API changes |
| docs/* | Updated schema diagrams and README with new endpoint paths |
Comments suppressed due to low confidence (9)
app/routes/dependencies.py:94
- There's a potential email uniqueness constraint violation during concurrent requests. If two users with the same email but different auth0_user_ids try to register simultaneously, one will fail with an IntegrityError on the email unique constraint, but the code only handles the IntegrityError for auth0_user_id conflicts. Consider handling email uniqueness violations separately or relaxing the email unique constraint.
db.add(user)
try:
db.commit()
except IntegrityError:
# Handle race where another request inserts the same auth0_user_id concurrently.
db.rollback()
existing = db.execute(
select(AppUser).where(AppUser.auth0_user_id == auth0_user_id)
).scalar_one_or_none()
if existing is None:
raise
user = existing
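One way to handle the email case separately, as the comment suggests — a minimal sketch, assuming the `AppUser` import path and a FastAPI context (both guesses): reuse the row when the race was on `auth0_user_id`, and return an explicit 409 when the email belongs to a different identity rather than leaking an unhandled 500.
from uuid import uuid4

from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.db.models.core import AppUser  # assumed import path

def get_or_create_user(db: Session, auth0_user_id: str, name: str, email: str) -> AppUser:
    user = AppUser(id=uuid4(), auth0_user_id=auth0_user_id, name=name, email=email)
    db.add(user)
    try:
        db.commit()
        return user
    except IntegrityError:
        db.rollback()
        existing = db.execute(
            select(AppUser).where(AppUser.auth0_user_id == auth0_user_id)
        ).scalar_one_or_none()
        if existing is not None:
            # Lost a race against a request for the same identity; reuse the row.
            return existing
        # Otherwise the email unique constraint fired for a different identity:
        # surface a clear conflict instead of an unhandled 500.
        raise HTTPException(status_code=409, detail="Email already registered to another account.")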
app/auth/validator.py:152
- The fetch_userinfo_claims function uses a synchronous httpx.get call which will block the async event loop. Since this is called from get_current_user_id which is a dependency used in async route handlers, this can cause performance issues under load. Consider using httpx.AsyncClient for the userinfo request or marking the function as async and using await.
def fetch_userinfo_claims(token: str) -> dict[str, Any]:
"""Fetch Auth0 /userinfo claims for the provided access token."""
settings = _get_auth0_settings()
userinfo_url = f"https://{settings.domain}/userinfo"
try:
response = httpx.get(
userinfo_url,
headers={"Authorization": f"Bearer {token}"},
timeout=10,
)
response.raise_for_status()
except httpx.HTTPError:
return {}
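A sketch of the suggested async variant, assuming the same `_get_auth0_settings()` helper as the excerpt above; async dependencies would `await` it instead of blocking the event loop.
from typing import Any

import httpx

async def fetch_userinfo_claims_async(token: str) -> dict[str, Any]:
    """Async variant: fetch Auth0 /userinfo without blocking the event loop."""
    settings = _get_auth0_settings()  # assumed helper, as in the excerpt above
    userinfo_url = f"https://{settings.domain}/userinfo"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(
                userinfo_url,
                headers={"Authorization": f"Bearer {token}"},
            )
            response.raise_for_status()
    except httpx.HTTPError:
        return {}
    return response.json()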
app/routes/workflows.py:125
- If the database commit fails after successfully launching the workflow in Seqera, there's no exception handling or rollback mechanism to clean up. While the HTTPException will be raised, the launched workflow will remain active in Seqera without a corresponding local record. Consider wrapping the database operations in a try-except block and adding appropriate logging or alerting when this inconsistent state occurs.
db.add(workflow_run)
db.commit()
app/routes/workflows.py:93
- The error messages for missing repo_url and default_revision return HTTP 500 (Internal Server Error), which suggests a server configuration issue. However, these are configuration errors that should be caught during deployment or seeding, not runtime. Consider returning HTTP 503 (Service Unavailable) instead to better indicate that the service cannot be used until properly configured, or handle this during application startup with validation checks.
if not workflow.repo_url:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Workflow '{workflow.name}' is missing repo_url in workflows table.",
)
if not workflow.default_revision:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Workflow '{workflow.name}' is missing default_revision in workflows table.",
)
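A sketch of the startup-validation alternative, assuming the `Workflow` model's import path and a caller that passes an open session (both illustrative): fail fast at boot rather than at launch time.
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db.models.core import Workflow  # assumed import path

def validate_workflow_config(db: Session) -> None:
    """Fail fast at startup if any seeded workflow lacks launch config."""
    for workflow in db.execute(select(Workflow)).scalars():
        if not workflow.repo_url or not workflow.default_revision:
            raise RuntimeError(
                f"Workflow '{workflow.name}' is missing repo_url/default_revision; "
                "seed the workflows table before serving traffic."
            )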
tests/conftest.py:162
- The test fixture creates a single BindCraft workflow entry for all tests, but the workflow_id is generated with uuid4() and not reused. If tests need to reference the workflow by ID, they won't be able to access it. Consider either storing the workflow_id in a known location or making it a fixed UUID for predictability in tests.
setup_session.add(
Workflow(
id=uuid4(),
name="BindCraft",
description="Test workflow",
repo_url="https://github.com/test/repo",
default_revision="dev",
)
)
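The fixed-UUID variant is a small change — a sketch with an illustrative constant name:
from uuid import UUID

# Fixed ID so tests can reference the seeded workflow deterministically.
BINDCRAFT_WORKFLOW_ID = UUID("00000000-0000-0000-0000-000000000001")

setup_session.add(
    Workflow(
        id=BINDCRAFT_WORKFLOW_ID,
        name="BindCraft",
        description="Test workflow",
        repo_url="https://github.com/test/repo",
        default_revision="dev",
    )
)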
tests/test_routes_dependencies.py:84
- The test mock class _DB has a simplified implementation that doesn't fully simulate SQLAlchemy's execute behavior. When the IntegrityError is raised, the mock sets db.value to existing before raising the exception. However, in the actual get_current_user_id implementation, after rollback, it executes a new query. The mock's execute method will return the existing value because db.value was already set. This works but could be clearer - consider documenting this behavior or refactoring the test to be more explicit about the race condition simulation.
def test_get_current_user_id_race_conflict_fetches_existing(mocker: MockerFixture):
mocker.patch(
"app.routes.dependencies.verify_access_token_claims",
return_value={"sub": "auth0|x", "name": "Test User", "email": "test@example.com"},
)
mocker.patch("app.routes.dependencies.fetch_userinfo_claims", return_value={})
existing = SimpleNamespace(id="u-existing")
db = _DB(None)
def _raise_conflict():
db.value = existing
raise IntegrityError("insert", {}, Exception("conflict"))
db.commit = _raise_conflict
credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials="mock-token")
assert get_current_user_id(credentials, db) == "u-existing"
assert db.rolled_back is True
app/routes/dependencies.py:94
- The auto-creation of users in get_current_user_id could potentially be abused. Any valid Auth0 token (even from a different tenant if the issuer/audience checks are misconfigured) will result in a new user being created in the system. Consider adding additional validation or a whitelist mechanism to control which Auth0 users can be automatically provisioned, especially for production environments.
if not user:
user = AppUser(
id=uuid4(),
auth0_user_id=auth0_user_id,
name=name,
email=email,
)
db.add(user)
try:
db.commit()
except IntegrityError:
# Handle race where another request inserts the same auth0_user_id concurrently.
db.rollback()
existing = db.execute(
select(AppUser).where(AppUser.auth0_user_id == auth0_user_id)
).scalar_one_or_none()
if existing is None:
raise
user = existing
app/routes/workflows.py:44
- The new /me/sync endpoint is not covered by any tests. This endpoint is part of the user authentication flow and should have test coverage to ensure it works correctly and returns the expected response format.
@router.post("/me/sync")
async def sync_current_user(
current_user_id: UUID = Depends(get_current_user_id),
) -> dict[str, str]:
"""Ensure authenticated user exists in app_users and return user id."""
return {"message": "User synced", "userId": str(current_user_id)}
app/schemas/workflows.py:72
- The datasetId field in WorkflowLaunchPayload is required but doesn't validate that it's not empty or just whitespace. The validation is duplicated in the route handler with payload.datasetId.strip() check. Consider adding a field_validator to the schema to strip whitespace and validate non-empty, similar to how the tool field is validated, to centralize validation logic in the schema layer.
datasetId: str = Field(
...,
description="Seqera dataset ID to attach to the workflow",
)
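A sketch of the suggested centralization, assuming Pydantic v2 (`field_validator`); the route handler's `payload.datasetId.strip()` check then becomes redundant.
from pydantic import BaseModel, Field, field_validator

class WorkflowLaunchPayload(BaseModel):
    datasetId: str = Field(
        ...,
        description="Seqera dataset ID to attach to the workflow",
    )

    @field_validator("datasetId")
    @classmethod
    def _require_non_empty_dataset_id(cls, value: str) -> str:
        # Strip whitespace and reject empty values at the schema layer.
        value = value.strip()
        if not value:
            raise ValueError("datasetId must not be empty")
        return value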
Fix work_dir column type mismatch in initial migration
Co-authored-by: uwwint <25582125+uwwint@users.noreply.github.com>
Pull request overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (5)
app/routes/dependencies.py:21
- The USERINFO_CACHE dictionary can grow unbounded in memory as it stores entries keyed by the full token string. Since tokens are often long strings and there's no eviction mechanism beyond checking expiry on retrieval, this cache can consume significant memory over time. Even though expired entries won't be returned, they remain in the dictionary indefinitely until they happen to be accessed again.
Consider using a bounded cache with automatic eviction like TTLCache from cachetools (which is already a project dependency as seen in app/auth/validator.py) or implementing a periodic cleanup mechanism to remove expired entries.
USERINFO_CACHE: dict[str, tuple[float, dict[str, object]]] = {}
app/routes/workflows.py:139
- When a workflow launch fails after the database row is committed on line 113, the error handlers perform db.rollback() which only rolls back uncommitted changes. The workflow_run entry persists in the database with a placeholder seqera_run_id that equals the local UUID rather than the actual Seqera workflow ID. This creates orphaned database records that may confuse downstream queries expecting seqera_run_id to reference valid Seqera workflows.
Consider either: (1) deleting the row on launch failure, (2) adding a status column to mark failed launches, or (3) documenting that the presence of a row with seqera_run_id == id indicates a failed launch attempt.
db.add(workflow_run)
db.commit()
try:
# Use workflow config from DB (repo_url/default_revision) and selected dataset.
result: BindflowLaunchResult = await launch_bindflow_workflow(
payload.launch,
dataset_id,
pipeline=workflow.repo_url,
revision=resolved_revision,
output_id=str(run_id),
)
workflow_run.seqera_run_id = result.workflow_id
db.commit()
except BindflowConfigurationError as exc:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)
) from exc
except BindflowExecutorError as exc:
db.rollback()
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
except Exception as exc:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to update local workflow run after launch.",
) from exc
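A sketch of option (1), reusing the names from the excerpt above: delete the provisional row before re-raising, so no record survives whose seqera_run_id is just the local placeholder UUID.
try:
    result = await launch_bindflow_workflow(
        payload.launch,
        dataset_id,
        pipeline=workflow.repo_url,
        revision=resolved_revision,
        output_id=str(run_id),
    )
    workflow_run.seqera_run_id = result.workflow_id
    db.commit()
except BindflowExecutorError as exc:
    db.rollback()
    # Drop the row committed before the launch attempt: it was only a
    # placeholder, and the Seqera launch it referenced never succeeded.
    db.delete(workflow_run)
    db.commit()
    raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc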
app/routes/dependencies.py:44
- The USERINFO_CACHE is a module-level dictionary that is shared across all requests and workers. In a multi-threaded or multi-process deployment, this creates potential concurrency issues. Multiple threads could simultaneously check the cache on line 33, both find it missing or expired, and both call fetch_userinfo_claims, then both attempt to write to the cache on line 43. While this won't cause crashes, it results in redundant API calls.
More critically, the dictionary itself is not thread-safe for concurrent writes in Python when multiple threads modify it simultaneously. Consider using threading.Lock for synchronization or using a thread-safe cache implementation like TTLCache with a lock.
def _fetch_userinfo_claims_cached(token: str, claims: dict[str, object]) -> dict[str, object]:
now = time()
cached = USERINFO_CACHE.get(token)
if cached and cached[0] > now:
return cached[1]
userinfo = fetch_userinfo_claims(token)
if not userinfo:
return {}
expiry_epoch = _get_token_expiry_epoch(claims)
if expiry_epoch is not None and expiry_epoch > now:
USERINFO_CACHE[token] = (expiry_epoch, userinfo)
return userinfo
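A sketch of the bounded, lock-guarded alternative using `cachetools.TTLCache` (already a project dependency per the earlier comment). Note it trades the per-token expiry for a fixed TTL, which is an assumption; the maxsize and TTL values are illustrative.
from threading import Lock

from cachetools import TTLCache

# Bounded: at most 1024 tokens, each entry evicted after 10 minutes.
_USERINFO_CACHE: TTLCache = TTLCache(maxsize=1024, ttl=600)
_USERINFO_LOCK = Lock()

def _fetch_userinfo_claims_cached(token: str) -> dict[str, object]:
    with _USERINFO_LOCK:
        cached = _USERINFO_CACHE.get(token)
    if cached is not None:
        return cached
    userinfo = fetch_userinfo_claims(token)  # as in the validator excerpt above
    if userinfo:
        with _USERINFO_LOCK:
            _USERINFO_CACHE[token] = userinfo
    return userinfo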
app/routes/workflows.py:44
- The new /me/sync endpoint at POST /api/workflows/me/sync is added without any test coverage. While the endpoint is simple (just returning the user ID), it represents a new API surface that should have at least basic integration tests to verify authentication requirements and response format.
@router.post("/me/sync")
async def sync_current_user(
current_user_id: UUID = Depends(get_current_user_id),
) -> dict[str, str]:
"""Ensure authenticated user exists in app_users and return user id."""
return {"message": "User synced", "userId": str(current_user_id)}
app/routes/dependencies.py:130
- The user profile update logic on lines 121-130 performs a database commit on every request where placeholder values can be upgraded. This happens without checking if another concurrent request is already updating the same user. If two requests with better profile information arrive simultaneously for the same user, both will attempt to update and commit, potentially causing lost updates or conflicts.
Additionally, the commit happens during authentication, which could fail and cause the entire request to fail even though authentication itself succeeded. Consider batching these updates or making them asynchronous to avoid impacting request latency and reliability.
else:
# Refresh profile fields when we have better info than the placeholder values.
should_update = False
if user.name == auth0_user_id and name != auth0_user_id:
user.name = name
should_update = True
if user.email.endswith("@unknown.local") and not email.endswith("@unknown.local"):
user.email = email
should_update = True
if should_update:
db.commit()
Pull Request
Summary
SBP-254 Workflow runs metadata + workflow launch configuration from DB
Changes
- Workflow launch configuration now comes from the `workflows` table instead of requiring the frontend to send a manual pipeline URL/branch. `POST /api/workflows/launch` now:
  - Looks up `Workflow(name="BindCraft")`
  - Uses `workflow.repo_url` as the pipeline
  - Uses `workflow.default_revision` (fallback to the request revision, then `dev`)
  - Stores the `workflow_runs.workflow_id` reference
- Made `launch.pipeline` optional in the schema (`WorkflowLaunchForm`) so the frontend can omit pipeline/revision.
- Moved job management endpoints to `app/routes/workflow/jobs.py`, mounted under `/api/jobs`:
  - `GET /api/jobs`
  - `GET /api/jobs/{run_id}`
  - `POST /api/jobs/{run_id}/cancel`
  - `DELETE /api/jobs/{run_id}`
  - `POST /api/jobs/bulk-delete`
- Updated docs (`README`) for the new endpoints and infra utilities.

How to Test
- `cd sbp-backend`
- `uv run ruff check app tests`
- `uv run black --check app tests`
- `uv run mypy app --ignore-missing-imports`
- `uv run pytest --cov=app --cov-report=term-missing --cov-report=xml`
- Check the `workflows` table has a BindCraft entry (`name`, `repo_url`, `default_revision`)
- `POST /api/workflows/launch` without `launch.pipeline`
- Verify `workflow_runs.workflow_id` is populated

Demo
Demo is available here
Demo on 17/02/26 is here
Type of change
Checklist