PSV2: endpoint to register pipelines#1076
carlosgjs wants to merge 26 commits into RolnickLab:main from
Conversation
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough
Adds a POST /projects/{id}/pipelines endpoint for async processing services to register pipelines, makes ProcessingService.endpoint_url nullable to support pull-mode services, updates related models/schemas/tasks, and adds tests and a migration to support the change.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Worker as ProcessingService (worker)
    participant API as Antenna API (/projects/{id}/pipelines)
    participant DB as Database (Project, ProcessingService, Pipelines)
    Worker->>API: POST /projects/{id}/pipelines\nAsyncPipelineRegistrationRequest
    API->>API: parse & validate payload
    alt validation fails
        API-->>Worker: 400 ValidationError
    else valid
        API->>DB: find or create ProcessingService by name
        DB-->>API: ProcessingService instance
        API->>DB: ensure association ProcessingService <-> Project
        DB-->>API: association persisted (or error if already-associated)
        API->>API: call ProcessingService.create_pipelines(project, pipelines)
        API->>DB: persist/merge pipeline records scoped to project
        DB-->>API: PipelineRegistrationResponse
        API-->>Worker: 200 PipelineRegistrationResponse
    end
```
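For orientation, here is a minimal sketch of what a pull-mode worker's registration request could look like, assuming the payload shape described above (a `processing_service_name` plus PipelineConfigResponse-style entries under a `pipelines` key). The host, token, project ID, and pipeline values are placeholders, not taken from this PR:

```python
import requests

ANTENNA_URL = "https://antenna.example.org"  # placeholder host
PROJECT_ID = 123  # placeholder project ID

payload = {
    "processing_service_name": "Example Moth Classifier Worker",
    "pipelines": [
        {
            "name": "Random Pipeline",
            "slug": "random",
            "version": 1,
            "algorithms": [],
        }
    ],
}

# POST to the new registration endpoint; the auth header is a placeholder.
resp = requests.post(
    f"{ANTENNA_URL}/api/v2/projects/{PROJECT_ID}/pipelines/",
    json=payload,
    headers={"Authorization": "Token <api-token>"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # PipelineRegistrationResponse payload
```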
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
✅ Deploy Preview for antenna-ssec canceled.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@ami/main/api/views.py`:
- Around line 240-262: The current logic in the view returns a 400 when a
ProcessingService (found via
ProcessingService.objects.filter(name=parsed.processing_service_name).first())
is already associated with the project, which prevents idempotent
re-registration; update the branch handling the existing processing_service so
that if project is already in processing_service.projects.all() you do not
return Response(status=400) but simply continue (no-op) and allow the endpoint
to proceed (e.g., log an info/debug message) — ensure you keep the
processing_service.projects.add(project)/save() when association is missing, and
remove the early return that blocks subsequent pipeline registration.
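A minimal sketch of that idempotent branch, using the names from the view code quoted later in this thread (illustrative only, not the final implementation):

```python
# Inside the pipelines view, after resolving `processing_service` and `project`
if project in processing_service.projects.all():
    # Re-registration is a no-op for the association; continue on to pipeline sync
    logger.debug(
        f"Processing service {processing_service} already associated with project {project.pk}"
    )
else:
    processing_service.projects.add(project)
```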
In `@ami/ml/schemas.py`:
- Around line 326-327: The endpoint_url annotation is optional but currently
still required because it has no default; update the schema so endpoint_url has
an explicit default (e.g., set endpoint_url to None or use Field(default=None)
if this is a Pydantic model) so parsing succeeds when senders omit it—modify the
endpoint_url declaration (near the latency: float field) to include the default
None.
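A sketch of that change, assuming the field lives on a Pydantic model such as ProcessingServiceStatusResponse (surrounding fields are illustrative):

```python
import pydantic


class ProcessingServiceStatusResponse(pydantic.BaseModel):
    # ... other status fields ...
    latency: float
    # Optional and defaulted, so senders that omit it still parse successfully
    endpoint_url: str | None = None
```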
In `@ami/ml/tasks.py`:
- Around line 112-114: The current check only skips when service.endpoint_url is
None but still allows empty strings; update the conditional around
service.endpoint_url in the loop (the block that calls logger.warning and
continue) to treat None, empty, or whitespace-only values as missing — e.g.,
ensure you test both falsy and stripped emptiness (safe-check to avoid calling
.strip() on None) so that logger.warning(f"Processing service {service} has no
endpoint URL, skipping.") is used and the loop continues for None/"", or
whitespace-only endpoint_url.
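One way to express that check in the task loop (a sketch, not the committed code):

```python
# Treat None, "", and whitespace-only values as "no endpoint URL".
# The first condition short-circuits, so .strip() is never called on None.
if not service.endpoint_url or not service.endpoint_url.strip():
    logger.warning(f"Processing service {service} has no endpoint URL, skipping.")
    continue
```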
🧹 Nitpick comments (1)
ami/main/tests.py (1)
3457-3459: Consider disabling default fixtures for these API tests.
`create_defaults=True` builds extra related objects these tests don't use; turning it off keeps setup lean.
♻️ Suggested change
- self.project = Project.objects.create(name="Test Project", owner=self.user, create_defaults=True)
- self.other_project = Project.objects.create(name="Other Project", owner=self.other_user, create_defaults=True)
+ self.project = Project.objects.create(name="Test Project", owner=self.user, create_defaults=False)
+ self.other_project = Project.objects.create(name="Other Project", owner=self.other_user, create_defaults=False)
Pull request overview
This pull request introduces a new API endpoint for V2 ML processing services to register pipelines with projects in "pull mode" (without requiring an endpoint URL). The changes enable processing services to push pipeline configurations to Antenna rather than Antenna pulling them from a service endpoint.
Changes:
- Added a new POST endpoint `/api/v2/projects/{id}/pipelines/` for pipeline registration
- Made ProcessingService.endpoint_url nullable to support pull-mode services
- Updated ProcessingService methods to handle null endpoint URLs gracefully
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/main/api/views.py | Added new pipelines action to ProjectViewSet for receiving pipeline registrations from V2 services |
| ami/main/models.py | Added permission check for "pipelines" action requiring ProjectManager role |
| ami/ml/models/processing_service.py | Made endpoint_url nullable and updated get_status() and get_pipeline_configs() to handle null values |
| ami/ml/schemas.py | Made endpoint_url nullable in ProcessingServiceStatusResponse and added AsyncPipelineRegistrationRequest schema |
| ami/ml/migrations/0026_make_processing_service_endpoint_url_nullable.py | Database migration to make endpoint_url nullable |
| ami/ml/tasks.py | Added check to skip processing services without endpoint URLs in periodic status check |
| ami/ml/tests.py | Added comprehensive tests for null endpoint_url handling |
| ami/main/tests.py | Added test suite for the new pipelines API endpoint |
| requirements/base.txt | Added explanatory comment about psycopg binary vs non-binary versions |
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/ml/schemas.py`:
- Around line 339-345: Update the inaccurate docstring on
AsyncPipelineRegistrationRequest to reference AsyncPipelineRegistrationRequest
(not PipelineRegistrationResponse) and clearly state it represents the async
callback payload containing processing results; rename the output-oriented model
PipelineRegistrationResponse to a more generic name like
PipelineProcessingResult (or create a new PipelineProcessingResult model) and
adjust its fields (timestamp, success, error, pipelines_created,
algorithms_created) to be output/result-oriented (optional or excluded from
required input validation) so the
AsyncPipelineRegistrationRequest.pipeline_response type reflects a result object
rather than required caller-supplied output fields; update all references to
PipelineRegistrationResponse to the new name (or to the new model) including the
AsyncPipelineRegistrationRequest definition and related validation/serialization
logic.
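A rough sketch of the rename being requested, with the field set taken from the comment above; the defaults are assumptions, and AsyncPipelineRegistrationRequest.pipeline_response would then be annotated with this type:

```python
import datetime

import pydantic


class PipelineProcessingResult(pydantic.BaseModel):
    """Result-oriented payload describing the outcome of registering pipelines."""

    timestamp: datetime.datetime | None = None
    success: bool = True
    error: str | None = None
    pipelines_created: list[str] = pydantic.Field(default_factory=list)
    algorithms_created: list[str] = pydantic.Field(default_factory=list)
```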
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/main/api/views.py`:
- Around line 232-233: Update the docstring in ami/main/api/views.py that
describes the AsyncPipelineRegistrationRequest payload: change the field name
from `pipelines_response` to the correct `pipeline_response` so it reads that
the list of PipelineConfigResponse objects is under the
`pipeline_response.pipelines` key; keep references to
AsyncPipelineRegistrationRequest and PipelineConfigResponse so readers can
locate the schema and adjust any adjacent wording to match the actual field
name.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/main/api/views.py`:
- Around line 254-263: Replace the non-atomic filter().first() + create() flow
for ProcessingService with an atomic get_or_create using the
parsed.processing_service_name; call
ProcessingService.objects.get_or_create(name=parsed.processing_service_name,
defaults={...}) to obtain (processing_service, created) and then ensure the
project is associated (processing_service.projects.add(project)) regardless of
created, mirroring the get_or_create_default_processing_service() pattern to
avoid TOCTOU races.
♻️ Duplicate comments (1)
ami/main/api/views.py (1)
232-233: Docstring schema key is incorrect. The request schema exposes `pipelines` directly, not `pipelines_response.pipelines`. Please update the docstring to match the actual payload structure.
📝 Proposed docstring fix
- list of PipelineConfigResponse objects under the `pipelines_response.pipelines` key.
+ list of PipelineConfigResponse objects under the `pipelines` key.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/main/api/views.py`:
- Around line 226-227: The pipelines view method currently defines an unused
parameter pk which triggers ARG002; update the method signature for def
pipelines(self, request, pk=None): to silence the lint warning by marking the
unused argument with a noqa (e.g., def pipelines(self, request, pk=None): #
noqa: ARG002) or remove/rename if appropriate; ensure the change references the
pipelines method so the decorator `@action`(detail=True, methods=["post"],
url_path="pipelines") remains unchanged.
@annavik it would be great to discuss the UX for pipeline registration this week while we have @carlosgjs
```python
# Parse the incoming payload using the pydantic schema so we convert dicts to
# the expected PipelineConfigResponse models

try:
```
I feel there should be a way to leverage the existing Django Rest Framework patterns and tools here, like use a Serializer class instead of AsyncPipelineRegistrationRequest. BUT! this may be the beginning of a new pattern as well. Since AsyncPipelineRegistrationRequest is not just a serialized model, but a bunch of stuff. So something to keep our eye on. It's more like an "orchestration" endpoint than a basic CRUD endpoint, and I think there will be more of these. Such as when we want to post a PipelineResult directly to the API. Right now this view seems simple and works! If we have tests for it, then we should be good.
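To make the comparison concrete, a DRF-flavoured version of the same input handling could look roughly like this. This is a sketch only; the serializer names and nested fields are assumptions, not code from this PR:

```python
from rest_framework import serializers


class PipelineConfigSerializer(serializers.Serializer):
    name = serializers.CharField()
    slug = serializers.SlugField()
    version = serializers.IntegerField()
    algorithms = serializers.ListField(child=serializers.DictField(), required=False, default=list)


class AsyncPipelineRegistrationSerializer(serializers.Serializer):
    processing_service_name = serializers.CharField()
    pipelines = PipelineConfigSerializer(many=True)


# In the view:
# serializer = AsyncPipelineRegistrationSerializer(data=request.data)
# serializer.is_valid(raise_exception=True)  # DRF returns a 400 with field errors automatically
```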
I worked out a plan with Claude, it's not too bad! This uses several of our existing patterns:
.agents/planning/pipeline-registration-drf.md
I will branch off this branch so we can compare!
```python
return ProcessingServiceStatusResponse(
    timestamp=datetime.datetime.now(),
    request_successful=False,
    server_live=None,
```
For async processing services (distributed workers), the server_live should be set by the view each time we hear from the processing service. We can rename the model field last_checked_live to last_seen_live and last_checked to last_seen. For v1 synchronous processing services with an endpoint_url, those can still be set by the status checker. @copilot can you make a follow-up ticket for this?
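If that follow-up goes ahead, the rename itself would likely be a small migration along these lines (a sketch; the app label and migration dependency are assumptions):

```python
from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        ("ml", "0026_make_processing_service_endpoint_url_nullable"),  # assumed predecessor
    ]

    operations = [
        migrations.RenameField("processingservice", "last_checked", "last_seen"),
        migrations.RenameField("processingservice", "last_checked_live", "last_seen_live"),
    ]
```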
@coderabbitai can you make a follow-up for this?
🧩 Analysis chain
@mihow, I'll create a follow-up issue for this.
🏁 Script executed:
#!/bin/bash
# Create a follow-up GitHub issue for renaming ProcessingService fields
gh issue create \
--title "Rename ProcessingService fields to reflect async/pull model semantics" \
--body "## Context
For async processing services (distributed workers), the \`server_live\` status should be set by the view each time we hear from the processing service. For v1 synchronous processing services with an endpoint_url, those can still be set by the status checker.
## Requested Changes
Rename the following fields on the \`ProcessingService\` model:
- \`last_checked_live\` → \`last_seen_live\`
- \`last_checked\` → \`last_seen\`
This naming better reflects the semantic difference between:
- **Async/pull model**: workers report in, and we record when we last saw them
- **Sync/push model**: we actively check the service status
## References
- Requested by: `@mihow`
- PR: https://github.com/RolnickLab/antenna/pull/1076
- Comment: https://github.com/RolnickLab/antenna/pull/1076#discussion_r2776725245" \
--assignee mihow
Length of output: 49
@mihow, I've created a follow-up issue for this: #1122
The issue covers renaming last_checked_live → last_seen_live and last_checked → last_seen to better reflect the semantic difference between async/pull model services (where workers report in) and sync/push model services (where we actively check status). I've assigned it to you.
🧬 Code Graph Analysis Results
[Included relevant code snippets for context and follow-up]
- Filename: processing_services/minimal/api/schemas.py
  Line ranges: 274-293
  Content (ProcessingServiceInfoResponse model). Description: Information about the processing service, including pipelines as PipelineConfigResponse items. This is relevant for understanding how pipeline/config data is structured when interacting with a Processing Service backend.

  ```python
  class ProcessingServiceInfoResponse(pydantic.BaseModel):
      """Information about the processing service."""

      name: str = pydantic.Field(example="Mila Research Lab - Moth AI Services")
      description: str | None = pydantic.Field(
          default=None,
          examples=["Algorithms developed by the Mila Research Lab for analysis of moth images."],
      )
      pipelines: list[PipelineConfigResponse] = pydantic.Field(
          default=list,
          examples=[
              [
                  PipelineConfigResponse(name="Random Pipeline", slug="random", version=1, algorithms=[]),
              ]
          ],
      )
      # algorithms: list[AlgorithmConfigResponse] = pydantic.Field(
      #     default=list,
      #     examples=[RANDOM_BINARY_CLASSIFIER],
      # )
  ```
- Filename: processing_services/minimal/api/api.py
  Line ranges: 47-57
  Content (info() endpoint example). Description: Example endpoint returning a ProcessingServiceInfoResponse. Helpful for understanding the expected shape of the backend info payload that ProcessingService.create_pipelines consumes.

  ```python
  async def info() -> ProcessingServiceInfoResponse:
      info = ProcessingServiceInfoResponse(
          name="ML Backend Template",
          description=(
              "A template for an inference API that allows the user to run different sequences of machine learning "
              "models and processing methods on images for the Antenna platform."
          ),
          pipelines=[pipeline.config for pipeline in pipelines],
          # algorithms=list(algorithm_choices.values()),
      )
      return info
  ```
- Filename: ami/utils/requests.py
  Line ranges: 15-42
  Content (create_session). Description: Utility to create a requests.Session with retry behavior. Relevant for understanding how ProcessingService.get_status uses HTTP with retries.

  ```python
  def create_session(
      retries: int = 3,
      backoff_factor: int = 2,
      status_forcelist: tuple[int, ...] = (500, 502, 503, 504),
  ) -> requests.Session:
      """
      Create a requests Session with retry capabilities.

      Args:
          retries: Maximum number of retries
          backoff_factor: Backoff factor for retries
          status_forcelist: HTTP status codes to retry on

      Returns:
          Session configured with retry behavior
      """
      session = requests.Session()
      retry = Retry(
          total=retries,
          read=retries,
          connect=retries,
          backoff_factor=backoff_factor,
          status_forcelist=status_forcelist,
      )
      adapter = HTTPAdapter(max_retries=retry)
      session.mount("http://", adapter)
      session.mount("https://", adapter)
      return session
  ```
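  A quick usage sketch of this helper (the URL is a placeholder, not a real service):

  ```python
  session = create_session(retries=5, backoff_factor=1)
  resp = session.get("https://processing-service.example.org/readyz", timeout=10)
  resp.raise_for_status()
  ```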
- Filename: ami/ml/models/project_pipeline_config.py
  Line ranges: 12-26
  Content (ProjectPipelineConfig model). Description: Model that stores the relationship between a project and a pipeline, including an enabled flag and per-project config. Used by ProcessingService.create_pipelines when associating pipelines with projects.

  ```python
  class ProjectPipelineConfig(BaseModel):
      """Intermediate model to store the relationship between a project and a pipeline."""

      project = models.ForeignKey("main.Project", related_name="project_pipeline_configs", on_delete=models.CASCADE)
      pipeline = models.ForeignKey("ml.Pipeline", related_name="project_pipeline_configs", on_delete=models.CASCADE)
      enabled = models.BooleanField(default=True)
      config = models.JSONField(default=dict, blank=True, null=True)

      def __str__(self):
          return f'#{self.pk} "{self.pipeline}" in {self.project}'

      class Meta:
          unique_together = ("pipeline", "project")
          verbose_name = "Project-Pipeline Configuration"
          verbose_name_plural = "Project-Pipeline Configurations"
  ```
- Filename: ami/ml/models/pipeline.py
  Line ranges: 313-390
  Content (get_or_create_algorithm_and_category_map). Description: Helper to create or fetch Algorithm objects and associated category maps from a pipeline/config. Directly used by ProcessingService.create_pipelines when registering algorithms for a pipeline. Includes updating algorithm fields and attaching category maps.

  ```python
  def get_or_create_algorithm_and_category_map(
      algorithm_config: AlgorithmConfigResponse,
      logger: logging.Logger = logger,
  ) -> Algorithm:
      """
      Create algorithms and category maps from a ProcessingServiceInfoResponse or a PipelineConfigResponse.

      :param algorithm_configs: A dictionary of algorithms from the processing services' "/info" endpoint
      :param logger: A logger instance from the parent function
      :return: A dictionary of algorithms registered in the pipeline, keyed by the algorithm key

      @TODO this should be called when registering a pipeline, not when saving results.
      But currently we don't have a way to register pipelines.
      """
      algo, _created = Algorithm.objects.get_or_create(
          key=algorithm_config.key,
          version=algorithm_config.version,
          defaults={
              "name": algorithm_config.name,
              "task_type": algorithm_config.task_type,
              "version_name": algorithm_config.version_name,
              "uri": algorithm_config.uri,
              "category_map": None,
          },
      )
      if _created:
          logger.info(f"Registered new algorithm {algo}")
      else:
          logger.info(f"Using existing algorithm {algo}")

      algo_fields_updated = []
      new_category_map = None
      category_map_data = algorithm_config.category_map

      if not algo.has_valid_category_map():
          if category_map_data:
              # New algorithms will not have a category map yet, and older ones may not either
              # The category map data should be in the algorithm config from the /info endpoint
              new_category_map = AlgorithmCategoryMap.objects.create(
                  version=category_map_data.version,
                  data=category_map_data.data,
                  labels=category_map_data.labels,
                  description=category_map_data.description,
                  uri=category_map_data.uri,
              )
              algo.category_map = new_category_map
              algo_fields_updated.append("category_map")
              logger.info(f"Registered new category map {new_category_map} for algorithm {algo}")
          else:
              if algorithm_config.task_type in Algorithm.classification_task_types:
                  msg = (
                      f"No valid category map found for algorithm '{algorithm_config.key}' with "
                      f"task type '{algorithm_config.task_type}' or in the pipeline /info response. "
                      "Update the processing service to include a category map for all classification algorithms "
                      "then re-register the pipelines."
                  )
                  raise PipelineNotConfigured(msg)
              else:
                  logger.debug(f"No category map found, but not required for task type {algorithm_config.task_type}")

      # Update fields that may have changed in the processing service, with a warning
      # These are fields that we have added to the API since the algorithm was first created
      fields_to_update = {
          "task_type": algorithm_config.task_type,
          "uri": algorithm_config.uri,
      }
      for field in fields_to_update:
          new_value = fields_to_update[field]
          if getattr(algo, field) != new_value:
              logger.warning(f"Field '{field}' changed for algorithm {algo} from {getattr(algo, field)} to {new_value}")
              setattr(algo, field, new_value)
              algo_fields_updated.append(field)
      if algo_fields_updated:
          algo.save(update_fields=algo_fields_updated)
      return algo
  ```
- Filename: ami/ml/models/processing_service.py
  Note: The user asked for a follow-up; the following items are directly relevant to follow-up actions:
  - The class and methods themselves describe how pipelines are registered and how status/config are fetched, which a follow-up might extend or modify.
  - Additionally, behavior tied to:
    - ProcessingService.create_pipelines (enable_only, projects, pipeline_configs)
    - get_status (health check and readyz endpoint usage)
    - get_pipeline_configs (fetching info data)
    (Refer to the existing file for the exact implementation.)
- If you want, I can draft a concrete follow-up change (e.g., add a new endpoint, extend get_status to return more details, or enhance enable_only filtering) based on your target.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@ami/main/api/views.py`:
- Around line 254-283: Wrap the sequence of DB writes and pipeline creation in a
single atomic transaction: import transaction from django.db and enclose
ProcessingService.objects.get_or_create(...), the
check/processing_service.projects.add(project), and the call to
processing_service.create_pipelines(...) inside a with transaction.atomic():
block so any failure in create_pipelines rolls back the get_or_create/add
changes; keep the existing logging and Response handling inside the block or
immediately after as appropriate.
```python
# Atomically get or create the processing service
processing_service, created = ProcessingService.objects.get_or_create(
    name=parsed.processing_service_name,
    defaults={
        "endpoint_url": None,
    },
)

# Check if the service is already associated with this project
if not created and project in processing_service.projects.all():
    error_msg = f"Processing service already exists and is associated with project {project.pk}"
    logger.warning(error_msg)
    return Response({"detail": error_msg}, status=status.HTTP_400_BAD_REQUEST)

# Associate with the project
processing_service.projects.add(project)

if created:
    logger.info(f"Created processing service {processing_service} for project {project.pk}")
else:
    logger.info(f"Associated processing service {processing_service} with project {project.pk}")

# Call create_pipelines limited to this project
response = processing_service.create_pipelines(
    pipeline_configs=parsed.pipelines,
    projects=Project.objects.filter(pk=project.pk),
)

# response is a pydantic model; return its dict representation
return Response(response.dict())
```
🛠️ Refactor suggestion | 🟠 Major
Wrap DB operations in transaction.atomic().
The get_or_create, projects.add, and create_pipelines calls involve multiple DB writes that should be atomic. If create_pipelines fails partway through, you could end up with a partially-registered service. The planning document also explicitly calls this out.
Proposed fix
+ from django.db import transaction
+
- # Atomically get or create the processing service
- processing_service, created = ProcessingService.objects.get_or_create(
- name=parsed.processing_service_name,
- defaults={
- "endpoint_url": None,
- },
- )
-
- # Check if the service is already associated with this project
- if not created and project in processing_service.projects.all():
- error_msg = f"Processing service already exists and is associated with project {project.pk}"
- logger.warning(error_msg)
- return Response({"detail": error_msg}, status=status.HTTP_400_BAD_REQUEST)
-
- # Associate with the project
- processing_service.projects.add(project)
-
- if created:
- logger.info(f"Created processing service {processing_service} for project {project.pk}")
- else:
- logger.info(f"Associated processing service {processing_service} with project {project.pk}")
-
- # Call create_pipelines limited to this project
- response = processing_service.create_pipelines(
- pipeline_configs=parsed.pipelines,
- projects=Project.objects.filter(pk=project.pk),
- )
+ with transaction.atomic():
+ processing_service, created = ProcessingService.objects.get_or_create(
+ name=parsed.processing_service_name,
+ defaults={"endpoint_url": None},
+ )
+ processing_service.projects.add(project)
+
+ if created:
+ logger.info(f"Created processing service {processing_service} for project {project.pk}")
+ else:
+ logger.info(f"Associated processing service {processing_service} with project {project.pk}")
+
+ response = processing_service.create_pipelines(
+ pipeline_configs=parsed.pipelines,
+ projects=Project.objects.filter(pk=project.pk),
+ )
🤖 Prompt for AI Agents
In `@ami/main/api/views.py` around lines 254 - 283, Wrap the sequence of DB writes
and pipeline creation in a single atomic transaction: import transaction from
django.db and enclose ProcessingService.objects.get_or_create(...), the
check/processing_service.projects.add(project), and the call to
processing_service.create_pipelines(...) inside a with transaction.atomic():
block so any failure in create_pipelines rolls back the get_or_create/add
changes; keep the existing logging and Response handling inside the block or
immediately after as appropriate.
Addressed in .agents/planning/pipeline-registration-drf.md
@mihow, understood! I can see the planning document includes transaction wrapping for the DB operations. Since this is being addressed as part of the planned refactor, no further action is needed on this comment.
🧠 Learnings used
Learnt from: CR
Repo: RolnickLab/antenna PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-11-26T03:02:40.840Z
Learning: Document agent implementations in .agents/AGENTS.md
Summary
This pull request introduces a new API endpoint to support the registration of ML pipelines for a project, primarily for integration with V2 ML processing services. It also changes ProcessingService.endpoint_url to be nullable.

New API endpoint for pipeline registration: adds a `pipelines` POST action to the project viewset in ami/main/api/views.py, allowing V2 ML processing services to register available pipelines for a project. The endpoint parses the payload using AsyncPipelineRegistrationRequest.

Closes #1086