Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 239 additions & 4 deletions src/routers/openml/tasks.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import json
import re
from typing import Annotated, cast
from enum import StrEnum
from typing import Annotated, Any, cast

import xmltodict
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Body, Depends
from sqlalchemy import RowMapping, text
from sqlalchemy.ext.asyncio import AsyncConnection

import config
import database.datasets
import database.tasks
from core.errors import InternalError, TaskNotFoundError
from routers.dependencies import expdb_connection
from core.errors import InternalError, NoResultsError, TaskNotFoundError
from routers.dependencies import Pagination, expdb_connection
from routers.types import CasualString128, IntegerRange, SystemString64, integer_range_regex
from schemas.datasets.openml import Task

router = APIRouter(prefix="/tasks", tags=["tasks"])
Expand Down Expand Up @@ -157,6 +159,239 @@ async def _fill_json_template( # noqa: C901
return template.replace("[CONSTANT:base_url]", server_url)


class TaskStatusFilter(StrEnum):
    """Valid values for the status filter.

    ``ALL`` disables status filtering entirely; the other members are compared
    against the dataset's latest recorded status, where a dataset without any
    status row is treated as ``in_preparation``.
    """

    ACTIVE = "active"
    DEACTIVATED = "deactivated"
    IN_PREPARATION = "in_preparation"
    ALL = "all"


# Dataset qualities attached to each listed task (under its "quality" key).
# Only these are fetched from `data_quality` when building the listing.
QUALITIES_TO_SHOW = [
    "MajorityClassSize",
    "MaxNominalAttDistinctValues",
    "MinorityClassSize",
    "NumberOfClasses",
    "NumberOfFeatures",
    "NumberOfInstances",
    "NumberOfInstancesWithMissingValues",
    "NumberOfMissingValues",
    "NumberOfNumericFeatures",
    "NumberOfSymbolicFeatures",
]

# Task inputs included in each listing entry (under its "input" key);
# other task_inputs rows are omitted from the listing.
BASIC_TASK_INPUTS = [
    "source_data",
    "target_feature",
    "estimation_procedure",
    "evaluation_measures",
]


def _quality_clause(quality: str, range_: str | None) -> str:
    """Build a SQL WHERE fragment filtering tasks by a dataset quality range.

    Restricts tasks to those whose source dataset has ``quality`` within
    ``range_``, which is either an exact value (``"100"``) or a span
    (``"50..200"``). An empty or ``None`` range yields no filtering.
    """
    if not range_:
        return ""
    parsed = re.match(integer_range_regex, range_)
    if parsed is None:
        msg = f"`range_` not a valid range: {range_}"
        raise ValueError(msg)
    low, high = parsed.groups()
    if high:
        # the second group carries a ".." prefix (e.g. "..200"); drop it
        condition = f"`value` BETWEEN {low} AND {high[2:]}"
    else:
        condition = f"`value`={low}"
    # nested subquery: datasets with a matching quality value, then the
    # tasks whose source_data input points at one of those datasets
    return f"""
    AND t.`task_id` IN (
        SELECT ti.`task_id` FROM task_inputs ti
        WHERE ti.`input`='source_data' AND ti.`value` IN (
            SELECT `data` FROM data_quality
            WHERE `quality`='{quality}' AND {condition}
        )
    )
    """  # noqa: S608


@router.post(path="/list", description="Provided for convenience, same as `GET` endpoint.")
@router.get(path="/list")
async def list_tasks(  # noqa: PLR0913
    pagination: Annotated[Pagination, Body(default_factory=Pagination)],
    task_type_id: Annotated[int | None, Body(description="Filter by task type id.")] = None,
    tag: Annotated[str | None, SystemString64] = None,
    data_tag: Annotated[str | None, SystemString64] = None,
    status: Annotated[TaskStatusFilter, Body()] = TaskStatusFilter.ACTIVE,
    task_id: Annotated[list[int] | None, Body(description="Filter by task id(s).")] = None,
    data_id: Annotated[list[int] | None, Body(description="Filter by dataset id(s).")] = None,
    data_name: Annotated[str | None, CasualString128] = None,
    number_instances: Annotated[str | None, IntegerRange] = None,
    number_features: Annotated[str | None, IntegerRange] = None,
    number_classes: Annotated[str | None, IntegerRange] = None,
    number_missing_values: Annotated[str | None, IntegerRange] = None,
    expdb: Annotated[AsyncConnection, Depends(expdb_connection)] = None,
) -> list[dict[str, Any]]:
    """List tasks, optionally filtered by type, tag, status, dataset properties, and more.

    Returns one dict per task containing the task/dataset columns plus
    "input", "quality", and "tag" lists (always present, possibly empty).
    Raises NoResultsError when no task matches the filters.
    """
    assert expdb is not None  # noqa: S101

    # --- WHERE clauses ---
    # `status` is bound as a query parameter rather than interpolated into the
    # SQL text; TaskStatusFilter already restricts the values, but binding
    # keeps this consistent with the other scalar filters below.
    if status == TaskStatusFilter.ALL:
        where_status = ""
    else:
        where_status = "AND IFNULL(ds.`status`, 'in_preparation') = :status"

    where_type = "" if task_type_id is None else "AND t.`ttid` = :task_type_id"
    where_tag = (
        "" if tag is None else "AND t.`task_id` IN (SELECT `id` FROM task_tag WHERE `tag` = :tag)"
    )
    where_data_tag = (
        ""
        if data_tag is None
        else "AND d.`did` IN (SELECT `id` FROM dataset_tag WHERE `tag` = :data_tag)"
    )
    # id lists are validated as list[int] by FastAPI, so inlining them is safe
    task_id_str = ",".join(str(tid) for tid in task_id) if task_id else ""
    where_task_id = "" if not task_id else f"AND t.`task_id` IN ({task_id_str})"
    data_id_str = ",".join(str(did) for did in data_id) if data_id else ""
    where_data_id = "" if not data_id else f"AND d.`did` IN ({data_id_str})"
    where_data_name = "" if data_name is None else "AND d.`name` = :data_name"

    where_number_instances = _quality_clause("NumberOfInstances", number_instances)
    where_number_features = _quality_clause("NumberOfFeatures", number_features)
    where_number_classes = _quality_clause("NumberOfClasses", number_classes)
    where_number_missing_values = _quality_clause("NumberOfMissingValues", number_missing_values)

    basic_inputs_str = ", ".join(f"'{i}'" for i in BASIC_TASK_INPUTS)

    # subquery to get the latest status per dataset
    # dataset_status has multiple rows per dataset (history), we want only the most recent
    status_subquery = """
        SELECT ds1.did, ds1.status
        FROM dataset_status ds1
        WHERE ds1.status_date = (
            SELECT MAX(ds2.status_date) FROM dataset_status ds2
            WHERE ds1.did = ds2.did
        )
    """

    query = text(
        f"""
        SELECT
            t.`task_id`,
            t.`ttid` AS task_type_id,
            tt.`name` AS task_type,
            d.`did`,
            d.`name`,
            d.`format`,
            IFNULL(ds.`status`, 'in_preparation') AS status
        FROM task t
        JOIN task_type tt ON tt.`ttid` = t.`ttid`
        JOIN task_inputs ti_source ON ti_source.`task_id` = t.`task_id`
            AND ti_source.`input` = 'source_data'
        JOIN dataset d ON d.`did` = ti_source.`value`
        LEFT JOIN ({status_subquery}) ds ON ds.`did` = d.`did`
        WHERE 1=1
            {where_status}
            {where_type}
            {where_tag}
            {where_data_tag}
            {where_task_id}
            {where_data_id}
            {where_data_name}
            {where_number_instances}
            {where_number_features}
            {where_number_classes}
            {where_number_missing_values}
        GROUP BY t.`task_id`, t.`ttid`, tt.`name`, d.`did`, d.`name`, d.`format`, ds.`status`
        ORDER BY t.`task_id`
        LIMIT {pagination.limit} OFFSET {pagination.offset}
        """,  # noqa: S608
    )

    # extra keys for absent clauses are harmless; this mirrors the existing
    # pattern of always passing every named parameter
    result = await expdb.execute(
        query,
        parameters={
            "task_type_id": task_type_id,
            "tag": tag,
            "data_tag": data_tag,
            "data_name": data_name,
            "status": status.value,
        },
    )
    rows = result.mappings().all()

    if not rows:
        msg = "No tasks match the search criteria."
        raise NoResultsError(msg)

    columns = ["task_id", "task_type_id", "task_type", "did", "name", "format", "status"]
    tasks: dict[int, dict[str, Any]] = {
        row["task_id"]: {col: row[col] for col in columns} for row in rows
    }

    # fetch inputs for all tasks in one query
    task_ids_str = ",".join(str(tid) for tid in tasks)
    inputs_result = await expdb.execute(
        text(
            f"""
            SELECT `task_id`, `input`, `value`
            FROM task_inputs
            WHERE `task_id` IN ({task_ids_str})
                AND `input` IN ({basic_inputs_str})
            """,  # noqa: S608
        ),
    )
    for row in inputs_result.all():
        tasks[row.task_id].setdefault("input", []).append(
            {"name": row.input, "value": row.value},
        )

    # fetch qualities for all datasets in one query
    did_list = ",".join(str(t["did"]) for t in tasks.values())
    qualities_str = ", ".join(f"'{q}'" for q in QUALITIES_TO_SHOW)
    qualities_result = await expdb.execute(
        text(
            f"""
            SELECT `data`, `quality`, `value`
            FROM data_quality
            WHERE `data` IN ({did_list})
                AND `quality` IN ({qualities_str})
            """,  # noqa: S608
        ),
    )
    # build a reverse map: dataset_id -> task_id(s)
    # needed because quality rows come back keyed by did, but our tasks dict is keyed by task_id
    did_to_task_ids: dict[int, list[int]] = {}
    for tid, t in tasks.items():
        did_to_task_ids.setdefault(t["did"], []).append(tid)
    for row in qualities_result.all():
        for tid in did_to_task_ids.get(row.data, []):
            tasks[tid].setdefault("quality", []).append(
                {"name": row.quality, "value": str(row.value)},
            )

    # fetch tags for all tasks in one query
    tags_result = await expdb.execute(
        text(
            f"""
            SELECT `id`, `tag`
            FROM task_tag
            WHERE `id` IN ({task_ids_str})
            """,  # noqa: S608
        ),
    )
    for row in tags_result.all():
        tasks[row.id].setdefault("tag", []).append(row.tag)

    # ensure every task has all expected keys (input/quality/tag) even if no rows were found;
    # e.g. a task with no tags should return "tag": [] rather than a missing key
    for task in tasks.values():
        task.setdefault("input", [])
        task.setdefault("quality", [])
        task.setdefault("tag", [])

    return list(tasks.values())


@router.get("/{task_id}")
async def get_task(
task_id: int,
Expand Down
98 changes: 98 additions & 0 deletions tests/routers/openml/task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,104 @@
import httpx


async def test_list_tasks_default(py_api: httpx.AsyncClient) -> None:
    """Default call returns active tasks with correct shape."""
    resp = await py_api.post("/tasks/list", json={})
    assert resp.status_code == HTTPStatus.OK
    listed = resp.json()
    assert isinstance(listed, list)
    assert len(listed) > 0
    for entry in listed:
        assert entry["status"] == "active"
    # verify shape of first task: every expected field must be present
    expected_fields = (
        "task_id",
        "task_type_id",
        "task_type",
        "did",
        "name",
        "format",
        "status",
        "input",
        "quality",
        "tag",
    )
    first = listed[0]
    for field in expected_fields:
        assert field in first


async def test_list_tasks_filter_type(py_api: httpx.AsyncClient) -> None:
    """Filter by task_type_id returns only tasks of that type."""
    resp = await py_api.post("/tasks/list", json={"task_type_id": 1})
    assert resp.status_code == HTTPStatus.OK
    listed = resp.json()
    assert len(listed) > 0
    for task in listed:
        assert task["task_type_id"] == 1


async def test_list_tasks_filter_tag(py_api: httpx.AsyncClient) -> None:
    """Filter by tag returns only tasks with that tag."""
    resp = await py_api.post("/tasks/list", json={"tag": "OpenML100"})
    assert resp.status_code == HTTPStatus.OK
    listed = resp.json()
    assert len(listed) > 0
    for task in listed:
        assert "OpenML100" in task["tag"]
Comment on lines +38 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add coverage for the remaining filters (data_tag, task_id, data_id, data_name, and other quality ranges).

Current tests exercise task_type_id, tag, pagination, and number_instances, but not the other supported filters: data_tag, task_id, data_id, data_name, number_features, number_classes, and number_missing_values. This leaves several WHERE-clause branches untested.

Consider adding targeted tests such as:

  • data_tag: only tasks whose dataset has the given tag are returned.
  • task_id: multiple IDs return exactly those tasks (and correct ordering/uniqueness if relevant).
  • data_id: only tasks for the specified dataset(s) are returned.
  • data_name: verifies expected matching behavior (case-(in)sensitivity as intended).
  • One or more quality ranges using number_features or number_classes to exercise _quality_clause beyond NumberOfInstances.

Suggested implementation:

async def test_list_tasks_filter_type(py_api: httpx.AsyncClient) -> None:
    """Filter by task_type_id returns only tasks of that type."""
    response = await py_api.post("/tasks/list", json={"task_type_id": 1})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    assert all(t["task_type_id"] == 1 for t in tasks)




async def test_list_tasks_default(py_api: httpx.AsyncClient) -> None:
    """Default call returns active tasks with correct shape."""
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    assert isinstance(tasks, list)
    assert len(tasks) > 0
    # verify shape of first task
    task = tasks[0]


async def test_list_tasks_filter_data_tag(py_api: httpx.AsyncClient) -> None:
    """Filter by data_tag returns only tasks whose dataset has that tag."""
    # Get a tag actually present on at least one dataset
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    # Find first task that has at least one data tag
    tagged_task = next(
        t for t in tasks if t.get("data_tags") is not None and len(t.get("data_tags")) > 0
    )
    data_tag = tagged_task["data_tags"][0]

    response = await py_api.post("/tasks/list", json={"data_tag": data_tag})
    assert response.status_code == HTTPStatus.OK
    filtered = response.json()
    assert len(filtered) > 0
    assert all(
        data_tag in t.get("data_tags", []) for t in filtered
    ), "All returned tasks should have the dataset tag used for filtering"


async def test_list_tasks_filter_task_id(py_api: httpx.AsyncClient) -> None:
    """Filter by task_id returns exactly the requested tasks."""
    # Discover a small set of valid task IDs from the default listing
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    assert len(tasks) >= 2

    requested_ids = sorted(t["task_id"] for t in tasks[:3])

    response = await py_api.post("/tasks/list", json={"task_id": requested_ids})
    assert response.status_code == HTTPStatus.OK
    filtered = response.json()
    returned_ids = sorted(t["task_id"] for t in filtered)

    assert set(returned_ids) == set(
        requested_ids
    ), "Filtering by task_id should return exactly the requested tasks"
    # If the API defines a particular ordering, assert on that here as well
    # (e.g. sorted by task_id). The current check ensures no duplicates.
    assert len(returned_ids) == len(set(returned_ids))


async def test_list_tasks_filter_data_id(py_api: httpx.AsyncClient) -> None:
    """Filter by data_id returns only tasks from those datasets."""
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    assert len(tasks) > 0

    # Take a couple of distinct data_ids from the available tasks
    data_ids = []
    for t in tasks:
        if t["data_id"] not in data_ids:
            data_ids.append(t["data_id"])
        if len(data_ids) == 2:
            break

    response = await py_api.post("/tasks/list", json={"data_id": data_ids})
    assert response.status_code == HTTPStatus.OK
    filtered = response.json()
    assert len(filtered) > 0
    assert all(
        t["data_id"] in data_ids for t in filtered
    ), "All returned tasks should belong to one of the requested data_ids"


async def test_list_tasks_filter_data_name(py_api: httpx.AsyncClient) -> None:
    """Filter by data_name returns only tasks whose dataset has that name."""
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    assert len(tasks) > 0

    data_name = tasks[0]["data_name"]

    # Use the exact name observed in the listing
    response = await py_api.post("/tasks/list", json={"data_name": data_name})
    assert response.status_code == HTTPStatus.OK
    filtered = response.json()
    assert len(filtered) > 0
    assert all(
        t["data_name"] == data_name for t in filtered
    ), "All returned tasks should have the requested data_name"


async def test_list_tasks_filter_number_features_range(py_api: httpx.AsyncClient) -> None:
    """Filter by number_features quality range returns tasks within that range."""
    # Probe existing tasks to discover a concrete number_features value
    response = await py_api.post("/tasks/list", json={})
    assert response.status_code == HTTPStatus.OK
    tasks = response.json()
    # Find a task with a defined number_features value
    task_with_features = next(
        t for t in tasks if t.get("number_features") is not None
    )
    num_features = task_with_features["number_features"]

    # Build a narrow range around that concrete value to exercise the quality clause
    payload = {
        "number_features": {
            "min": num_features,
            "max": num_features,
        }
    }
    response = await py_api.post("/tasks/list", json=payload)
    assert response.status_code == HTTPStatus.OK
    filtered = response.json()
    assert len(filtered) > 0
    assert all(
        t.get("number_features") is not None
        and payload["number_features"]["min"]
        <= t["number_features"]
        <= payload["number_features"]["max"]
        for t in filtered
    ), "All returned tasks should have number_features within the requested range"

These tests assume:

  • Response objects include data_tags (a list of strings on each task), task_id, data_id, data_name, and number_features.
  • The /tasks/list request model accepts:
    • data_tag as a single string.
    • task_id and data_id as lists of integers.
    • data_name as a string.
    • Quality ranges in the form {"number_features": {"min": ..., "max": ...}}.

To fully align with your existing API:

  1. Adjust the field names (data_tags vs. dataset_tags, etc.) in the assertions to match the actual task schema.
  2. Align the request payload shapes with whatever is already used in the existing number_instances test (e.g., if it uses number_features_min / number_features_max instead of a nested dict, mirror that here).
  3. If task_id / data_id only accept a single integer instead of a list, modify requested_ids / data_ids accordingly and assert on a single returned task.
  4. If data-name filtering is case-insensitive, extend test_list_tasks_filter_data_name to issue a request with data_name in a different case and assert that the same tasks are returned.



async def test_list_tasks_pagination(py_api: httpx.AsyncClient) -> None:
    """Pagination returns correct number of results."""
    page_size = 5
    resp = await py_api.post(
        "/tasks/list",
        json={"pagination": {"limit": page_size, "offset": 0}},
    )
    assert resp.status_code == HTTPStatus.OK
    assert len(resp.json()) == page_size


async def test_list_tasks_pagination_offset(py_api: httpx.AsyncClient) -> None:
    """Offset returns different results than no offset."""
    first_page = await py_api.post("/tasks/list", json={"pagination": {"limit": 5, "offset": 0}})
    second_page = await py_api.post("/tasks/list", json={"pagination": {"limit": 5, "offset": 5}})
    # Assert success explicitly so an error payload fails clearly instead of
    # producing a confusing TypeError when indexing the JSON below.
    assert first_page.status_code == HTTPStatus.OK
    assert second_page.status_code == HTTPStatus.OK
    ids_first = [t["task_id"] for t in first_page.json()]
    ids_second = [t["task_id"] for t in second_page.json()]
    assert ids_first != ids_second
    # Results are ordered by task_id, so consecutive pages must be disjoint.
    assert not set(ids_first) & set(ids_second)


async def test_list_tasks_number_instances_range(py_api: httpx.AsyncClient) -> None:
    """number_instances range filter returns tasks whose dataset matches."""
    lower, upper = 100, 1000
    resp = await py_api.post(
        "/tasks/list",
        json={"number_instances": f"{lower}..{upper}"},
    )
    assert resp.status_code == HTTPStatus.OK
    listed = resp.json()
    assert len(listed) > 0
    for task in listed:
        by_name = {entry["name"]: entry["value"] for entry in task["quality"]}
        assert lower <= float(by_name["NumberOfInstances"]) <= upper


async def test_list_tasks_no_results(py_api: httpx.AsyncClient) -> None:
    """Nonexistent tag returns 404 NoResultsError."""
    resp = await py_api.post("/tasks/list", json={"tag": "nonexistent_tag_xyz"})
    assert resp.headers["content-type"] == "application/problem+json"
    assert resp.status_code == HTTPStatus.NOT_FOUND
    problem = resp.json()
    assert problem["code"] == "372"
    assert problem["status"] == HTTPStatus.NOT_FOUND


async def test_list_tasks_get(py_api: httpx.AsyncClient) -> None:
    """GET /tasks/list with no body also works."""
    resp = await py_api.get("/tasks/list")
    assert resp.status_code == HTTPStatus.OK
    payload = resp.json()
    assert isinstance(payload, list)


async def test_list_tasks_invalid_range_format(py_api: httpx.AsyncClient) -> None:
    """Invalid number_instances range returns 422 validation error."""
    resp = await py_api.post("/tasks/list", json={"number_instances": "1...2"})
    assert resp.status_code == HTTPStatus.UNPROCESSABLE_ENTITY


async def test_get_task(py_api: httpx.AsyncClient) -> None:
response = await py_api.get("/tasks/59")
assert response.status_code == HTTPStatus.OK
Expand Down
Loading