Worker implementation for processing images in the Antenna task queue (PSv2) #94
mihow merged 46 commits into RolnickLab:main
Conversation
Pull request overview
Adds an end-to-end “pulling (v2)” worker implementation for PyTorch inference that polls an Antenna service for queued tasks, runs detection/classification, and posts results back.
Changes:
- Added `ami worker` CLI command and worker runtime to poll `/api/v2/jobs` and post `/result/` payloads.
- Implemented REST-backed `IterableDataset` + DataLoader plumbing for streaming tasks and image downloads (sketched below).
- Added small API/model utilities (service-info caching, reset helpers, task schema) to support the worker flow.
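A rough sketch of the streaming pattern described here, assuming a `/jobs/{id}/tasks` endpoint that dequeues tasks per request; names like `RESTTaskDataset` are illustrative, not the PR's exact classes:

```python
from io import BytesIO

import requests
from PIL import Image
from torch.utils.data import IterableDataset


class RESTTaskDataset(IterableDataset):
    """Streams queued tasks from an HTTP API and yields loaded images."""

    def __init__(self, base_url: str, job_id: int, batch_size: int = 4):
        self.base_url = base_url.rstrip("/")
        self.job_id = job_id
        self.batch_size = batch_size

    def __iter__(self):
        while True:
            # Each request dequeues up to batch_size tasks from the job queue.
            resp = requests.get(
                f"{self.base_url}/jobs/{self.job_id}/tasks",
                params={"batch_size": self.batch_size},
                timeout=30,
            )
            resp.raise_for_status()
            tasks = resp.json().get("tasks", [])
            if not tasks:
                return  # queue drained; stop iteration
            for task in tasks:
                image = Image.open(
                    BytesIO(requests.get(task["image_url"], timeout=30).content)
                )
                yield {"image_id": task["image_id"], "image": image.convert("RGB")}
```

A standard `DataLoader` with a custom collate function can then batch these dicts for inference.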
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| trapdata/common/utils.py | Adds a small timing/logging helper used by the worker. |
| trapdata/cli/worker.py | New worker loop: fetch jobs, process batches, post results. |
| trapdata/cli/base.py | Registers worker CLI command and validates pipeline args. |
| trapdata/api/schemas.py | Adds PipelineProcessingTask schema for REST task payloads. |
| trapdata/api/models/localization.py | Adds detector reset helper; simplifies result saving loop. |
| trapdata/api/models/classification.py | Adds classifier reset + shared “update detection classification” helper. |
| trapdata/api/datasets.py | Adds REST task IterableDataset, collate fn, and DataLoader factory. |
| trapdata/api/api.py | Caches /info response via FastAPI lifespan initialization. |
| .vscode/launch.json | Adds VS Code launch configs for debugging worker and API. |
Add three new settings to configure the Antenna API worker:
- antenna_api_base_url: Base URL for Antenna API (defaults to localhost:8000/api/v2)
- antenna_api_auth_token: Authentication token for Antenna project
- antenna_api_batch_size: Number of tasks to fetch per batch (default: 4)

These settings replace hardcoded environment variables and follow the existing Settings pattern with AMI_ prefix and Kivy metadata.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
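A minimal sketch of that settings pattern using pydantic-settings; field names come from the commit message, while the real `Settings` class and its Kivy metadata may differ:

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Worker settings, read from AMI_-prefixed environment variables."""

    model_config = SettingsConfigDict(env_prefix="AMI_")

    # Overridable via AMI_ANTENNA_API_BASE_URL, etc.
    antenna_api_base_url: str = "http://localhost:8000/api/v2"
    antenna_api_auth_token: str = ""
    antenna_api_batch_size: int = 4
```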
Add schemas to validate API responses from Antenna:
- AntennaJobListItem: Single job with id field
- AntennaJobsListResponse: List of jobs from GET /api/v2/jobs
- AntennaTasksListResponse: List of tasks from GET /api/v2/jobs/{id}/tasks
Also rename PipelineProcessingTask to AntennaPipelineProcessingTask for clarity.
These schemas provide type safety, validation, and clear documentation of
the expected API response format.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
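A sketch of what these response models might look like; the task fields are taken from the dataset and worker code referenced elsewhere in this PR, and exact definitions may differ:

```python
import pydantic


class AntennaJobListItem(pydantic.BaseModel):
    """A single job entry; only the id is needed to fetch its tasks."""

    id: int


class AntennaJobsListResponse(pydantic.BaseModel):
    """Response from GET /api/v2/jobs."""

    results: list[AntennaJobListItem] = pydantic.Field(default_factory=list)


class AntennaPipelineProcessingTask(pydantic.BaseModel):
    """One queued image-processing task."""

    image_id: str
    image_url: str
    reply_subject: str


class AntennaTasksListResponse(pydantic.BaseModel):
    """Response from GET /api/v2/jobs/{id}/tasks."""

    tasks: list[AntennaPipelineProcessingTask] = pydantic.Field(default_factory=list)
```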
Changes:
- Replace os.environ.get() with Settings object for configuration
- Add validation for antenna_api_auth_token with clear error message
- Use Pydantic AntennaJobsListResponse schema for type-safe API parsing
- Use urljoin for safe URL construction instead of f-strings
- Improve error handling with separate exception catch for validation errors

This follows the existing Settings pattern and provides better type safety and validation.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changes:
- Use Pydantic AntennaTasksListResponse schema for type-safe API parsing
- Raise exceptions instead of returning None for network errors (more Pythonic)
- Fix error tuple bug: row["error"] was incorrectly wrapped in a tuple
- Use urljoin for safe URL construction
- Add API contract documentation about atomic task dequeue
- Update to use Settings object for configuration

The exception-based error handling is clearer than checking for None vs empty list. The retry logic now catches RequestException explicitly.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Add proper type annotations for the predictions parameters:
- seconds_per_item: float
- image_id: str
- detection_idx: int
- predictions: ClassifierResult (instead of a comment)

This improves type checking and IDE support.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Add comprehensive documentation for running the Antenna worker:
- Setup instructions with environment variable configuration
- Example commands for running with single or multiple pipelines
- Explanation of worker behavior and safety with parallel workers
- Notes about authentication and safety

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Poetry lock file regenerated by Poetry 2.1.2 with updated dependencies:
- alembic: 1.14.0 → 1.18.1
- anyio: 4.6.2.post1 → 4.12.1
- Added: annotated-doc 0.0.4
- Format changes: category → groups, added platform markers

This is a side effect of running the development environment.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changes:
- Remove urljoin import from datasets.py and worker.py
- Replace urljoin() calls with f"{base_url.rstrip('/')}/path" pattern
- Remove base_url trailing slash manipulation in RESTDataset.__init__
The urljoin behavior is unintuitive: it treats the last path segment as a
file and replaces it when joining relative paths. This required every call
site to ensure the base URL had a trailing slash, which is fragile.
The f-string approach is clearer and handles all edge cases (no slash, one
slash, multiple slashes) without requiring state modification or
scattered string checks.
Files changed:
- trapdata/api/datasets.py:5, 143-144, 160
- trapdata/cli/worker.py:6, 43-46, 70-73
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
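The pitfall is easy to demonstrate; a quick illustration of the behavior described above:

```python
from urllib.parse import urljoin

base = "http://localhost:8000/api/v2"

# urljoin treats the last path segment as a file and replaces it:
urljoin(base, "jobs")        # -> "http://localhost:8000/api/jobs"  ("v2" is dropped)
urljoin(base + "/", "jobs")  # -> "http://localhost:8000/api/v2/jobs"  (needs trailing slash)

# The f-string pattern is explicit and tolerant of missing or extra slashes:
url = f"{base.rstrip('/')}/jobs"  # -> "http://localhost:8000/api/v2/jobs"
```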
…ckLab#104)

* Add urllib3.Retry + HTTPAdapter for HTTP resilience

Replace manual retry logic with urllib3.Retry and HTTPAdapter to implement smart retries with exponential backoff. This change addresses PR RolnickLab#94 feedback about improving HTTP error handling.

Key changes:
- Add get_http_session() utility in trapdata/common/utils.py
- Add retry settings to Settings (antenna_api_retry_max, antenna_api_retry_backoff)
- Update RESTDataset to use session with retry logic
- Update worker.py functions to accept and use HTTP sessions
- Remove manual retry loop with fixed 5s delay
- Only retry 5XX server errors and network failures (NOT 4XX client errors)

Benefits:
- Exponential backoff (0.5s, 1s, 2s) instead of fixed 5s delays
- Max retry limit (3 attempts) instead of infinite loops
- Respects 4XX semantics (don't retry client errors)
- Connection pooling via Session for better performance

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Add tests for get_http_session() utility

Add comprehensive test suite for the new get_http_session() utility function. Tests verify:
- Session creation
- HTTPAdapter mounting for http:// and https://
- Retry configuration (max_retries, backoff_factor, status_forcelist)
- Default values (3 retries, 0.5s backoff)
- Allowed methods (GET, POST)
- raise_on_status=False (let caller handle status codes)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Update tests to work with session-based HTTP requests

Update test mocks to patch get_http_session() instead of requests.get() since the code now uses session.get() for all HTTP requests.

Changes:
- TestRESTDatasetIteration: Mock get_http_session to return mock session
- TestGetJobs: Mock get_http_session to return mock session
- Replace test_fetch_retry_on_error with test_fetch_failure_stops_iteration to verify new behavior (iterator stops after max retries instead of infinite loop with manual retry)

All tests pass (39 passed, 1 skipped).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Format code with black

* Refactor HTTP auth to use session-based headers

Move authentication token handling into get_http_session() to centralize auth logic and improve security. Move get_http_session() from common/utils to api/utils where API-related utilities belong.

Changes:
- Move get_http_session() from trapdata/common/utils.py to trapdata/api/utils.py
- Add auth_token parameter to get_http_session()
- Session automatically sets Authorization header when token provided
- RESTDataset: Remove stored session, create API session with auth for _fetch_tasks(), separate session without auth for _load_image()
- worker.py: Remove session parameter and manual header management from post_batch_results(), _get_jobs(), and _process_job()
- Pass retry_max and retry_backoff parameters instead of session objects
- Update imports: datasets.py and worker.py now import from trapdata.api.utils
- Update tests to verify auth_token passed to get_http_session()

Benefits:
- Security: External image URLs no longer receive API auth tokens
- DRY: Auth header logic centralized in one place
- Maintainability: Easier to change auth scheme in future
- Organization: API utilities in api/utils.py
- Correctness: Retry settings consistently applied via Settings

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Convert worker tests to integration tests with real ML inference

Replaces fully mocked unit tests with integration tests that validate the Antenna API contract and run actual ML models. Tests now exercise the worker's unique code path (RESTDataset → rest_collate_fn) with real image loading and inference.

Changes:
- Add trapdata/api/tests/utils.py with shared test utilities
- Add trapdata/api/tests/antenna_api_server.py to mock Antenna API
- Rewrite test_worker.py as integration tests (17 tests, all passing)
- Update test_api.py to use shared utilities

Tests validate: real detector/classifier inference, HTTP image loading, schema compliance, batch processing, and end-to-end workflow.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Update RESTDataset with modern type hints and persistent sessions

- Replace typing.Optional with | None syntax for type annotations
- Add persistent HTTP sessions (api_session, image_fetch_session) for connection pooling
- Add __del__ method for session cleanup
- Update _fetch_tasks() and _load_image() to use persistent sessions

Benefits:
- Reduces overhead from creating sessions on every request
- Enables connection pooling for better performance
- Follows project style guide for type annotations

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Refactor worker functions to use Settings object and context managers

Update function signatures:
- _get_jobs(): Now takes Settings + pipeline_slug (was 5 parameters)
- post_batch_results(): Now takes Settings + job_id + results (was 6 parameters)

Add session cleanup:
- Wrap get_http_session() calls with context managers for automatic cleanup
- Prevents session leaks in long-running worker processes

Update call sites:
- Simplified _get_jobs() call in run_worker() from 6 lines to 1 line
- Simplified post_batch_results() call in _process_job() from 8 lines to 1 line

Benefits:
- More maintainable: adding new settings doesn't require updating multiple call sites
- Better resource management: sessions properly closed via context managers
- Cleaner code: reduced repetition and improved readability

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Update worker tests for new function signatures

- Add _make_settings() helper to TestGetJobs class
- Update all _get_jobs() test calls to pass Settings object
- Add context manager mocking (__enter__/__exit__) for session tests
- Add retry settings to TestProcessJob._make_settings()

All 17 tests pass with new signatures.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Clean up Antenna worker settings and type annotations

- Remove retry settings from Kivy UI (keep as env var only)
- Use modern type hints (list[str] instead of List[str])
- Fix env var fallback to use AMI_ prefix convention
- Remove outdated TODO comment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Fix worker tests after Settings refactor

- Add retry settings to mock Settings objects
- Patch Session.get/post instead of module-level requests.get/post
- Update _get_jobs calls to use Settings object pattern
- Remove unused imports

All 17 tests now passing.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Simplify _get_jobs to use explicit parameters

Instead of passing a Settings object, use explicit parameters:
- base_url, auth_token, pipeline_slug
- retry_max and retry_backoff with sensible defaults

This makes the function's dependencies clear and tests simpler (no MagicMock needed - just pass strings).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
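A hedged sketch of a session factory matching the behavior described in these commits (retry count, backoff, status handling, and allowed methods from the commit text; the `Token` authorization scheme is an assumption):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def get_http_session(
    auth_token: str | None = None,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
) -> requests.Session:
    """Session with exponential backoff for 5XX/network errors only."""
    retry = Retry(
        total=max_retries,
        backoff_factor=backoff_factor,  # 0.5s, 1s, 2s between attempts
        status_forcelist=[500, 502, 503, 504],  # never retry 4XX client errors
        allowed_methods=["GET", "POST"],
        raise_on_status=False,  # let the caller inspect the status code
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    if auth_token:
        # Assumed header scheme; the real project may use a different one.
        session.headers["Authorization"] = f"Token {auth_token}"
    return session
```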
Things to follow up on:
Co-authored-by: mihow <158175+mihow@users.noreply.github.com>
📝 Walkthrough

This pull request introduces comprehensive Antenna API integration, adding a new worker module for distributed processing, REST-based dataset streaming, pipeline registration orchestration, and API client utilities. It includes configuration management, CLI commands, lifecycle improvements to the FastAPI app, and extensive integration tests alongside development tooling updates.
Sequence Diagram(s)

sequenceDiagram
participant Worker as run_worker()
participant Client as get_jobs()
participant API as Antenna API
participant Dataset as RESTDataset
participant Detector as APIMothDetector
participant Classifier as APIMothClassifier
participant PostResults as post_batch_results()
Worker->>Client: get_jobs(base_url, token, pipeline)
Client->>API: GET /jobs?pipeline_slug=...
API-->>Client: {results: [job_ids]}
Client-->>Worker: [job_ids]
Worker->>Worker: for each job_id
Worker->>Dataset: __init__(base_url, token, job_id)
Worker->>Dataset: __iter__()
Dataset->>API: GET /jobs/{job_id}/tasks?batch_size=...
API-->>Dataset: {tasks: [AntennaPipelineProcessingTask]}
Dataset->>Dataset: load_image(url) for each task
Dataset-->>Worker: {images, image_ids, reply_subjects}
Worker->>Detector: run(images)
Detector-->>Worker: detections
Worker->>Classifier: run(image_crops, detection_indices)
Classifier-->>Worker: predictions
Worker->>PostResults: post_batch_results(base_url, token, job_id, results)
PostResults->>API: POST /jobs/{job_id}/result/
API-->>PostResults: 200 OK
PostResults-->>Worker: True
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (warning)
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@trapdata/api/datasets.py`:
- Around line 232-290: The exception handler can reference worker_id before it
is assigned; initialize worker_id (and num_workers) before the try so they’re
always defined. In the method shown, set default values like worker_id = 0 and
num_workers = 1 above the try that calls torch.utils.data.get_worker_info(),
then keep the existing logic that overrides them from worker_info; this ensures
logger.error(f"Worker {worker_id}: Exception in iterator: {e}") won’t raise
UnboundLocalError.
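A minimal sketch of the suggested fix; the iterator body is elided and `logger` is assumed to be a module-level logger:

```python
import logging

import torch

logger = logging.getLogger(__name__)


def __iter__(self):  # method of the RESTDataset iterator
    # Defaults ensure the logging in the except block below can never hit
    # an UnboundLocalError, even if get_worker_info() itself raises.
    worker_id = 0
    num_workers = 1
    try:
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is not None:
            worker_id = worker_info.id
            num_workers = worker_info.num_workers
        ...  # fetch and yield tasks, sharded by worker_id / num_workers
    except Exception as e:
        logger.error(f"Worker {worker_id}: Exception in iterator: {e}")
        raise
```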
In `@trapdata/api/models/classification.py`:
- Around line 156-158: Replace the bare assert that checks
detection.source_image_id == image_id with an explicit validation that raises a
clear exception (e.g., ValueError) when the invariant fails; locate the block
that accesses self.detections[detection_idx] (variables: self.detections,
detection_idx, detection, image_id) and after retrieving detection, if
detection.source_image_id != image_id raise an error containing detection_idx,
detection.source_image_id and the expected image_id to prevent silent failures
when Python assertions are disabled.
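Roughly, the suggested replacement:

```python
detection = self.detections[detection_idx]
if detection.source_image_id != image_id:
    # An explicit check survives `python -O`, unlike a bare assert.
    raise ValueError(
        f"Detection {detection_idx} belongs to image "
        f"{detection.source_image_id!r}, expected {image_id!r}"
    )
```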
In `@trapdata/api/utils.py`:
- Around line 41-90: The current get_http_session function retries POST by
default; change its signature to accept an allowed_methods parameter (e.g.,
allowed_methods: tuple[str, ...] = ("GET",)) and use that value when
constructing the urllib3.Retry (pass allowed_methods=list(allowed_methods) or
equivalent), update the docstring to document the new parameter and default, and
leave existing callers to opt-in to POST retries (e.g., post_batch_results
should explicitly pass allowed_methods=("GET","POST") only if the endpoint is
idempotent or uses idempotency keys). Ensure the Retry call uses the provided
allowed_methods and that session mounting and auth handling remain unchanged.
In `@trapdata/cli/test.py`:
- Around line 48-50: Remove the unnecessary inline noqa directive from the
event_day annotation; specifically edit the Annotated declaration referencing
event_day and typer.Argument(formats=["%Y-%m-%d"]) to delete the trailing "#
noqa: F722" token so the annotation remains exactly as
Annotated[datetime.datetime, typer.Argument(formats=["%Y-%m-%d"])]; no other
changes needed.
In `@trapdata/cli/worker.py`:
- Around line 425-433: The CLI currently falls back to ANTENNA_API_* env vars;
change the registration defaults to use the app settings (read_settings()) and
AMI_ prefixed vars instead: in the function handling registration (where
base_url and auth_token are set), call read_settings() to obtain
settings.api_base_url / settings.api_token (or their AMI_ equivalents) and use
those values as fallbacks before finally checking os.environ for AMI_*
(AMI_API_BASE_URL / AMI_API_TOKEN); then update the logger message to reference
AMI_API_TOKEN when auth_token remains empty. Ensure you update references to
base_url and auth_token in that block only (the section that currently checks
ANTENNA_API_BASE_URL/ANTENNA_API_TOKEN).
🧹 Nitpick comments (13)
trapdata/db/models/detections.py (2)
326-326: Remove the no‑op list construction.
`images` is already loaded; building and discarding `[img.id for img in images]` adds overhead without side effects. If the goal was eager-loading, this doesn't do it.

♻️ Proposed change

- _ = [img.id for img in images]
+ # No-op removed; images are already loaded by .all()
407-407: Avoid the unused query before bulk_update_mappings.
This extra SELECT adds a DB round-trip and the results are discarded. If existence checks are needed, handle them explicitly; otherwise remove the query.

♻️ Proposed change

- _ = sesh.query(DetectedObject).filter(DetectedObject.id.in_(object_ids)).all()
+ # Removed unused prefetch; bulk_update_mappings does not require it

trapdata/common/utils.py (2)
126-149: LGTM with a minor style suggestion. The timing utility is well-designed with a clean chaining API. The docstring clearly explains usage.
Consider updating the return type annotation to use the built-in `tuple` syntax for consistency with the modern style used elsewhere (`str | None`):

✨ Style consistency suggestion

-from typing import Any, Callable, Tuple, Union
+from typing import Any, Callable, Union

-def log_time(start: float = 0, msg: str | None = None) -> Tuple[float, Callable]:
+def log_time(start: float = 0, msg: str | None = None) -> tuple[float, Callable]:
8-8: Optional: Simplify Union usage. Since the codebase targets Python 3.10+, you can replace `Union[pathlib.Path, str]` (lines 79-80) and `Union[pathlib.Path, None]` with the modern `|` syntax throughout this file for consistency.

.env.example (1)
12-16: Optional: reorder keys to satisfy dotenv-linter warning.

🔧 Proposed reorder

-AMI_ANTENNA_API_BASE_URL=http://localhost:8000/api/v2
-AMI_ANTENNA_API_AUTH_TOKEN=your_antenna_auth_token_here
+AMI_ANTENNA_API_AUTH_TOKEN=your_antenna_auth_token_here
+AMI_ANTENNA_API_BASE_URL=http://localhost:8000/api/v2
 AMI_ANTENNA_API_BATCH_SIZE=4

trapdata/api/models/localization.py (1)
49-52: Consider `zip(..., strict=True)` to surface length mismatches.

🔧 Proposed fix

- for image_id, image_output in zip(item_ids, batch_output):
+ for image_id, image_output in zip(item_ids, batch_output, strict=True):

trapdata/api/api.py (1)
359-374: Startup builds full model configs; consider lazy or metadata-only construction. Instantiating detectors/classifiers here may load weights and slow startup; a lightweight metadata registry (or lazy load on first /info) can reduce cold-start latency and memory pressure.
trapdata/api/tests/utils.py (1)
15-37: Stabilize test image ordering. `glob()` ordering is filesystem-dependent; slicing `[:num]` can lead to flaky tests. Sort before slicing.

🛠️ Proposed change

- source_image_urls = [
-     file_server.get_url(f.relative_to(test_images_dir))
-     for f in images_dir.glob("*.jpg")
- ][:num]
+ source_image_urls = [
+     file_server.get_url(f.relative_to(test_images_dir))
+     for f in sorted(images_dir.glob("*.jpg"))
+ ][:num]

trapdata/api/tests/antenna_api_server.py (2)
10-19: Remove unused import `AntennaTaskResults`. The `AntennaTaskResults` import is not used anywhere in this module.

🧹 Proposed fix

 from trapdata.api.schemas import (
     AntennaJobListItem,
     AntennaJobsListResponse,
     AntennaPipelineProcessingTask,
     AntennaTaskResult,
-    AntennaTaskResults,
     AntennaTasksListResponse,
     AsyncPipelineRegistrationRequest,
     AsyncPipelineRegistrationResponse,
 )
30-45: Consider prefixing unused parameters with underscore. The `pipeline__slug`, `ids_only`, and `incomplete_only` parameters are part of the API contract but unused in this mock. Prefixing with underscores documents this intentionally and silences linter warnings.

🧹 Proposed fix

 @app.get("/api/v2/jobs")
-def get_jobs(pipeline__slug: str, ids_only: int, incomplete_only: int):
+def get_jobs(_pipeline__slug: str, _ids_only: int, _incomplete_only: int):
     """Return available job IDs.

trapdata/api/tests/test_worker.py (3)
40-128: Minor style inconsistency: `TestRestCollateFn` doesn't inherit from `TestCase`. All other test classes in this file inherit from `unittest.TestCase`, but `TestRestCollateFn` uses pytest-style class syntax. This works fine with pytest but creates an inconsistent pattern. Consider inheriting from `TestCase` for consistency, or keep as-is if you prefer the lighter pytest style for unit tests.

The test coverage itself is excellent, covering all-successful, all-failed, mixed, and single-item edge cases.
285-294: Test `test_query_params_sent` doesn't actually validate query parameters. The test comment claims to validate query params, but since the mock server ignores `pipeline__slug`, `ids_only`, and `incomplete_only`, this test only verifies the function returns a list. Consider either:
- Enhancing the mock to capture and expose the received query params for assertion
- Renaming/documenting this as a basic smoke test rather than param validation
319-329: Duplicate `_make_settings` helper in two test classes. `_make_settings()` is duplicated identically in both `TestProcessJobIntegration` and `TestWorkerEndToEnd`. Consider extracting to a shared fixture or utility function in the test module or `utils.py`.

♻️ Example extraction

# At module level or in utils.py
def make_test_settings():
    """Create mock settings for worker tests."""
    settings = MagicMock()
    settings.antenna_api_base_url = "http://testserver/api/v2"
    settings.antenna_api_auth_token = "test-token"
    settings.antenna_api_batch_size = 2
    settings.antenna_api_retry_max = 3
    settings.antenna_api_retry_backoff = 0.5
    settings.num_workers = 0
    settings.localization_batch_size = 2
    return settings

Also applies to: 469-478
* Pipeline registration

* Convert worker tests to integration tests with real ML inference

Replaces fully mocked unit tests with integration tests that validate the Antenna API contract and run actual ML models. Tests now exercise the worker's unique code path (RESTDataset → rest_collate_fn) with real image loading and inference.

Changes:
- Add trapdata/api/tests/utils.py with shared test utilities
- Add trapdata/api/tests/antenna_api_server.py to mock Antenna API
- Rewrite test_worker.py as integration tests (17 tests, all passing)
- Update test_api.py to use shared utilities

Tests validate: real detector/classifier inference, HTTP image loading, schema compliance, batch processing, and end-to-end workflow.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Add AsyncPipelineRegistrationResponse schema

Add Pydantic model to validate responses from pipeline registration API. Fields: pipelines_created, pipelines_updated, processing_service_id.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Refactor registration functions to use get_http_session

Update get_user_projects() and register_pipelines_for_project() to use the session-based HTTP pattern established in PR RolnickLab#104:
- Use get_http_session() context manager for connection pooling
- Add retry_max and retry_backoff parameters with defaults
- Remove manual header management (session handles auth)
- Standardize URL paths (base_url now includes /api/v2)
- Use Pydantic model validation for API responses
- Fix error handling with hasattr() check

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add integration tests for pipeline registration

Add mock Antenna API endpoints:
- GET /api/v2/projects/ - list user's projects
- POST /api/v2/projects/{id}/pipelines/ - register pipelines

Add TestRegistrationIntegration with 2 client tests:
- test_get_user_projects
- test_register_pipelines_for_project

Update TestWorkerEndToEnd.test_full_workflow_with_real_inference to include registration step: register → get jobs → process → post results.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add git add -p to recommended development practices

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Read retry settings from Settings in get_http_session()

When max_retries or backoff_factor are not explicitly provided, get_http_session() now reads defaults from Settings (antenna_api_retry_max and antenna_api_retry_backoff). This centralizes retry configuration and allows callers to omit these low-level parameters.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Use Settings pattern in register_pipelines()

- Accept Settings object instead of base_url/auth_token params
- Remove direct os.environ.get() calls for ANTENNA_API_* vars
- Fix error message to reference correct env var (AMI_ANTENNA_API_AUTH_TOKEN)
- Remove retry params from get_user_projects() and register_pipelines_for_project() since get_http_session() now reads settings internally
- Remove unused os import

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Carlos Garcia Jurado Suarez <carlosgjs@live.com>
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@trapdata/cli/worker.py`:
- Around line 239-245: Clamp bbox coordinates from dresp.bbox to the image
tensor bounds before slicing: retrieve image shape from
image_tensors[dresp.source_image_id], convert bbox coords to ints and clamp
x1,x2 to [0,width] and y1,y2 to [0,height]; verify x2>x1 and y2>y1
(skip/continue for invalid boxes) and only then perform the crop and
unsqueeze—update the crop logic around image_tensors, dresp.bbox, and crop to
include these checks.
- Around line 198-203: Validate lengths before performing any zip operations to
avoid silent truncation: check that len(image_ids) equals len(images) before
zipping image_ids and images (the zip(image_ids, images) call) and check that
len(image_ids) == len(images) == len(reply_subjects) == len(image_urls) before
zipping image_ids, reply_subjects, image_urls, images; if lengths differ, raise
a ValueError with a clear message. Also update the zip calls to use zip(...,
strict=True) (Python 3.10+) so mismatches fail fast; reference the variables
image_ids, images, reply_subjects, image_urls and the zip usages in the worker
function when making these changes.
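A sketch combining both suggestions; variable names follow the review comments, `detections` is the assumed iterable of detection responses, and the bbox is assumed to be (x1, y1, x2, y2) on a C×H×W tensor:

```python
# Fail fast if the parallel task lists ever drift out of sync.
if not (len(image_ids) == len(images) == len(reply_subjects) == len(image_urls)):
    raise ValueError("Mismatched task list lengths in batch")

for image_id, image in zip(image_ids, images, strict=True):
    ...  # zip(strict=True) also fails fast on any mismatch (Python 3.10+)

# Inside the loop over detection responses: clamp bbox to image bounds.
for dresp in detections:
    _, height, width = image_tensors[dresp.source_image_id].shape  # C x H x W
    x1, y1, x2, y2 = (int(v) for v in dresp.bbox)
    x1, x2 = max(0, x1), min(width, x2)
    y1, y2 = max(0, y1), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        continue  # skip degenerate boxes instead of producing empty crops
    crop = image_tensors[dresp.source_image_id][:, y1:y2, x1:x2].unsqueeze(0)
```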
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@trapdata/api/tests/antenna_api_server.py`:
- Around line 10-19: The import list in trapdata/api/tests/antenna_api_server.py
includes an unused symbol AntennaTaskResults; remove AntennaTaskResults from the
from trapdata.api.schemas import (...) statement so the module only imports the
actually used names (e.g., AntennaJobListItem, AntennaJobsListResponse,
AntennaPipelineProcessingTask, AntennaTaskResult, AntennaTasksListResponse,
AsyncPipelineRegistrationRequest, AsyncPipelineRegistrationResponse).
In `@trapdata/api/tests/test_worker.py`:
- Around line 40-41: The test class TestRestCollateFn is missing
unittest.TestCase inheritance so unittest discovery won't run it; update the
class definition to inherit from unittest.TestCase (i.e., class
TestRestCollateFn(unittest.TestCase):) and add an import for unittest if absent,
and while here convert any bare assert statements inside TestRestCollateFn to
unittest assertions (self.assertEqual / self.assertRaises, etc.) so the tests
run correctly under the project's unittest runner.
🧹 Nitpick comments (1)
trapdata/api/tests/antenna_api_server.py (1)
30-45: Consider using filter parameters for more realistic mock behavior. The `pipeline__slug`, `ids_only`, and `incomplete_only` parameters are accepted but ignored. While acceptable for a basic mock, using these parameters would enable tests to verify that the worker sends correct query params. For instance, you could filter `_jobs_queue` by a stored pipeline slug or return full job objects when `ids_only=0`.

If this level of fidelity isn't needed now, consider adding a `# noqa: ARG001` comment to suppress the linter warnings.
- Initialize worker_id before try block to prevent UnboundLocalError
- Remove unused AntennaTaskResults import from antenna_api_server
- Remove unnecessary noqa directive from test.py (not flagged by Ruff)
- Add TestCase inheritance to TestRestCollateFn for consistency

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@trapdata/api/datasets.py`:
- Around line 193-218: In _load_image, fix orientation by applying EXIF-based
transpose immediately after opening the image and before converting mode or
applying image_transforms: after Image.open(BytesIO(response.content)) call
ImageOps.exif_transpose (or equivalent EXIF transpose helper) to correct
rotation, then continue with the existing mode check and conversion to "RGB",
and then call image_transforms; keep existing error handling and use the same
symbols (image_fetch_session, BytesIO, Image, image_transforms).
Changes get_http_session to only retry GET requests by default, preventing
unintended duplicate operations from POST retries. Adds retry_methods
parameter (default: ("GET",)) to allow callers to explicitly opt-in to POST
retries for idempotent endpoints.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
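Illustrative call sites under the described signature; the `retry_methods` parameter name comes from this commit, and the token value is hypothetical:

```python
token = "example-token"  # hypothetical

# Default: only GET requests are retried.
session = get_http_session(auth_token=token)

# Explicit opt-in to POST retries, for endpoints known to be idempotent.
session = get_http_session(auth_token=token, retry_methods=("GET", "POST"))
```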
- Add length validation before zip operations in worker.py to prevent silent truncation; use strict=True for Python 3.10+ fail-fast behavior
- Replace bare assert with explicit ValueError in classification.py for clearer error messages when image_id mismatches occur
- Fix comment reference in antenna_api_server.py test helper

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@trapdata/api/tests/antenna_api_server.py`:
- Around line 29-44: The get_jobs handler currently receives FastAPI query
params pipeline__slug, ids_only, and incomplete_only but doesn't use them; to
satisfy linters without renaming the parameters, explicitly mark them as
intentionally unused inside get_jobs (e.g., assign them to a throwaway variable
or use typing.cast/# pragma if preferred) so the function signature stays
unchanged; update the body of get_jobs to reference pipeline__slug, ids_only,
and incomplete_only in a no-op way (e.g., _ = pipeline__slug; _ = ids_only; _ =
incomplete_only) before computing job_ids.
In `@trapdata/cli/worker.py`:
- Line 171: Replace unguarded calls to torch.cuda.empty_cache() with a guarded
check using torch.cuda.is_available(): wrap the call as if
torch.cuda.is_available(): torch.cuda.empty_cache(). Update the call in the
worker module where torch.cuda.empty_cache() is invoked and make the same change
in the model base module (trapdata.ml.models.base) so CPU-only builds skip CUDA
cache clearing and device selection logic auto-detects GPU availability.
- Around line 310-312: post_batch_results(...) return value is ignored causing
silent data loss when posting fails; change the call in the worker loop to
capture its boolean result (e.g., success = post_batch_results(settings, job_id,
batch_results)), only add its timing to total_save_time when success, and on
failure either requeue the batch (so RESTDataset tasks aren’t lost) or raise/log
a clear error so the job can be retried/inspected; update the code around
post_batch_results, job_id, batch_results and total_save_time to implement this
check and appropriate error handling.
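The CUDA guard is a one-liner, wrapped here in a hypothetical helper for clarity:

```python
import torch


def free_gpu_memory() -> None:
    """Release cached CUDA memory; a no-op on CPU-only builds."""
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
```

And a sketch of the suggested result-posting check; the timing variables mirror those named in the comment and the `time` bookkeeping is an assumption:

```python
import time

total_save_time = 0.0  # accumulated across batches

save_start = time.time()
success = post_batch_results(settings, job_id, batch_results)
if not success:
    # Fail loudly instead of silently dropping processed results; an external
    # supervisor (systemd, supervisord) can retry the job at a higher level.
    raise RuntimeError(f"Failed to post results for job {job_id}")
total_save_time += time.time() - save_start  # count only successful posts
```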
- Remove test_single_item (covered by test_all_successful)
- Remove duplicate test_empty_queue tests (keep one in TestProcessJobIntegration)
- Remove test_query_params_sent (weak test with no real assertions)
- Remove TestRegistrationIntegration class (covered by E2E test)
- Remove basic RESTDataset tests covered by integration tests

Reduces test count from 18 to 11 while maintaining meaningful coverage.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Move Antenna platform integration code from cli/worker.py into a dedicated trapdata/antenna/ module for better separation of concerns and future portability to a standalone worker app.

New module structure:
- antenna/client.py: API client for fetching jobs and posting results
- antenna/worker.py: Worker loop and job processing logic
- antenna/registration.py: Pipeline registration with Antenna projects
- antenna/schemas.py: Pydantic models for Antenna API
- antenna/datasets.py: RESTDataset for streaming tasks from API
- antenna/tests/: Worker integration tests

cli/worker.py is now a thin CLI wrapper (~70 lines) that delegates to the antenna module.

Co-Authored-By: Carlos Garcia <carlosgjs@users.noreply.github.com>
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@docs/claude/planning/antenna-module-refactor.md`:
- Around line 108-116: Update the documented test paths and validation checklist
to use the new module layout: replace occurrences of
"trapdata/api/tests/test_worker.py" with "trapdata/antenna/tests/test_worker.py"
(including the pytest command in the example code block and any checklist
entries noted around lines referenced, e.g., the validation checklist and the
commands block showing "pytest ..."). Search the file for any other mentions of
the old path and update them to the new "trapdata/antenna/tests/test_worker.py"
so the example commands and checklist accurately reflect the current structure.
In `@docs/claude/planning/simplify-worker-tests.md`:
- Around line 15-26: The markdown tables containing rows like
`TestRESTDatasetIntegration.test_empty_queue`,
`TestGetJobsIntegration.test_empty_queue`,
`TestProcessJobIntegration.test_empty_queue`,
`TestRESTDatasetIntegration.test_multiple_batches`, and
`TestWorkerEndToEnd.test_multiple_batches_processed` are flagged by markdownlint
MD060 for pipe spacing; fix by normalizing pipe spacing and column alignment
(ensure a single space after and before each pipe or run a markdown table
formatter) so each row and header aligns consistently, then re-run the linter/CI
to confirm the MD060 warning is resolved.
In `@docs/claude/planning/worker-integration-tests.md`:
- Around line 336-345: The markdown table starting with the header "| Test Class
| Tests | Type | What It Tests |" is triggering MD060 due to inconsistent pipe
spacing; fix by normalizing column pipe alignment and spacing (e.g., ensure a
single space on both sides of each pipe for every row) or run the project's
markdown formatter (prettier/markdownlint) to reflow the table, and if alignment
cannot be preserved, add an inline markdownlint disable comment for MD060 above
the table to suppress the warning; target the table block and the similar table
at the later occurrence (the block beginning with the same header).
In `@scripts/validate_dwc_export.py`:
- Around line 69-74: The subspecies count uses a misspelled field 'tqaxonRank'
which will KeyError; update the generator to check the correct field name
'taxonRank' (same as used for species) when computing subspecies from rows
(i.e., fix the expression that computes subspecies to reference row['taxonRank']
instead of row['tqaxonRank']), leaving the rest of the logic (variables species,
subspecies, rows and the print statements) unchanged.
- Around line 36-38: There is a typo in the loop over rank_counts: the computed
percentage variable is named pct but the print uses qpct causing a NameError;
update the print statement inside the loop that iterates "for rank, count in
sorted(rank_counts.items())" to reference pct instead of qpct so it prints f"
{rank}: {count} ({pct:.1f}%)".
In `@trapdata/antenna/client.py`:
- Around line 3-56: The broad except in get_jobs should be narrowed: replace the
generic "except Exception" that wraps resp.json() and
AntennaJobsListResponse.model_validate() with two specific handlers catching
json.JSONDecodeError (from calling resp.json()) and pydantic.ValidationError
(from AntennaJobsListResponse.model_validate()). Import json and
pydantic.ValidationError (or from pydantic import ValidationError), log distinct
error messages (e.g., "Failed to decode JSON" and "Failed to validate jobs
response") referencing base_url and the exception, and return [] in each
handler; keep the existing requests.RequestException handler for request errors.
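A sketch of the narrowed handlers; the endpoint, query params, and schema name are taken from elsewhere in this PR (`AntennaJobsListResponse` is the schema defined above), while the auth header scheme is an assumption:

```python
import json
import logging

import requests
from pydantic import ValidationError

logger = logging.getLogger(__name__)


def get_jobs(base_url: str, auth_token: str, pipeline_slug: str) -> list[int]:
    try:
        resp = requests.get(
            f"{base_url.rstrip('/')}/jobs",
            params={"pipeline__slug": pipeline_slug, "ids_only": 1, "incomplete_only": 1},
            headers={"Authorization": f"Token {auth_token}"},  # assumed scheme
            timeout=30,
        )
        resp.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Request to {base_url} failed: {e}")
        return []
    try:
        payload = resp.json()
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON from {base_url}: {e}")
        return []
    try:
        jobs = AntennaJobsListResponse.model_validate(payload)
    except ValidationError as e:
        logger.error(f"Failed to validate jobs response from {base_url}: {e}")
        return []
    return [job.id for job in jobs.results]
```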
In `@trapdata/antenna/schemas.py`:
- Around line 5-9: The import list in the module currently includes an unused
symbol ProcessingServiceInfoResponse; remove ProcessingServiceInfoResponse from
the from-import tuple (the line importing PipelineConfigResponse,
PipelineResultsResponse, ProcessingServiceInfoResponse) so only the actually
used symbols (PipelineConfigResponse and PipelineResultsResponse) are imported
to satisfy flake8.
In `@trapdata/cli/base.py`:
- Around line 2-8: The file imports unused symbols Annotated and
CLASSIFIER_CHOICES which will fail linting; remove Annotated from the typing
import (leave Optional) and delete the import of CLASSIFIER_CHOICES from
trapdata.api.api so only used symbols remain, then re-run flake8 to verify no
unused-import errors.
In `@trapdata/cli/test.py`:
- Line 27: The subprocess call using the bare "pytest" is PATH-dependent; change
the invocation of subprocess.call that sets return_code (the call using
subprocess.call(["pytest", "-v"])) to use the current Python interpreter by
replacing the args with [sys.executable, "-m", "pytest", "-v"] and add an import
sys at the top of the module so the active virtualenv's pytest module is used
reliably.
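The suggested change, concretely:

```python
import subprocess
import sys

# Run pytest via the active interpreter so the virtualenv's pytest module
# is used rather than whatever "pytest" happens to be first on PATH.
return_code = subprocess.call([sys.executable, "-m", "pytest", "-v"])
```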
🧹 Nitpick comments (6)
scripts/validate_dwc_export.py (1)
21-23: Consider handling empty files to avoid division by zero. If the TSV file contains no data rows, `total_taxa` will be 0, causing `ZeroDivisionError` on subsequent percentage calculations (lines 29, 37, 53, 60, 66).

🛡️ Proposed early exit for empty files

 total_taxa = len(rows)
+if total_taxa == 0:
+    print("No taxa found in file.")
+    return
 print(f"Total Taxa: {total_taxa}")

trapdata/api/api.py (1)
358-373: Avoid eager model initialization when building pipeline configs. `initialize_service_info()` instantiates detector/classifier objects for every pipeline; if constructors load weights, startup and CLI registration can become heavy. Consider a lightweight metadata path (e.g., classmethod/constant config) and defer model loading until inference.

trapdata/antenna/schemas.py (1)
64-71: Prefer `default_factory` for list fields for consistency with the rest of the file. Other list fields in this file (lines 57, 79-88) use `pydantic.Field(default_factory=list)`. This pattern is also recommended in Pydantic v2 to make intent explicit.

♻️ Suggested change

 class AsyncPipelineRegistrationRequest(pydantic.BaseModel):
     """
     Request to register pipelines from an async processing service
     """

     processing_service_name: str
-    pipelines: list[PipelineConfigResponse] = []
+    pipelines: list[PipelineConfigResponse] = pydantic.Field(default_factory=list)

trapdata/antenna/datasets.py (2)
122-137: Consider handling EXIF orientation for rotated images. The code converts to RGB correctly, but per coding guidelines images should also handle EXIF orientation. Some camera trap images may have EXIF rotation metadata that could cause the model to receive incorrectly oriented images.

The broad `Exception` catch is acceptable here since external image loading can fail in many unpredictable ways (network errors, invalid formats, corrupted data, etc.).

♻️ Proposed fix to handle EXIF orientation

 response = self.image_fetch_session.get(image_url, timeout=30)
 response.raise_for_status()
 image = Image.open(BytesIO(response.content))
+# Handle EXIF orientation
+from PIL import ImageOps
+image = ImageOps.exif_transpose(image)
 # Convert to RGB if necessary
 if image.mode != "RGB":
     image = image.convert("RGB")

As per coding guidelines: "Handle EXIF orientation when preprocessing images; ensure models receive RGB format"
179-205: Minor: redundant conditional and commented debug code.

Line 204: The ternary `if errors else None` is redundant since we're already inside an `if errors:` block; `errors` is guaranteed truthy here.

Lines 182, 186: Commented-out `log_time` calls should be removed, or uncommented if still needed for debugging.

♻️ Proposed cleanup

 for task in tasks:
     errors = []

     # Load the image
-    # _, t = log_time()
     image_tensor = (
         self._load_image(task.image_url) if task.image_url else None
     )
-    # _, t = t(f"Loaded image from {image_url}")
     if image_tensor is None:
         errors.append("failed to load image")

     if errors:
         logger.warning(
             f"Worker {worker_id}: Errors in task for image '{task.image_id}': {', '.join(errors)}"
         )

     # Yield the data row
     row = {
         "image": image_tensor,
         "reply_subject": task.reply_subject,
         "image_id": task.image_id,
         "image_url": task.image_url,
     }
     if errors:
-        row["error"] = "; ".join(errors) if errors else None
+        row["error"] = "; ".join(errors)
     yield row

trapdata/antenna/tests/test_worker.py (1)
374-383: Consider extracting a shared `_make_settings` helper to reduce duplication. The `_make_settings` method is duplicated between the `TestProcessJobIntegration` and `TestWorkerEndToEnd` classes. Consider extracting it to a module-level helper or a shared test base class.

♻️ Proposed refactor

# At module level or in a shared test utilities module
def make_test_settings():
    """Create mock settings for worker tests."""
    settings = MagicMock()
    settings.antenna_api_base_url = "http://testserver/api/v2"
    settings.antenna_api_auth_token = "test-token"
    settings.antenna_api_batch_size = 2
    settings.antenna_api_retry_max = 3
    settings.antenna_api_retry_backoff = 0.5
    settings.num_workers = 0
    settings.localization_batch_size = 2
    return settings

Then use in both test classes:

def _make_settings(self):
    return make_test_settings()

Also applies to: 224-234
- Remove retry_max and retry_backoff from Settings (hardcoded in get_http_session)
- get_http_session(auth_token=None) takes optional auth param
- Client functions take base_url and auth_token explicitly
- RESTDataset takes auth_token for API session, no auth for image fetching

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Prevents crashes on CPU-only builds by checking torch.cuda.is_available() before calling torch.cuda.empty_cache() in worker and model base modules. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Ensures the active virtualenv's pytest module is used by calling [sys.executable, "-m", "pytest", "-v"] instead of relying on PATH. This prevents failures when pytest is not in PATH or using the wrong pytest version. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Captures the boolean return value from post_batch_results() and raises RuntimeError if posting fails, preventing silent data loss. Only increments total_save_time on successful posts. This makes API posting failures visible rather than silently discarding processed results. External retry mechanisms (systemd, supervisord) can handle job-level retries. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…ine flag

- Change from 'ami worker run' to just 'ami worker' using @cli.callback(invoke_without_command=True)
- Change flag from --pipelines (plural) to --pipeline (singular, repeatable)
- Update README with new command structure and registration examples
- Follows standard Docker/kubectl pattern for repeatable options

Usage:
    ami worker                                   # all pipelines
    ami worker --pipeline moth_binary            # single
    ami worker --pipeline moth1 --pipeline moth2 # multiple

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Captures the boolean return value from post_batch_results() and raises RuntimeError if posting fails, preventing silent data loss. Only increments total_save_time on successful posts. Added exception handling in run_worker() loop to catch job processing failures and continue to next job rather than crashing the worker. This ensures the worker keeps consuming jobs even when individual jobs fail. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@trapdata/antenna/tests/test_worker.py`:
- Around line 139-146: Replace hardcoded auth token strings in tests with an
environment-backed test constant: create a single constant (e.g.,
TEST_AUTH_TOKEN = os.getenv("AMI_TEST_AUTH_TOKEN", "test-token")) and use it
wherever RESTDataset is instantiated (refer to _make_dataset and other test
helpers that call RESTDataset), so calls like RESTDataset(...,
auth_token="test-token") become auth_token=TEST_AUTH_TOKEN; ensure you import os
and define the constant once at the top of the test module to keep tests
deterministic and satisfy Ruff S105/S106.
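For example (the `RESTDataset` constructor arguments follow the sequence diagram above; exact parameters may differ):

```python
import os

# Defined once at module level; keeps tests deterministic and satisfies Ruff S105/S106.
TEST_AUTH_TOKEN = os.getenv("AMI_TEST_AUTH_TOKEN", "test-token")

dataset = RESTDataset(
    base_url="http://testserver/api/v2",
    auth_token=TEST_AUTH_TOKEN,
    job_id=1,
)
```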
🧹 Nitpick comments (2)
trapdata/antenna/datasets.py (1)
96-121: Consider handling EXIF orientation when loading images. The `_load_image` method converts images to RGB but doesn't handle EXIF orientation metadata. Images from cameras may appear rotated incorrectly. As per coding guidelines: "Handle EXIF orientation when preprocessing images."

♻️ Proposed fix to handle EXIF orientation

+from PIL import ImageOps
 ...
 image = Image.open(BytesIO(response.content))
+# Handle EXIF orientation before any other processing
+image = ImageOps.exif_transpose(image)
 # Convert to RGB if necessary
 if image.mode != "RGB":
     image = image.convert("RGB")

trapdata/antenna/tests/test_worker.py (1)
207-487: Consider gating the real-inference integration tests behind an opt-in flag. These tests run real ML inference (not mocked) and local file servers; if included in default CI runs, they can be slow or fail on CPU-only or model-less environments.

♻️ Suggested opt-in gating (unittest)

+import os
-from unittest import TestCase
+from unittest import TestCase, skipUnless
@@
+RUN_INTEGRATION = os.getenv("RUN_INTEGRATION_TESTS") == "1"
@@
+@skipUnless(RUN_INTEGRATION, "integration tests are opt-in")
 class TestProcessJobIntegration(TestCase):
@@
+@skipUnless(RUN_INTEGRATION, "integration tests are opt-in")
 class TestWorkerEndToEnd(TestCase):
Implements a worker service (`ami worker`) that processes images queued by users in the Antenna web platform. When users upload images to Antenna and request them to be processed, this worker pulls tasks from the queue via the jobs API, runs them through the local ML pipeline (detection + classification), and posts results back. This allows processing to be run behind a firewall (for example in a university HPCC) and for any number of workers to process images in parallel.

This is the counterpart to RolnickLab/antenna#987, which adds the job queue API to Antenna.
Usage
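Run the worker from the command line (commands as introduced in this PR):

```
ami worker                                    # process all available pipelines
ami worker --pipeline moth_binary             # a single pipeline
ami worker --pipeline moth1 --pipeline moth2  # multiple pipelines (repeatable flag)
ami worker register <name>                    # register pipelines with an Antenna project
```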
Or configure in settings:

- `antenna_api_base_url` - Antenna API endpoint (default: `http://localhost:8000/api/v2`)
- `antenna_api_auth_token` - Authentication token for Antenna project
- `antenna_api_batch_size` - Number of tasks per batch (default: 4)

Architecture
The worker functionality is implemented in a dedicated `trapdata/antenna/` module for separation of concerns and future portability:

- `trapdata/antenna/client.py` - API client for fetching jobs and posting results
- `trapdata/antenna/worker.py` - Worker loop and job processing logic
- `trapdata/antenna/registration.py` - Pipeline registration with Antenna projects
- `trapdata/antenna/schemas.py` - Pydantic models for Antenna API requests/responses
- `trapdata/antenna/datasets.py` - RESTDataset for streaming tasks from the API
- `trapdata/antenna/tests/` - Integration tests with mock Antenna API server
- `trapdata/cli/worker.py` - Thin CLI wrapper (~75 lines) that delegates to the antenna module

Changes
Worker implementation (`trapdata/antenna/worker.py`)

- Polls the `/jobs` endpoint for available jobs by pipeline slug
- Fetches queued tasks from `/jobs/{id}/tasks`
- Posts results back to `/jobs/{id}/result/`

API client (`trapdata/antenna/client.py`)

- `get_jobs()` - Fetches job IDs for a given pipeline
- `post_batch_results()` - Posts processed results back to Antenna

Pipeline registration (`trapdata/antenna/registration.py`)

- `register_pipelines()` - Registers available pipelines with Antenna projects
- `get_user_projects()` - Fetches accessible projects from the Antenna API

Session handling (`trapdata/api/utils.py`)

- `get_http_session()` - Creates an HTTP session with connection pooling

Schemas (`trapdata/antenna/schemas.py`)

- `AntennaJobsListResponse`, `AntennaTasksListResponse`, `AntennaTaskResult`
- `AsyncPipelineRegistrationRequest` and `AsyncPipelineRegistrationResponse` for pipeline registration

Dataset (`trapdata/antenna/datasets.py`)

- `RESTDataset` - IterableDataset that streams tasks from the Antenna API

Settings (`trapdata/settings.py`)

- `antenna_api_base_url`, `antenna_api_auth_token`, `antenna_api_batch_size` with Kivy UI integration

CLI (`trapdata/cli/worker.py`)

- `ami worker` - Default command to run the worker (processes all pipelines)
- `ami worker --pipeline <slug>` - Process specific pipeline(s) (repeatable flag)
- `ami worker register <name>` - Register pipelines with Antenna projects
- Uses `@cli.callback(invoke_without_command=True)` to make worker the default action

Tests (`trapdata/antenna/tests/`)

Credits
Test plan
- `ami test all` - all tests pass

Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Configuration
Documentation
Tests