
Add tag/untag endpoints for tasks, flows, and runs#250

Open
YassinNouh21 wants to merge 2 commits into openml:main from YassinNouh21:feature/tag-untag-endpoints

Conversation

@YassinNouh21

Summary

  • Adds POST /tasks/tag, /tasks/untag, /flows/tag, /flows/untag, /runs/tag, /runs/untag
  • Follows existing dataset tag pattern; untag adds ownership check (error 478) with admin bypass
  • New src/database/runs.py and src/routers/openml/runs.py for run endpoints

Closes #26, #27, #34, #35, #45, #46
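
The ownership rule summarized above (untag requires ownership, error 478, with an admin bypass) reduces to a small predicate. The sketch below is illustrative only, with hypothetical names, since the PR's actual helper is not shown here:

```python
# Hypothetical sketch of the untag ownership rule from this PR's summary:
# a tag may be removed by the user who created it, or by an admin
# (otherwise the endpoint responds with error code 478).
def may_untag(tag_uploader_id: int, user_id: int, *, is_admin: bool) -> bool:
    """Return True if the user may remove this tag."""
    return is_admin or tag_uploader_id == user_id
```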

Test plan

  • 30 new tests (11 task, 10 flow, 9 run) — all passing
  • Full suite (pytest -m "not php_api and not slow") — 167 passed, 0 failures
  • E2E curl tests against live API for all 6 endpoints

@coderabbitai
Contributor

coderabbitai bot commented Feb 21, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a tagging subsystem across the API:

  • a new database tagging module providing insert/select/remove helpers
  • tag/get_tag/delete_tag wrappers for flows, runs, and tasks
  • core tagging logic implementing tag_entity and untag_entity with ownership and duplicate checks
  • new POST /tag and POST /untag endpoints for flows, runs, and tasks, wired to the core helpers and DB hooks
  • registration of the runs router in src/main.py
  • comprehensive tests for flow/run/task tagging behaviors (auth, duplicates, not-found, ownership)


Suggested labels

enhancement

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 3.39%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The PR title accurately describes the main changes: adding tag/untag endpoints for tasks, flows, and runs. |
| Description check | ✅ Passed | The description is related to the changeset, clearly summarizing the new endpoints, database modules, shared helpers, and test coverage. |
| Linked Issues check | ✅ Passed | The PR addresses issues #26, #27, #34, #35, #45, #46 by implementing POST /task/tag, /task/untag, /flow/tag, /flow/untag, /run/tag, /run/untag endpoints with ownership checks and tests. |
| Out of Scope Changes check | ✅ Passed | All changes are directly scoped to adding tagging endpoints and supporting infrastructure (database modules, routers, tests, and shared helpers) with no unrelated modifications. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 7 issues, and left some high-level feedback:

  • The tag/untag logic for tasks, flows, and runs (including the DB helpers) is almost identical; consider extracting shared utilities (e.g., a generic tag_entity/untag_entity helper or a small service layer) to reduce duplication and keep behavior changes consistent across entity types.
  • You’re performing case-insensitive duplicate checks in Python with tag.casefold() on every request; if this check needs to be robust at scale, consider enforcing case-insensitive uniqueness at the database level (collation/index or LOWER(tag) constraint) to avoid race conditions and duplicated tags with different casing.
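
To make the race concrete: a minimal sketch of the per-request check (function name hypothetical). It catches casing variants within one request, but two concurrent requests can both pass it before either row is inserted:

```python
# Sketch of the in-Python, case-insensitive duplicate check described above.
# Note: this guards a single request only; without a database-level
# uniqueness constraint, concurrent requests can still insert duplicates.
def is_duplicate_tag(existing_tags: list[str], new_tag: str) -> bool:
    return new_tag.casefold() in {t.casefold() for t in existing_tags}
```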
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The tag/untag logic for tasks, flows, and runs (including the DB helpers) is almost identical; consider extracting shared utilities (e.g., a generic `tag_entity`/`untag_entity` helper or a small service layer) to reduce duplication and keep behavior changes consistent across entity types.
- You’re performing case-insensitive duplicate checks in Python with `tag.casefold()` on every request; if this check needs to be robust at scale, consider enforcing case-insensitive uniqueness at the database level (collation/index or `LOWER(tag)` constraint) to avoid race conditions and duplicated tags with different casing.

## Individual Comments

### Comment 1
<location> `src/routers/openml/flows.py:24-40` </location>
<code_context>
+            status_code=HTTPStatus.PRECONDITION_FAILED,
+            detail={"code": "103", "message": "Authentication failed"},
+        )
+    tags = database.flows.get_tags(flow_id, expdb)
+    if tag.casefold() in [t.casefold() for t in tags]:
+        raise HTTPException(
+            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            detail={
+                "code": "473",
+                "message": "Entity already tagged by this tag.",
+                "additional_information": f"id={flow_id}; tag={tag}",
+            },
+        )
+    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
+    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Re-read tags from the database after insert to avoid returning a potentially stale tag list.

This builds the response from the previously fetched `tags` plus `tag`, so any tag added between `get_tags` and `tag` won’t be included in the response. To avoid this race and match `untag_flow`’s behavior, re-query `get_tags` after the insert and return that list instead.

```suggestion
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )

    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)

    # Re-read tags after inserting to avoid returning a potentially stale tag list.
    tags = database.flows.get_tags(flow_id, expdb)

    return {"flow_tag": {"id": str(flow_id), "tag": tags}}
```
</issue_to_address>

### Comment 2
<location> `tests/routers/openml/task_tag_test.py:80-89` </location>
<code_context>
+def test_task_untag(expdb_test: Connection, py_api: TestClient) -> None:
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test to cover the case where a non-admin uploader can successfully untag their own task tag.

Current tests cover admin untagging their own tag and blocking a non-owner (`code` 478), but they don’t verify that non-admin users can untag tags they created. Please add a test that:

1. Tags a task using a non-admin user (e.g., `ApiKey.SOME_USER`), then
2. Calls `/tasks/untag` as the same user and asserts success.

This ensures the ownership rule for non-admin uploaders is enforced and protected against regressions.

Suggested implementation:

```python
def test_task_untag(expdb_test: Connection, py_api: TestClient) -> None:
    task_id, tag = 59, "test"
    py_api.post(
        f"/tasks/tag?api_key={ApiKey.ADMIN}",
        json={"task_id": task_id, "tag": tag},
    )
    response = py_api.post(
        f"/tasks/untag?api_key={ApiKey.ADMIN}",
        json={"task_id": task_id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK


def test_task_untag_non_admin_can_untag_own_tag(
    expdb_test: Connection, py_api: TestClient
) -> None:
    task_id, tag = 60, "user-tag"
    # Non-admin user tags the task
    tag_response = py_api.post(
        f"/tasks/tag?api_key={ApiKey.SOME_USER}",
        json={"task_id": task_id, "tag": tag},
    )
    assert tag_response.status_code == HTTPStatus.OK

    # Same non-admin user un-tags their own tag
    untag_response = py_api.post(
        f"/tasks/untag?api_key={ApiKey.SOME_USER}",
        json={"task_id": task_id, "tag": tag},
    )
    assert untag_response.status_code == HTTPStatus.OK

```

If `ApiKey.SOME_USER` is not the correct non-admin uploader key in your test suite, replace it with the appropriate enum member that represents a non-admin but authorized uploader (for example, `ApiKey.USER`, `ApiKey.UPLOADER`, etc., consistent with the rest of your tests).
</issue_to_address>

### Comment 3
<location> `src/routers/openml/flows.py:17` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the repeated auth and tag/untag logic into reusable helper functions so the individual endpoints just supply resource-specific behavior and response keys.

You can cut down duplication and make future tag changes safer by extracting the shared behavior into small helpers, then wiring them up in each router.

For example, the auth check and error payloads are identical across tag/untag endpoints and across routers:

```python
def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user
```

Then in your endpoints:

```python
@router.post("/tag")
def tag_flow(
    flow_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)

    tags = database.flows.get_tags(flow_id, expdb)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={flow_id}; tag={tag}",
            },
        )
    database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
```

You can also encapsulate the generic tag/untag logic so tasks/runs/flows only provide resource-specific functions and the response key:

```python
# tagging_helpers.py
from collections.abc import Callable

def make_tag_endpoint(
    get_tags: Callable[[int, Connection], list[str]],
    tag_fn: Callable[..., None],
    response_key: str,
):
    def endpoint(
        entity_id: Annotated[int, Body()],
        tag: Annotated[str, SystemString64],
        user: Annotated[User | None, Depends(fetch_user)] = None,
        expdb: Annotated[Connection, Depends(expdb_connection)] = None,
    ) -> dict[str, dict[str, Any]]:
        user = _require_user(user)
        tags = get_tags(entity_id, expdb)
        if tag.casefold() in (t.casefold() for t in tags):
            raise HTTPException(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                detail={
                    "code": "473",
                    "message": "Entity already tagged by this tag.",
                    "additional_information": f"id={entity_id}; tag={tag}",
                },
            )
        tag_fn(entity_id, tag, user_id=user.user_id, connection=expdb)
        return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}
    return endpoint
```

Then in the flows router:

```python
from tagging_helpers import make_tag_endpoint

tag_flow = router.post("/tag")(
    make_tag_endpoint(
        get_tags=database.flows.get_tags,
        tag_fn=database.flows.tag,
        response_key="flow_tag",
    )
)
```

A similar `make_untag_endpoint` can encapsulate the `get_tag` / ownership / delete logic, taking an `owner_id_attr` or a small predicate so that flows/tasks/runs don’t re-implement it. This keeps your new flow endpoints while reducing repetition across routers and making future tag behavior changes localized.
</issue_to_address>

### Comment 4
<location> `src/routers/openml/tasks.py:154` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the shared tagging/untagging logic into reusable helper functions so the task endpoints become thin wrappers around common behavior.

You can reduce duplication and keep the endpoint behavior identical by extracting the common tagging logic into small helpers and using them from `tag_task`/`untag_task` (and later from the flow/run routers).

Example of how to refactor this router while preserving all behavior:

```python
# helpers (can live in this module or a shared tagging module)

def _require_user(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user


def _tag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tags: callable,
    tag_fn: callable,
    response_key: str,
) -> dict[str, dict[str, Any]]:
    tags = get_tags(entity_id, connection)
    if tag.casefold() in (t.casefold() for t in tags):
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": "Entity already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    tag_fn(entity_id, tag, user_id=user.user_id, connection=connection)
    return {response_key: {"id": str(entity_id), "tag": [*tags, tag]}}


def _untag_entity(
    entity_id: int,
    tag: str,
    user: User,
    connection: Connection,
    *,
    get_tag: callable,
    delete_tag: callable,
    get_tags: callable,
    response_key: str,
) -> dict[str, dict[str, Any]]:
    existing = get_tag(entity_id, tag, connection)
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
    delete_tag(entity_id, tag, connection)
    tags = get_tags(entity_id, connection)
    return {response_key: {"id": str(entity_id), "tag": tags}}
```

Your task endpoints become very small and declarative:

```python
@router.post(path="/tag")
def tag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _tag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tags=database.tasks.get_tags,
        tag_fn=database.tasks.tag,
        response_key="task_tag",
    )


@router.post(path="/untag")
def untag_task(
    task_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = _require_user(user)
    return _untag_entity(
        task_id,
        tag,
        user,
        expdb,
        get_tag=database.tasks.get_tag,
        delete_tag=database.tasks.delete_tag,
        get_tags=database.tasks.get_tags,
        response_key="task_tag",
    )
```

This keeps all error codes/messages and response shapes intact but centralizes the tagging rules. The same helpers can then be reused by the flow/run routers by passing their own `get_tags`/`get_tag`/`tag`/`delete_tag` functions and appropriate `response_key`.
</issue_to_address>

### Comment 5
<location> `src/routers/openml/runs.py:15` </location>
<code_context>
 router = APIRouter(prefix="/flows", tags=["flows"])


+@router.post(path="/tag")
+def tag_flow(
+    flow_id: Annotated[int, Body()],
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the repeated auth and tagging checks into shared helper functions so the router endpoints only handle wiring to the database layer.

You can reduce the complexity here by extracting the duplicated tagging logic into shared helpers, and keeping the router functions as thin wiring to the DB layer.

For example, centralize the auth + duplicate‑check + error construction in a small tagging helper/service:

```python
# tagging_service.py
from http import HTTPStatus
from fastapi import HTTPException
from database.users import User, UserGroup

def require_authenticated(user: User | None) -> User:
    if user is None:
        raise HTTPException(
            status_code=HTTPStatus.PRECONDITION_FAILED,
            detail={"code": "103", "message": "Authentication failed"},
        )
    return user

def ensure_not_tagged(tags: list[str], tag: str, entity_id: int, entity_name: str) -> None:
    if tag.casefold() in [t.casefold() for t in tags]:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "473",
                "message": f"{entity_name} already tagged by this tag.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_exists(existing, entity_id: int, tag: str, entity_name: str) -> None:
    if existing is None:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "477",
                "message": "Tag not found.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )

def ensure_tag_owner_or_admin(existing, user: User, entity_id: int, tag: str) -> None:
    if existing.uploader != user.user_id and UserGroup.ADMIN not in user.groups:
        raise HTTPException(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            detail={
                "code": "478",
                "message": "Tag is not owned by you.",
                "additional_information": f"id={entity_id}; tag={tag}",
            },
        )
```

Then your new router becomes mostly plumbing, similar to flows/tasks routers:

```python
from typing import Annotated, Any

from fastapi import APIRouter, Body, Depends
from sqlalchemy import Connection

import database.runs
from database.users import User
from routers.dependencies import expdb_connection, fetch_user
from routers.types import SystemString64
from tagging_service import (
    require_authenticated,
    ensure_not_tagged,
    ensure_tag_exists,
    ensure_tag_owner_or_admin,
)

router = APIRouter(prefix="/runs", tags=["runs"])

@router.post(path="/tag")
def tag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    tags = database.runs.get_tags(run_id, expdb)
    ensure_not_tagged(tags, tag, run_id, "Entity")
    database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb)
    return {"run_tag": {"id": str(run_id), "tag": [*tags, tag]}}

@router.post(path="/untag")
def untag_run(
    run_id: Annotated[int, Body()],
    tag: Annotated[str, SystemString64],
    user: Annotated[User | None, Depends(fetch_user)] = None,
    expdb: Annotated[Connection, Depends(expdb_connection)] = None,
) -> dict[str, dict[str, Any]]:
    user = require_authenticated(user)
    existing = database.runs.get_tag(run_id, tag, expdb)
    ensure_tag_exists(existing, run_id, tag, "Entity")
    ensure_tag_owner_or_admin(existing, user, run_id, tag)
    database.runs.delete_tag(run_id, tag, expdb)
    tags = database.runs.get_tags(run_id, expdb)
    return {"run_tag": {"id": str(run_id), "tag": tags}}
```

You can reuse the same `tagging_service` in the flows/tasks routers by just wiring the appropriate `database.*` functions and entity name. This keeps each router very small and removes the line‑for‑line duplication while preserving current behavior and error payloads.
</issue_to_address>

### Comment 6
<location> `src/database/flows.py:53` </location>
<code_context>
     )


+def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
+    connection.execute(
+        text(
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the common tagging SQL into shared internal helper functions and have the specific tag/get_tag/delete_tag wrappers delegate to them to avoid duplication.

You can reduce repetition by extracting the shared tagging logic into a small internal helper and having the flow/task/run-specific functions delegate to it. That keeps behavior in one place while preserving the public API.

For example, in your data layer module:

```python
# Internal helpers – table/column names are fixed constants, not user input.
def _tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` as id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag_generic(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )
```

Then your flow-specific functions become tiny wrappers:

```python
def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag_generic(
        table="implementation_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )
```

You can apply the same pattern to the existing task/run tagging functions by just changing `table`/`id_column`. This centralizes the behavior so it’s less likely to diverge over time while keeping the public interface unchanged.
</issue_to_address>

### Comment 7
<location> `src/database/tasks.py:109` </location>
<code_context>
     )


+def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
+    connection.execute(
+        text(
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the common tag insert/get/delete logic into shared helpers parameterized by table and id column, then have the task-specific functions delegate to them.

You can keep the new `tag/get_tag/delete_tag` APIs but route them through a shared helper to avoid copy‑pasted SQL across tasks/flows/runs.

For example, introduce a small internal helper that only varies by table name (and optionally id column), then delegate from the public functions:

```python
def _insert_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    user_id: int,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
            VALUES (:id, :tag, :user_id)
            """
        ),
        parameters={"id": id_, "tag": tag_, "user_id": user_id},
    )


def _get_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> Row | None:
    return connection.execute(
        text(
            f"""
            SELECT `{id_column}` AS id, `tag`, `uploader`
            FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    ).one_or_none()


def _delete_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    connection: Connection,
) -> None:
    connection.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """
        ),
        parameters={"id": id_, "tag": tag_},
    )
```

Then your task‑specific functions become thin wrappers:

```python
def tag(id_: int, tag_: str, *, user_id: int, connection: Connection) -> None:
    _insert_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        user_id=user_id,
        connection=connection,
    )


def get_tag(id_: int, tag_: str, connection: Connection) -> Row | None:
    return _get_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )


def delete_tag(id_: int, tag_: str, connection: Connection) -> None:
    _delete_tag(
        table="task_tag",
        id_column="id",
        id_=id_,
        tag_=tag_,
        connection=connection,
    )
```

You can then reuse the same `_insert_tag/_get_tag/_delete_tag` helpers for flows and runs by just changing `table=` (and `id_column=` if needed). This keeps the public API intact while centralizing tagging behavior and reducing future synchronization overhead.
</issue_to_address>



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (10)
tests/routers/openml/task_tag_test.py (1)

31-41: Response assertion is tightly coupled to task 59's initial seed state.

The assertion {"task_tag": {"id": str(task_id), "tag": [tag]}} implicitly requires task 59 to have zero tags in the seed database. If seed data ever gains a tag for task 59, the test will fail with a confusing mismatch. Asserting tag in response.json()["task_tag"]["tag"] (subset check) would be more resilient.

Additionally, there's no test for tagging a non-existent task_id — this edge case would currently return a misleading success response or an FK error, neither of which is validated.
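
The suggested subset-style assertion can be sketched as follows (the payload below is a hypothetical example shaped like the endpoint's task_tag response, not actual seed data):

```python
# Hypothetical response payload in the shape the tag endpoint returns.
response_json = {"task_tag": {"id": "59", "tag": ["some_seed_tag", "test"]}}

tag = "test"
# Subset/containment check: passes even if the seed database already
# holds other tags for the task, unlike exact list equality.
assert tag in response_json["task_tag"]["tag"]
```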

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/task_tag_test.py` around lines 31-41: test_task_tag
currently asserts an exact JSON match, which assumes task 59 has no preexisting
tags. Change the assertion to check that tag is present in
response.json()["task_tag"]["tag"] (a subset/contains check) instead of
equality, so pre-seeded tags are tolerated, and keep the existing verification
via get_tags(id_=task_id, expdb=expdb_test). Also add a new test that posts to
the same endpoint ("/tasks/tag" via py_api.post) with a non-existent task_id and
asserts the correct error behavior (e.g., a 4xx status and an appropriate error
body) to verify handling of missing FK/invalid task IDs.
src/routers/openml/runs.py (1)

27-38: No run-existence validation before tagging.

database.runs.get_tags returns [] for a non-existent run_id, so the endpoint proceeds to INSERT a tag for an entity that doesn't exist. Depending on whether run_tag.id has an FK to run, this either silently creates an orphaned row or throws an unhandled FK IntegrityError. Adding a database.runs.get() (currently absent from src/database/runs.py) and a 404 guard here would close this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 27 - 38, The code calls
database.runs.get_tags(run_id, expdb) and then database.runs.tag(...) without
first verifying the run exists, which can create orphaned tags or trigger FK
errors; add an existence check by calling database.runs.get(run_id, expdb) (or
equivalent lookup) before checking tags, and if it returns no result raise
HTTPException with status_code HTTPStatus.NOT_FOUND and a descriptive detail
(include id=run_id); only proceed to duplicate-tag check and
database.runs.tag(...) after the run is confirmed to exist.
tests/routers/openml/run_tag_test.py (1)

44-54: test_run_tag only covers the admin key; task and flow tests parametrize over admin, non-owner, and owner.

Tagging by non-admin users is a valid path (any authenticated user can tag), and it's exercised in test_task_tag. Aligning test_run_tag to also parametrize over ApiKey.SOME_USER / ApiKey.OWNER_USER would give consistent coverage across entity types.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/run_tag_test.py` around lines 44 - 54, The test_run_tag
currently only exercises the admin path; update test_run_tag to parametrize over
API keys (e.g., ApiKey.ADMIN, ApiKey.SOME_USER, ApiKey.OWNER_USER) so it mirrors
test_task_tag's coverage: add a parameter (api_key) to the test signature, apply
a pytest.mark.parametrize decorator with the three ApiKey values, and use that
api_key value in the POST URL (replace ApiKey.ADMIN in the
f"/runs/tag?api_key=..." call). Keep the same assertions and follow-up tags
check using get_tags(id_=run_id, expdb=expdb_test).
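The suggested parametrization can be sketched as below. The ApiKey enum here is a hypothetical stand-in; the real member names and key values live in the suite's test helpers, and the real test body would POST through py_api and keep the existing assertions.

```python
import pytest
from enum import Enum

class ApiKey(Enum):  # hypothetical stand-in for the suite's ApiKey
    ADMIN = "admin-key"
    SOME_USER = "some-user-key"
    OWNER_USER = "owner-user-key"

@pytest.mark.parametrize(
    "api_key", [ApiKey.ADMIN, ApiKey.SOME_USER, ApiKey.OWNER_USER]
)
def test_run_tag(api_key: ApiKey) -> None:
    # The real test would POST this URL via py_api and assert the response;
    # only the URL wiring is shown here.
    url = f"/runs/tag?api_key={api_key.value}"
    assert url.endswith(api_key.value)
```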
src/database/tasks.py (1)

109-143: connection vs expdb naming divergence from existing module convention.

The three new functions use connection: Connection as the parameter name, while the pre-existing functions in this file (e.g., get_tags at line 95, get at line 7, etc.) all use expdb: Connection. The same divergence exists in src/database/flows.py and src/database/runs.py. Not a bug, but it creates a mild inconsistency in the public API surface of these modules.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/tasks.py` around lines 109 - 143, Rename the Connection
parameter in the new functions to match the module convention: change the
parameter name "connection: Connection" to "expdb: Connection" in the functions
tag, get_tag, and delete_tag so they align with existing functions like get_tags
and get; update all internal references (parameters dicts, calls) to use expdb
where applicable. Ensure the signature and any callers are adjusted consistently
to avoid breaking the public API.
src/routers/openml/tasks.py (1)

166-177: Task existence is not validated before tagging — and database.tasks.get() is already available.

Unlike the runs case, database.tasks.get(task_id, expdb) already exists and is used by get_task, so adding an existence guard here is a small change:

✨ Proposed guard
     if user is None:
         raise HTTPException(
             status_code=HTTPStatus.PRECONDITION_FAILED,
             detail={"code": "103", "message": "Authentication failed"},
         )
+    if not database.tasks.get(task_id, expdb):
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail="Task not found",
+        )
     tags = database.tasks.get_tags(task_id, expdb)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/tasks.py` around lines 166 - 177, Add an existence guard
before tagging: call database.tasks.get(task_id, expdb) and if it returns falsy,
raise an HTTPException for "not found" (e.g., 404) before proceeding to use
database.tasks.get_tags and database.tasks.tag; keep the rest of the logic
(checking tag duplication using tags = database.tasks.get_tags(...), raising the
duplicate tag HTTPException, then calling database.tasks.tag(...)) unchanged so
you don't attempt to tag a non-existent task.
src/database/runs.py (1)

1-52: No get() function and expdb/connection naming divergence.

The module is missing a get(run_id, expdb) lookup function. Its absence prevents the router from performing an entity-existence check before tagging (see the router review below). Adding one now — consistent with database.tasks.get() — would complete the module's API.

The same expdb (read) / connection (write) naming asymmetry noted in src/database/tasks.py applies here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/runs.py` around lines 1 - 52, Add a read-only lookup function
get(id_: int, expdb: Connection) -> Row | None that mirrors
database.tasks.get(): execute a SELECT (e.g., SELECT * FROM run WHERE id =
:run_id) against the experiment DB and return one_or_none(); also normalize
parameter naming so read operations use expdb and write operations use
connection — change get_tag signature/usage to accept expdb: Connection (not
connection) for reads, keep tag and delete_tag using connection: Connection for
writes, and update any callers accordingly.
tests/routers/openml/flow_tag_test.py (2)

40-68: Hard-coded flow_id=1 and pre-seeded tag assumptions are fragile

test_flow_tag_returns_existing_tags and test_flow_tag_fails_if_tag_exists assume flow 1 already carries "OpenmlWeka" and "weka" in the test DB. If the seed data changes, both tests fail with no clear pointer to the cause. Consider using the flow fixture and setting up required tags explicitly at the start of each test (similar to test_flow_untag), or documenting the seed data dependency clearly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 40 - 68, Both tests
hard-code flow_id=1 and assume pre-seeded tags ("OpenmlWeka", "weka"), which is
fragile; update test_flow_tag_returns_existing_tags and
test_flow_tag_fails_if_tag_exists to use the flow fixture (e.g., obtain flow.id
from the flow fixture) and explicitly create or attach the required tags at the
start of each test (mirroring the setup pattern used in test_flow_untag) before
calling the POST /flows/tag endpoint, then use that flow.id in assertions and
the expected error payload so tests no longer rely on external seed data.

112-123: Admin bypass for untag is not tested

The implementation at src/routers/openml/flows.py Line 65 contains an admin-bypass path (UserGroup.ADMIN not in user.groups), but the test suite covers only the rejection case (non-owner, non-admin). A test where a different admin key removes another user's tag would fully exercise this logic.

Would you like me to draft the missing test_flow_untag_admin_can_untag_any_tag test case?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 112 - 123, Add a new test
named test_flow_untag_admin_can_untag_any_tag in
tests/routers/openml/flow_tag_test.py that creates a tag on a Flow as a
non-admin user (use Flow fixture and TestClient/ApiKey.SOME_USER), then calls
the untag endpoint using the admin API key (ApiKey.ADMIN) to remove that tag;
assert the untag response is successful (HTTP 200) and verify the tag is
actually removed from the flow (e.g., fetch flow tags or check the response
body). This exercises the admin-bypass logic in the untag handler (the branch
checking UserGroup.ADMIN in src/routers/openml/flows.py).
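The ownership predicate that such a test would exercise can be sketched in isolation. User and the ADMIN group marker are simplified stand-ins for the project's types; the non-owner branch corresponds to the PR's error 478.

```python
from dataclasses import dataclass, field

ADMIN = "admin"  # stand-in for UserGroup.ADMIN

@dataclass
class User:  # simplified stand-in for the project's authenticated user
    user_id: int
    groups: list[str] = field(default_factory=list)

def may_untag(tag_uploader_id: int, user: User) -> bool:
    # Admins bypass the ownership check; everyone else must own the tag.
    return ADMIN in user.groups or tag_uploader_id == user.user_id

owner = User(user_id=2)
admin = User(user_id=1, groups=[ADMIN])
stranger = User(user_id=3)
assert may_untag(2, owner)         # owner may remove their own tag
assert may_untag(2, admin)         # admin bypass: any tag
assert not may_untag(2, stranger)  # rejected -> error 478 path
```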
src/routers/openml/flows.py (2)

40-40: Response tag list is built from a pre-insert snapshot, inconsistent with untag_flow

untag_flow re-queries get_tags after delete_tag (Line 75), giving a consistent post-operation view. tag_flow constructs the list from the stale pre-insert tags snapshot ([*tags, tag]). While identical in practice for single requests, the two endpoints are inconsistent and the pattern is harder to reason about. Consider re-querying after the insert:

♻️ Proposed refactor
     database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
-    return {"flow_tag": {"id": str(flow_id), "tag": [*tags, tag]}}
+    tags = database.flows.get_tags(flow_id, expdb)
+    return {"flow_tag": {"id": str(flow_id), "tag": tags}}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` at line 40, The response in tag_flow currently
returns a tags list built from the pre-insert snapshot ([*tags, tag]) which is
inconsistent with untag_flow; after performing the insert (the operation that
adds the tag in tag_flow), re-call get_tags(flow_id) to fetch the authoritative,
post-insert tag list and return that in the {"flow_tag": {"id": str(flow_id),
"tag": ...}} response so both tag_flow and untag_flow use the same fresh source
of truth.

32-32: INTERNAL_SERVER_ERROR (500) used for application-level domain errors

Codes 473, 477, and 478 are client-originated errors (duplicate tag, tag not found, wrong ownership) and should map to 4xx status codes (e.g., 409 Conflict for 473, 404 for 477, 403 for 478). Using 500 misrepresents the error to clients and monitoring tools. This applies equally to the same pattern in untag_flow (Lines 58, 67).

Acknowledged that the PR intentionally mirrors the existing dataset tag pattern — flagging for awareness if that pattern is ever revisited.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` at line 32, Change the HTTP status mapping for
application-level domain errors in the tag_flow and untag_flow handlers: when
catching AppErrors with code 473, return 409 Conflict; with code 477, return 404
Not Found; with code 478, return 403 Forbidden instead of
HTTPStatus.INTERNAL_SERVER_ERROR; update the error->status_code logic in the
tag_flow and untag_flow exception handlers (the block that checks err.code on
AppError) to return these 4xx codes so client-originated errors are represented
correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/routers/openml/flows.py`:
- Around line 17-40: tag_flow currently skips validating that the flow exists
which can create orphaned tags or DB errors; before fetching tags or calling
database.flows.tag, call database.flows.get(flow_id, expdb) (same as get_flow)
and if it returns None raise HTTPException with HTTPStatus.NOT_FOUND and the
same error payload pattern used in get_flow; then continue with the existing
duplicate-tag check and database.flows.tag to safely ensure tags are only added
for existing flows.
- Around line 29-39: The current get_tags -> tag sequence has a TOCTOU race: add
a UNIQUE constraint on implementation_tag for (id, tag) at the schema level, and
modify the call site around database.flows.tag(flow_id, tag, user_id=...) to
catch the DB integrity exception (e.g., IntegrityError or the DB driver's
unique-constraint error) and translate it into the same HTTPException payload
(status_code HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473" with id and
tag) instead of letting the exception propagate; alternatively, change
database.flows.tag to use an idempotent INSERT (INSERT IGNORE / ON DUPLICATE KEY
UPDATE) or perform an atomic existence-check-and-insert in the DB and surface
the 473 response on duplicate—apply the same pattern to database.flows.tag usage
in tasks.py, runs.py, and datasets.py.

In `@src/routers/openml/runs.py`:
- Around line 27-38: The pre-check in the tag path (using database.runs.get_tags
and casefold) can race with the subsequent INSERT in database.runs.tag, so wrap
the call to database.runs.tag in a try/except that catches the DB IntegrityError
(or the specific DB exception your DB layer raises) and re-raises the same
HTTPException currently used for duplicates (status_code
HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473", message "Entity already
tagged by this tag.", additional_information f"id={run_id}; tag={tag}") so
concurrent inserts surface the clean 473 instead of an opaque 500; apply the
same pattern to tag_flow and tag_task where get_tags + tag are used.

In `@src/routers/openml/tasks.py`:
- Around line 166-177: The current TOCTOU flaw in the tags flow (calls to
database.tasks.get_tags and database.tasks.tag) can let concurrent inserts
bypass the pre-check and leak raw DB errors; wrap the call to
database.tasks.tag(task_id, tag, user_id=user.user_id, connection=expdb) in a
try/except that catches the DB IntegrityError and raises the same HTTPException
with status_code HTTPStatus.INTERNAL_SERVER_ERROR and the existing detail
payload (code "473", message "Entity already tagged by this tag.",
additional_information containing id and tag); keep the original behavior of
returning {"task_tag": {"id": str(task_id), "tag": [*tags, tag]}} on success.

In `@tests/routers/openml/flow_tag_test.py`:
- Around line 86-101: The setup POST that applies the tag is currently sent
fire-and-forget in test_flow_untag; change it to capture its response and assert
it succeeded before calling the untag endpoint. Specifically, after calling
py_api.post(f"/flows/tag?api_key={ApiKey.ADMIN}", json={"flow_id": flow.id,
"tag": tag}) store the response, assert response.status_code == HTTPStatus.OK
(and optionally check the JSON contains the tag), then proceed to call
py_api.post for untag and the existing assertions; this ensures failures in the
tag step (in test_flow_untag / py_api.post / ApiKey.ADMIN / flow.id) produce
clear test errors.

---

Nitpick comments:
In `@src/database/runs.py`:
- Around line 1-52: Add a read-only lookup function get(id_: int, expdb:
Connection) -> Row | None that mirrors database.tasks.get(): execute a SELECT
(e.g., SELECT * FROM run WHERE id = :run_id) against the experiment DB and
return one_or_none(); also normalize parameter naming so read operations use
expdb and write operations use connection — change get_tag signature/usage to
accept expdb: Connection (not connection) for reads, keep tag and delete_tag
using connection: Connection for writes, and update any callers accordingly.

In `@src/database/tasks.py`:
- Around line 109-143: Rename the Connection parameter in the new functions to
match the module convention: change the parameter name "connection: Connection"
to "expdb: Connection" in the functions tag, get_tag, and delete_tag so they
align with existing functions like get_tags and get; update all internal
references (parameters dicts, calls) to use expdb where applicable. Ensure the
signature and any callers are adjusted consistently to avoid breaking the public
API.

In `@src/routers/openml/flows.py`:
- Line 40: The response in tag_flow currently returns a tags list built from the
pre-insert snapshot ([*tags, tag]) which is inconsistent with untag_flow; after
performing the insert (the operation that adds the tag in tag_flow), re-call
get_tags(flow_id) to fetch the authoritative, post-insert tag list and return
that in the {"flow_tag": {"id": str(flow_id), "tag": ...}} response so both
tag_flow and untag_flow use the same fresh source of truth.
- Line 32: Change the HTTP status mapping for application-level domain errors in
the tag_flow and untag_flow handlers: when catching AppErrors with code 473,
return 409 Conflict; with code 477, return 404 Not Found; with code 478, return
403 Forbidden instead of HTTPStatus.INTERNAL_SERVER_ERROR; update the
error->status_code logic in the tag_flow and untag_flow exception handlers (the
block that checks err.code on AppError) to return these 4xx codes so
client-originated errors are represented correctly.

In `@src/routers/openml/runs.py`:
- Around line 27-38: The code calls database.runs.get_tags(run_id, expdb) and
then database.runs.tag(...) without first verifying the run exists, which can
create orphaned tags or trigger FK errors; add an existence check by calling
database.runs.get(run_id, expdb) (or equivalent lookup) before checking tags,
and if it returns no result raise HTTPException with status_code
HTTPStatus.NOT_FOUND and a descriptive detail (include id=run_id); only proceed
to duplicate-tag check and database.runs.tag(...) after the run is confirmed to
exist.

In `@src/routers/openml/tasks.py`:
- Around line 166-177: Add an existence guard before tagging: call
database.tasks.get(task_id, expdb) and if it returns falsy, raise an
HTTPException for "not found" (e.g., 404) before proceeding to use
database.tasks.get_tags and database.tasks.tag; keep the rest of the logic
(checking tag duplication using tags = database.tasks.get_tags(...), raising the
duplicate tag HTTPException, then calling database.tasks.tag(...)) unchanged so
you don't attempt to tag a non-existent task.

In `@tests/routers/openml/flow_tag_test.py`:
- Around line 40-68: Both tests hard-code flow_id=1 and assume pre-seeded tags
("OpenmlWeka", "weka"), which is fragile; update
test_flow_tag_returns_existing_tags and test_flow_tag_fails_if_tag_exists to use
the flow fixture (e.g., obtain flow.id from the flow fixture) and explicitly
create or attach the required tags at the start of each test (mirroring the
setup pattern used in test_flow_untag) before calling the POST /flows/tag
endpoint, then use that flow.id in assertions and the expected error payload so
tests no longer rely on external seed data.
- Around line 112-123: Add a new test named
test_flow_untag_admin_can_untag_any_tag in tests/routers/openml/flow_tag_test.py
that creates a tag on a Flow as a non-admin user (use Flow fixture and
TestClient/ApiKey.SOME_USER), then calls the untag endpoint using the admin API
key (ApiKey.ADMIN) to remove that tag; assert the untag response is successful
(HTTP 200) and verify the tag is actually removed from the flow (e.g., fetch
flow tags or check the response body). This exercises the admin-bypass logic in
the untag handler (the branch checking UserGroup.ADMIN in
src/routers/openml/flows.py).

In `@tests/routers/openml/run_tag_test.py`:
- Around line 44-54: The test_run_tag currently only exercises the admin path;
update test_run_tag to parametrize over API keys (e.g., ApiKey.ADMIN,
ApiKey.SOME_USER, ApiKey.OWNER_USER) so it mirrors test_task_tag's coverage: add
a parameter (api_key) to the test signature, apply a pytest.mark.parametrize
decorator with the three ApiKey values, and use that api_key value in the POST
URL (replace ApiKey.ADMIN in the f"/runs/tag?api_key=..." call). Keep the same
assertions and follow-up tags check using get_tags(id_=run_id,
expdb=expdb_test).

In `@tests/routers/openml/task_tag_test.py`:
- Around line 31-41: The test_task_tag currently asserts an exact JSON matching
which assumes task 59 had no preexisting tags; change the assertion to check
that tag is present in response.json()["task_tag"]["tag"] (i.e., a
subset/contains check) instead of equality to allow pre-seeded tags, and keep
the existing verification via get_tags(id_=task_id, expdb=expdb_test). Also add
a new test that posts to the same endpoint ("/tasks/tag" using py_api.post) with
a non-existent task_id and asserts the correct error behavior (e.g., 4xx status
and appropriate error body) to verify handling of missing FK/invalid task IDs.

Comment on lines +29 to +39
tags = database.flows.get_tags(flow_id, expdb)
if tag.casefold() in [t.casefold() for t in tags]:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail={
"code": "473",
"message": "Entity already tagged by this tag.",
"additional_information": f"id={flow_id}; tag={tag}",
},
)
database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for implementation_tag table definition and constraints
find . -type f \( -name "*.sql" -o -name "*.py" \) | head -30

Repository: openml/server-api

Length of output: 898


🏁 Script executed:

# Look for database models and schema definitions
rg "implementation_tag" -A 5 -B 2 --max-count=20

Repository: openml/server-api

Length of output: 1655


🏁 Script executed:

# Find the database.flows module to examine tag() and get_tags() implementations
fd "database" --type f -e py | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the SQL schema files for implementation_tag constraints
cat ./docker/mysql/data/openml.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Also check the _expdb schema
cat ./docker/mysql/data/openml_expdb.sql | grep -A 10 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the full flows.py database module to see tag() and get_tags() functions
cat -n src/database/flows.py

Repository: openml/server-api

Length of output: 3895


🏁 Script executed:

# Try to read schema files with head/tail to see structure
head -100 ./docker/mysql/data/openml.sql | grep -A 15 "implementation_tag"

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Read the router flows.py file to see the full context of lines 29-39
cat -n src/routers/openml/flows.py | head -60

Repository: openml/server-api

Length of output: 2676


🏁 Script executed:

# Search for implementation_tag table creation in SQL files
fd -e sql -x grep -l "implementation_tag" {}

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try to read the SQL files line by line
wc -l ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql

Repository: openml/server-api

Length of output: 143


🏁 Script executed:

# Search for UNIQUE constraint patterns in SQL files
rg "UNIQUE.*implementation_tag|implementation_tag.*UNIQUE" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Check for PRIMARY KEY or UNIQUE constraints on (id, tag)
rg -A 20 "CREATE TABLE.*implementation_tag" --include="*.sql"

Repository: openml/server-api

Length of output: 502


🏁 Script executed:

# Read the SQL files directly to find implementation_tag table definition
cat ./docker/mysql/data/openml.sql | head -5
cat ./docker/mysql/data/openml_expdb.sql | head -5

Repository: openml/server-api

Length of output: 158


🏁 Script executed:

# Search for implementation_tag CREATE TABLE in both files
grep -n "implementation_tag" ./docker/mysql/data/openml.sql ./docker/mysql/data/openml_expdb.sql | head -20

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Try using sed or awk to extract lines around implementation_tag
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml.sql | head -30
sed -n '/implementation_tag/,/;/p' ./docker/mysql/data/openml_expdb.sql | head -30

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Search for database migration or setup files
find . -type f \( -name "*migration*" -o -name "*schema*" -o -name "*setup*" \) | grep -E "\.(py|sql)$"

Repository: openml/server-api

Length of output: 380


🏁 Script executed:

# Search for IntegrityError handling in the codebase
rg "IntegrityError" -t py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check database/__init__.py and database/setup.py for schema or constraint info
cat -n src/database/setup.py

Repository: openml/server-api

Length of output: 1145


🏁 Script executed:

# Check if there's any documentation about the schema in README files
find . -name "README*" -o -name "readme*" | head -5

Repository: openml/server-api

Length of output: 142


🏁 Script executed:

# Search for error handling around database.flows.tag() calls
rg "database\.flows\.tag|\.tag\(" -A 5 -B 5 src/routers/

Repository: openml/server-api

Length of output: 2753


🏁 Script executed:

# Check if there's try/except around tag operations in router
rg "try|except|IntegrityError|SQLAlchemy" -A 3 src/routers/openml/flows.py

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Look for ON DUPLICATE KEY UPDATE or similar patterns in database module
rg "ON DUPLICATE|INSERT IGNORE|INSERT OR" src/database/

Repository: openml/server-api

Length of output: 43


🏁 Script executed:

# Check the actual imports and error handling setup in database/flows.py
head -20 src/database/flows.py

Repository: openml/server-api

Length of output: 563


TOCTOU race condition in duplicate-tag check

The get_tags → casefold check → tag() insert sequence is not atomic. Two concurrent requests for the same (flow_id, tag) can both pass Line 30 and both reach the INSERT on Line 39, resulting in either a silent duplicate row or an unhandled database exception.

The tag() function (in src/database/flows.py) executes a plain INSERT with no error handling. No try/except wraps the call, and the query lacks ON DUPLICATE KEY UPDATE or similar protection. Even if a unique constraint exists on (id, tag) in the schema, the resulting IntegrityError would propagate uncaught rather than returning the proper 473 response.

Fix: Add a UNIQUE constraint on (id, tag) in implementation_tag and wrap the tag() call in a try/except to catch and translate IntegrityError to code 473, or use INSERT IGNORE/ON DUPLICATE KEY UPDATE with a post-insert existence check.

Note: The same pattern exists in tasks.py, runs.py, and datasets.py.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 29 - 39, The current get_tags ->
tag sequence has a TOCTOU race: add a UNIQUE constraint on implementation_tag
for (id, tag) at the schema level, and modify the call site around
database.flows.tag(flow_id, tag, user_id=...) to catch the DB integrity
exception (e.g., IntegrityError or the DB driver's unique-constraint error) and
translate it into the same HTTPException payload (status_code
HTTPStatus.INTERNAL_SERVER_ERROR, detail code "473" with id and tag) instead of
letting the exception propagate; alternatively, change database.flows.tag to use
an idempotent INSERT (INSERT IGNORE / ON DUPLICATE KEY UPDATE) or perform an
atomic existence-check-and-insert in the DB and surface the 473 response on
duplicate—apply the same pattern to database.flows.tag usage in tasks.py,
runs.py, and datasets.py.
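The race-free variant can be sketched as follows, with sqlite3 standing in for the project's MySQL/SQLAlchemy stack and illustrative table/column names: a UNIQUE constraint on (id, tag) plus translation of the driver's IntegrityError into the application-level 473 error.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# UNIQUE (id, tag) makes the duplicate check atomic at the database level.
conn.execute(
    "CREATE TABLE implementation_tag (id INTEGER, tag TEXT, UNIQUE (id, tag))"
)

def tag_flow(flow_id: int, tag: str) -> dict:
    try:
        conn.execute(
            "INSERT INTO implementation_tag (id, tag) VALUES (?, ?)",
            (flow_id, tag),
        )
    except sqlite3.IntegrityError:
        # Concurrent or repeated inserts surface as the clean 473 error
        # instead of an opaque, unhandled database exception.
        return {"code": "473", "message": "Entity already tagged by this tag."}
    return {"flow_tag": {"id": str(flow_id), "tag": tag}}

first = tag_flow(1, "weka")   # inserts the row
second = tag_flow(1, "weka")  # duplicate -> translated to code "473"
```

Note that a plain UNIQUE constraint is case-sensitive, so matching the router's casefold comparison exactly would also require a case-insensitive collation (or normalizing the tag before insert).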

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/routers/openml/run_tag_test.py`:
- Line 31: Update the incorrect type annotation for the test parameter named
"key" in tests/routers/openml/run_tag_test.py to allow None (e.g., change from
"key: ApiKey" to "key: ApiKey | None" or "Optional[ApiKey]") so it matches the
parametrized cases that include None; apply the same change for the second
occurrence around the other test (referenced at the other occurrence).
- Around line 59-62: The three setup POST calls using
py_api.post("/runs/tag?api_key=...") in tests test_run_tag_fails_if_tag_exists,
test_run_untag, and test_run_untag_fails_if_not_owner must assert their
responses so silent failures don't mask the actual test intent; update each
setup to capture the response and assert the expected success (e.g.,
HTTPStatus.OK or the expected response code/value) before proceeding with the
rest of the test, so subsequent asserts (like checking error codes
"473"/"477"/"478") fail for the right reasons.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
tests/routers/openml/flow_tag_test.py (1)

113-125: Consider adding a positive ownership test (non-admin untags their own tag).

Same parity gap as in run_tag_test.py — the negative path (test_flow_untag_fails_if_not_owner) is covered but the symmetric positive case (non-admin user successfully untagging their own tag) is missing, unlike the tasks test suite.

➕ Suggested test
def test_flow_untag_non_admin_own_tag(flow: Flow, expdb_test: Connection, py_api: TestClient) -> None:
    tag = "user_tag"
    setup = py_api.post(
        f"/flows/tag?api_key={ApiKey.SOME_USER}",
        json={"flow_id": flow.id, "tag": tag},
    )
    assert setup.status_code == HTTPStatus.OK
    response = py_api.post(
        f"/flows/untag?api_key={ApiKey.SOME_USER}",
        json={"flow_id": flow.id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK

    tags = get_tags(flow_id=flow.id, expdb=expdb_test)
    assert tag not in tags
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/flow_tag_test.py` around lines 113 - 125, Add a
symmetric positive test verifying a non-admin can untag their own tag: create a
new test named something like test_flow_untag_non_admin_own_tag that uses the
flow fixture, posts a tag with py_api using ApiKey.SOME_USER, asserts the tag
setup returns HTTPStatus.OK, then calls the untag endpoint with the same
ApiKey.SOME_USER and asserts HTTPStatus.OK, and finally calls
get_tags(flow_id=flow.id, expdb=expdb_test) to assert the tag is no longer
present; mirror the existing test_flow_untag_fails_if_not_owner flow but use the
non-admin user for both tag and untag and include the expdb_test check.
tests/routers/openml/run_tag_test.py (1)

125-137: Consider adding a positive ownership test (non-admin untags their own tag).

test_run_untag_fails_if_not_owner covers the rejection path; the analogous positive case — a non-admin user successfully untagging a tag they own — is absent here, unlike test_task_untag_non_admin_own_tag in task_tag_test.py. The ownership predicate is identical across modules, so the gap is low-risk, but parity with the task tests would be cleaner.

➕ Suggested test
def test_run_untag_non_admin_own_tag(run_id: int, expdb_test: Connection, py_api: TestClient) -> None:
    tag = "user_tag"
    setup = py_api.post(
        f"/runs/tag?api_key={ApiKey.SOME_USER}",
        json={"run_id": run_id, "tag": tag},
    )
    assert setup.status_code == HTTPStatus.OK
    response = py_api.post(
        f"/runs/untag?api_key={ApiKey.SOME_USER}",
        json={"run_id": run_id, "tag": tag},
    )
    assert response.status_code == HTTPStatus.OK

    tags = get_tags(id_=run_id, expdb=expdb_test)
    assert tag not in tags
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/run_tag_test.py` around lines 125 - 137, Add a positive
test that ensures a non-admin user can untag a tag they created: implement a new
test function (e.g., test_run_untag_non_admin_own_tag) which uses py_api to POST
to /runs/tag with ApiKey.SOME_USER and json {"run_id": run_id, "tag":
"user_tag"}, asserts the setup returned HTTPStatus.OK, then POSTs to /runs/untag
with the same ApiKey.SOME_USER and tag, asserts HTTPStatus.OK, and finally
verifies the tag was removed by calling get_tags(id_=run_id, expdb=expdb_test)
and asserting the tag is not present; reference the existing
test_run_untag_fails_if_not_owner, ApiKey.SOME_USER, /runs/tag, /runs/untag, and
get_tags to place the test consistently with other tag tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/routers/openml/runs.py`:
- Around line 15-39: Check that the run exists before inserting a tag: in
tag_run, call an existence/read helper (e.g., database.runs.get or
database.runs.get_by_id) using run_id and expdb before calling
database.runs.tag; if the helper returns no row, raise an HTTPException
(HTTPStatus.NOT_FOUND) with an appropriate detail payload (e.g.,
{"code":"404","message":"Run not found", "additional_information":
f"id={run_id}"}). Do the same existence check path before the duplicate-tag
check so you don't query tags for a missing run, and mirror this pattern for
tag_flow and tag_task (use the same database.runs.* helper names shown in this
diff to locate where to add the check).

In `@src/routers/openml/tasks.py`:
- Around line 154-178: tag_task currently writes tags without verifying the task
exists, which can create orphan rows or leak DB errors; before calling
database.tasks.tag, call the task-existence check used elsewhere (e.g., the same
lookup used by get_task or a database.tasks.get/get_by_id with the provided
expdb connection) and if not found raise an HTTPException 404 with the same
response shape used by get_task; only proceed to call database.tasks.tag and
then database.tasks.get_tags when the task exists.

---

Duplicate comments:
In `@src/routers/openml/flows.py`:
- Around line 17-41: tag_flow currently skips an existence check and is
vulnerable to a TOCTOU race between get_tags and tag; before proceeding call
database.flows.get(flow_id, expdb) and raise an HTTPException (same auth-style
payload) if it returns None, then perform the tag insertion inside a try/except
that catches the DB integrity error (e.g., IntegrityError from your DB layer)
around database.flows.tag(flow_id, tag, user_id=user.user_id, connection=expdb)
and translate that error into the existing 473 HTTPException for duplicate tags;
keep the case-insensitive pre-check with database.flows.get_tags(flow_id, expdb)
as an early fast-fail but rely on the IntegrityError guard as the definitive
protection and still re-fetch tags after successful insertion.

In `@src/routers/openml/runs.py`:
- Around line 27-37: The current get_tags -> casefold check ->
database.runs.tag(...) sequence is vulnerable to TOCTOU; keep the pre-check if
desired but make the insert atomic by wrapping the call to
database.runs.tag(run_id, tag, user_id=user.user_id, connection=expdb) in a
try/except that catches the DB integrity exception (e.g.,
sqlalchemy.exc.IntegrityError or the DB client’s unique constraint error) and on
that exception raise the same HTTPException (status_code
HTTPStatus.INTERNAL_SERVER_ERROR with code "473" and the same detail payload) so
concurrent duplicate inserts are handled correctly; reference
database.runs.get_tags and database.runs.tag in the change.

---

Nitpick comments:
In `@tests/routers/openml/flow_tag_test.py`:
- Around line 113-125: Add a symmetric positive test verifying a non-admin can
untag their own tag: create a new test named something like
test_flow_untag_non_admin_own_tag that uses the flow fixture, posts a tag with
py_api using ApiKey.SOME_USER, asserts the tag setup returns HTTPStatus.OK, then
calls the untag endpoint with the same ApiKey.SOME_USER and asserts
HTTPStatus.OK, and finally calls get_tags(flow_id=flow.id, expdb=expdb_test) to
assert the tag is no longer present; mirror the existing
test_flow_untag_fails_if_not_owner flow but use the non-admin user for both tag
and untag and include the expdb_test check.

In `@tests/routers/openml/run_tag_test.py`:
- Around line 125-137: Add a positive test that ensures a non-admin user can
untag a tag they created: implement a new test function (e.g.,
test_run_untag_non_admin_own_tag) which uses py_api to POST to /runs/tag with
ApiKey.SOME_USER and json {"run_id": run_id, "tag": "user_tag"}, asserts the
setup returned HTTPStatus.OK, then POSTs to /runs/untag with the same
ApiKey.SOME_USER and tag, asserts HTTPStatus.OK, and finally verifies the tag
was removed by calling get_tags(id_=run_id, expdb=expdb_test) and asserting the
tag is not present; reference the existing test_run_untag_fails_if_not_owner,
ApiKey.SOME_USER, /runs/tag, /runs/untag, and get_tags to place the test
consistently with other tag tests.

@YassinNouh21
Author

Hi @PGijsbers, are you going to look into this?

@Jayant-kernel Jayant-kernel mentioned this pull request Mar 7, 2026
@PGijsbers
Contributor

Hi @YassinNouh21! Thanks for your contribution and sorry for my late reply. I am willing to try and get this PR merged if you are still available to make changes. If you are, please update the PR with:

You can use the dataset/tag and setup/untag endpoints as reference; they use the updated style.

If you're still interested, then I will review the PR after the updates are made.

Contributor

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (3)
src/routers/openml/runs.py (1)

15-30: ⚠️ Potential issue | 🟠 Major

tag_run does not validate that run_id exists before inserting a tag.

The tag_entity helper doesn't verify the entity exists. A non-existent run_id will either silently create an orphaned row (no FK constraint) or produce an unhandled DB exception.

Add an existence check before calling tag_entity:

🛡️ Proposed fix
 @router.post(path="/tag")
 def tag_run(
     run_id: Annotated[int, Body()],
     tag: Annotated[str, SystemString64],
     user: Annotated[User | None, Depends(fetch_user)] = None,
     expdb: Annotated[Connection, Depends(expdb_connection)] = None,
 ) -> dict[str, dict[str, Any]]:
+    if database.runs.get(run_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "270", "message": "Run not found."},
+        )
     return tag_entity(
         run_id,
         tag,
         require_user(user),
         ...
     )

You'll need to add imports:

from http import HTTPStatus
from fastapi import HTTPException
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/runs.py` around lines 15 - 30, The tag_run endpoint must
verify the run exists before calling tag_entity: call the runs existence check
via the database API (e.g., use database.runs.get or a suitable exists method)
inside tag_run after require_user(user) and before tag_entity, and if not found
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Run not found");
add the imports for HTTPException and HTTPStatus at the top of the file. Ensure
you still pass database.runs.get_tags and database.runs.tag into tag_entity
unchanged so only the pre-check is added.
src/core/tagging.py (1)

30-41: ⚠️ Potential issue | 🟠 Major

TOCTOU race condition between duplicate check and INSERT.

The get_tags_fn → casefold check → tag_fn sequence is not atomic. Concurrent requests can both pass the duplicate check and attempt to INSERT, causing an unhandled IntegrityError (if a UNIQUE constraint exists) instead of the clean 473 response.

Wrap tag_fn in a try/except for IntegrityError and translate it to error code 473.

🛡️ Proposed fix
+from sqlalchemy.exc import IntegrityError
+
 def tag_entity(
     ...
 ) -> dict[str, dict[str, Any]]:
     tags = get_tags_fn(entity_id, expdb)
     if tag.casefold() in (t.casefold() for t in tags):
         raise HTTPException(
             status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
             detail={
                 "code": "473",
                 "message": "Entity already tagged by this tag.",
                 "additional_information": f"id={entity_id}; tag={tag}",
             },
         )
-    tag_fn(entity_id, tag, user_id=user.user_id, expdb=expdb)
+    try:
+        tag_fn(entity_id, tag, user_id=user.user_id, expdb=expdb)
+    except IntegrityError:
+        raise HTTPException(
+            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            detail={
+                "code": "473",
+                "message": "Entity already tagged by this tag.",
+                "additional_information": f"id={entity_id}; tag={tag}",
+            },
+        ) from None
     tags = get_tags_fn(entity_id, expdb)
     return {response_key: {"id": str(entity_id), "tag": tags}}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/core/tagging.py` around lines 30 - 41, The duplicate-check then insert
(get_tags_fn → casefold check → tag_fn) has a TOCTOU race: concurrent requests
can both pass the check and cause a UNIQUE constraint IntegrityError; wrap the
call to tag_fn(...) in a try/except that catches the DB integrity exception
(e.g., sqlalchemy.exc.IntegrityError or the DB library's IntegrityError) and
raise the same HTTPException with status_code HTTPStatus.INTERNAL_SERVER_ERROR
and detail code "473" (same message and additional_information using entity_id
and tag) so concurrent inserts are translated to the canonical 473 response;
after a successful tag_fn call, still re-fetch tags with get_tags_fn(entity_id,
expdb).
src/routers/openml/flows.py (1)

18-33: ⚠️ Potential issue | 🟠 Major

tag_flow does not validate that the flow exists before inserting a tag.

Despite a past review comment being marked as addressed, the current implementation still lacks an entity existence check. The tag_entity helper doesn't verify the entity exists, so a non-existent flow_id will either create an orphaned row or cause an unhandled DB exception.

🛡️ Proposed fix
 @router.post(path="/tag")
 def tag_flow(
     flow_id: Annotated[int, Body()],
     tag: Annotated[str, SystemString64],
     user: Annotated[User | None, Depends(fetch_user)] = None,
     expdb: Annotated[Connection, Depends(expdb_connection)] = None,
 ) -> dict[str, dict[str, Any]]:
+    if database.flows.get(flow_id, expdb) is None:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND,
+            detail={"code": "180", "message": "Unknown flow."},
+        )
     return tag_entity(
         flow_id,
         tag,
         require_user(user),
         ...
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routers/openml/flows.py` around lines 18 - 33, tag_flow currently calls
tag_entity without verifying the flow exists, which can produce orphaned tags or
DB errors; before calling tag_entity, fetch the flow using the flows table
reader (e.g., call database.flows.get_by_id(flow_id) or
database.flows.get(flow_id) depending on the available API) with the same expdb
connection and require_user result, return a 404/raise an appropriate HTTP error
if the fetch returns None, and only then call tag_entity with
database.flows.get_tags and database.flows.tag; reference tag_flow, tag_entity,
database.flows.get_tags, database.flows.tag, require_user and expdb in your
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/core/tagging.py`:
- Around line 30-41: The duplicate-check then insert (get_tags_fn → casefold
check → tag_fn) has a TOCTOU race: concurrent requests can both pass the check
and cause a UNIQUE constraint IntegrityError; wrap the call to tag_fn(...) in a
try/except that catches the DB integrity exception (e.g.,
sqlalchemy.exc.IntegrityError or the DB library's IntegrityError) and raise the
same HTTPException with status_code HTTPStatus.INTERNAL_SERVER_ERROR and detail
code "473" (same message and additional_information using entity_id and tag) so
concurrent inserts are translated to the canonical 473 response; after a
successful tag_fn call, still re-fetch tags with get_tags_fn(entity_id, expdb).

In `@src/routers/openml/flows.py`:
- Around line 18-33: tag_flow currently calls tag_entity without verifying the
flow exists, which can produce orphaned tags or DB errors; before calling
tag_entity, fetch the flow using the flows table reader (e.g., call
database.flows.get_by_id(flow_id) or database.flows.get(flow_id) depending on
the available API) with the same expdb connection and require_user result,
return a 404/raise an appropriate HTTP error if the fetch returns None, and only
then call tag_entity with database.flows.get_tags and database.flows.tag;
reference tag_flow, tag_entity, database.flows.get_tags, database.flows.tag,
require_user and expdb in your change.

In `@src/routers/openml/runs.py`:
- Around line 15-30: The tag_run endpoint must verify the run exists before
calling tag_entity: call the runs existence check via the database API (e.g.,
use database.runs.get or a suitable exists method) inside tag_run after
require_user(user) and before tag_entity, and if not found raise
HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Run not found"); add the
imports for HTTPException and HTTPStatus at the top of the file. Ensure you
still pass database.runs.get_tags and database.runs.tag into tag_entity
unchanged so only the pre-check is added.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 35132465-3fa0-4162-b584-aa171111aed2

📥 Commits

Reviewing files that changed from the base of the PR and between 18e713f and 56036af.

📒 Files selected for processing (8)
  • src/core/tagging.py
  • src/database/flows.py
  • src/database/runs.py
  • src/database/tagging.py
  • src/database/tasks.py
  • src/routers/openml/flows.py
  • src/routers/openml/runs.py
  • src/routers/openml/tasks.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/routers/openml/tasks.py

- Add POST /tasks/tag, /tasks/untag, /flows/tag, /flows/untag, /runs/tag, /runs/untag
- Extract shared database helpers (database/tagging.py) to avoid SQL duplication
- Extract shared router logic (core/tagging.py) with tag_entity/untag_entity helpers
- Use new error classes (TagAlreadyExistsError, TagNotFoundError, TagNotOwnedError)
- Use fetch_user_or_raise dependency for auth, matching existing patterns
- Add database/runs.py with get, get_tags, tag, get_tag, delete_tag
- Register runs router in main.py
- 33 tests covering auth, tagging, untagging, duplicates, ownership
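The untag ownership rule described in this PR (non-owners rejected with error 478, admins bypass the check) can be sketched roughly as follows. This is an illustrative sketch only; the function and field names here are assumptions, not the PR's exact implementation.

```python
from dataclasses import dataclass, field

ADMIN_GROUP = 1  # assumed group id marking administrators


@dataclass
class User:
    user_id: int
    groups: list[int] = field(default_factory=list)


def may_untag(tag_uploader: int, user: User) -> bool:
    """Owners may remove their own tags; admins may remove any tag.

    A False result corresponds to the 478 'tag not owned' rejection.
    """
    return user.user_id == tag_uploader or ADMIN_GROUP in user.groups
```

The router-level helper would call this predicate after looking up the tag row's uploader and raise the 478 error when it returns False.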
@YassinNouh21 YassinNouh21 force-pushed the feature/tag-untag-endpoints branch from 56036af to a1e8a42 Compare March 17, 2026 16:11
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/routers/openml/run_tag_test.py (1)

62-77: Assert the OpenML error codes in these failure-path tests.

Right now these checks only lock down 409/404/403. The tagging contract also distinguishes these cases with app-specific codes like 473, 477, and 478, so a wrong error mapping from the shared helper would still pass here. Adding code/message assertions would make these regressions much easier to catch.

Also applies to: 119-127, 151-165

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/routers/openml/run_tag_test.py` around lines 62 - 77, The failure-path
tests for run tagging (e.g., test_run_tag_fails_if_tag_exists which calls
py_api.post against "/runs/tag" with ApiKey.ADMIN) only assert HTTP status
codes; update them to also assert the app-specific OpenML error code/message in
the JSON body (for example expect code 473/477/478 as appropriate) so wrong
internal mappings fail the test; do the same change for the other tagging
failure tests in this file that post to "/runs/tag" (the not-found and not-admin
cases) by reading response.json() and asserting both response.status_code and
response_json["code"] and optionally response_json["message"] match the expected
app-specific values.
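A hedged sketch of the suggested assertion pattern — the helper and the simulated payload below are illustrative, not the project's actual test fixtures; only the 473 code and message mirror the contract quoted in this review:

```python
from http import HTTPStatus


def assert_openml_error(
    status_code: int,
    body: dict,
    expected_status: HTTPStatus,
    expected_code: str,
) -> None:
    """Lock down both the HTTP status and the app-specific OpenML code."""
    assert status_code == expected_status
    detail = body.get("detail", {})
    assert detail.get("code") == expected_code, f"unexpected error detail: {detail}"


# Simulated duplicate-tag response, shaped like the 473 payload above:
response_status = HTTPStatus.INTERNAL_SERVER_ERROR
response_body = {
    "detail": {
        "code": "473",
        "message": "Entity already tagged by this tag.",
    },
}
assert_openml_error(
    response_status, response_body, HTTPStatus.INTERNAL_SERVER_ERROR, "473"
)
```

In the real tests, `response_status`/`response_body` would come from `response.status_code` and `response.json()` on the `py_api.post(...)` result.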
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/database/tagging.py`:
- Around line 7-8: The helpers in this module accept the identifier parameters
table and id_column and interpolate them directly into shared SQL; validate
these inputs against a closed allowlist before any string interpolation by
adding an allowlist mapping (e.g., ALLOWED_TABLES = {"tags": {"id"}, "items":
{"item_id"}}) and checking that table is a key and id_column is one of its
allowed columns, raising ValueError on mismatch; perform this check at the start
of the public helper functions that accept table/id_column so no query is
constructed for invalid values, and when you must embed identifiers into SQL use
a safe identifier mechanism (e.g., psycopg2.sql.Identifier or the DB driver's
proper identifier-quoting helper) rather than naive string concatenation.
- Around line 46-62: remove_tag currently deletes rows by (id, tag) only which
allows TOCTOU deletion by another uploader; change remove_tag to accept an
expected_uploader (or expected_row_pk) parameter and include it in the DELETE
WHERE clause (e.g., AND `uploader` = :expected_uploader or AND primary_key =
:expected_pk) so the DELETE is conditional/atomic; update callers (notably
core.tagging.untag_entity and any pathway used by non-admins) to pass the
checked uploader value when calling remove_tag so authorization is enforced in
the same SQL statement.

---

Nitpick comments:
In `@tests/routers/openml/run_tag_test.py`:
- Around line 62-77: The failure-path tests for run tagging (e.g.,
test_run_tag_fails_if_tag_exists which calls py_api.post against "/runs/tag"
with ApiKey.ADMIN) only assert HTTP status codes; update them to also assert the
app-specific OpenML error code/message in the JSON body (for example expect code
473/477/478 as appropriate) so wrong internal mappings fail the test; do the
same change for the other tagging failure tests in this file that post to
"/runs/tag" (the not-found and not-admin cases) by reading response.json() and
asserting both response.status_code and response_json["code"] and optionally
response_json["message"] match the expected app-specific values.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 08213c45-e514-41ae-a008-ced2bbdfe571

📥 Commits

Reviewing files that changed from the base of the PR and between 56036af and 8068f89.

📒 Files selected for processing (12)
  • src/core/tagging.py
  • src/database/flows.py
  • src/database/runs.py
  • src/database/tagging.py
  • src/database/tasks.py
  • src/main.py
  • src/routers/openml/flows.py
  • src/routers/openml/runs.py
  • src/routers/openml/tasks.py
  • tests/routers/openml/flow_tag_test.py
  • tests/routers/openml/run_tag_test.py
  • tests/routers/openml/task_tag_test.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/main.py
  • tests/routers/openml/flow_tag_test.py
  • src/core/tagging.py
  • tests/routers/openml/task_tag_test.py

Comment on lines +7 to +8
    table: str,
    id_column: str,
Contributor

⚠️ Potential issue | 🟠 Major

Constrain table and id_column before interpolating them into shared SQL.

These helpers are now the reusable tagging boundary, but they accept arbitrary identifier strings and splice them straight into every query. The current callers pass constants, yet one future non-constant call turns this module into an injection sink. Please validate against a closed set of supported identifier combinations here, instead of relying on every caller to stay disciplined.

Also applies to: 16-19, 35-39, 56-59, 74-78

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/tagging.py` around lines 7 - 8, The helpers in this module
accept the identifier parameters table and id_column and interpolate them
directly into shared SQL; validate these inputs against a closed allowlist
before any string interpolation by adding an allowlist mapping (e.g.,
ALLOWED_TABLES = {"tags": {"id"}, "items": {"item_id"}}) and checking that table
is a key and id_column is one of its allowed columns, raising ValueError on
mismatch; perform this check at the start of the public helper functions that
accept table/id_column so no query is constructed for invalid values, and when
you must embed identifiers into SQL use a safe identifier mechanism (e.g.,
psycopg2.sql.Identifier or the DB driver's proper identifier-quoting helper)
rather than naive string concatenation.
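A minimal sketch of that allowlist guard, assuming hypothetical table and column names — the real entries would come from the existing callers in `database/flows.py`, `database/runs.py`, and `database/tasks.py`:

```python
# Hypothetical identifier allowlist; the table and column names below are
# illustrative placeholders, not the project's actual schema.
ALLOWED_TAG_TABLES: dict[str, str] = {
    "task_tag": "id",
    "implementation_tag": "id",
    "run_tag": "id",
}


def validate_tag_identifiers(table: str, id_column: str) -> None:
    """Reject any (table, id_column) pair outside the closed allowlist,
    so identifiers are never interpolated into SQL unchecked."""
    expected = ALLOWED_TAG_TABLES.get(table)
    if expected is None or id_column != expected:
        raise ValueError(f"unsupported tag target: {table}.{id_column}")
```

Called at the top of each helper that accepts `table`/`id_column`, this fails fast before any query string is constructed.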

Comment on lines +46 to +62
async def remove_tag(
    *,
    table: str,
    id_column: str,
    id_: int,
    tag_: str,
    expdb: AsyncConnection,
) -> None:
    await expdb.execute(
        text(
            f"""
            DELETE FROM {table}
            WHERE `{id_column}` = :id AND `tag` = :tag
            """,
        ),
        parameters={"id": id_, "tag": tag_},
    )
Contributor

⚠️ Potential issue | 🔴 Critical

Bind the DELETE to the checked owner.

The new /untag flow does the ownership check before it calls delete_tag, but remove_tag deletes by (id, tag) only. If that row is removed and recreated by someone else between the check and this DELETE, the original requester can still delete the new owner's tag. Make the mutation conditional on the expected uploader (or row PK) so authorization and deletion stay atomic.

🔒 Suggested direction
 async def remove_tag(
     *,
     table: str,
     id_column: str,
     id_: int,
     tag_: str,
+    uploader: int | None = None,
     expdb: AsyncConnection,
 ) -> None:
+    uploader_clause = " AND `uploader` = :uploader" if uploader is not None else ""
     await expdb.execute(
         text(
             f"""
             DELETE FROM {table}
-            WHERE `{id_column}` = :id AND `tag` = :tag
+            WHERE `{id_column}` = :id AND `tag` = :tag{uploader_clause}
             """,
         ),
-        parameters={"id": id_, "tag": tag_},
+        parameters={
+            "id": id_,
+            "tag": tag_,
+            **({"uploader": uploader} if uploader is not None else {}),
+        },
     )

Then thread the expected uploader through core.tagging.untag_entity for non-admin deletes.

🧰 Tools
🪛 Ruff (0.15.6)

[error] 56-59: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/database/tagging.py` around lines 46 - 62, remove_tag currently deletes
rows by (id, tag) only which allows TOCTOU deletion by another uploader; change
remove_tag to accept an expected_uploader (or expected_row_pk) parameter and
include it in the DELETE WHERE clause (e.g., AND `uploader` = :expected_uploader
or AND primary_key = :expected_pk) so the DELETE is conditional/atomic; update
callers (notably core.tagging.untag_entity and any pathway used by non-admins)
to pass the checked uploader value when calling remove_tag so authorization is
enforced in the same SQL statement.

@YassinNouh21
Author

@PGijsbers, are you going to give this another look?

Development

Successfully merging this pull request may close these issues.

POST /task/tag

2 participants