From 7bb3f2d796d351bbe097db89049e6e11f0c92494 Mon Sep 17 00:00:00 2001 From: Debian Date: Tue, 11 Jul 2023 04:21:59 +0000 Subject: [PATCH 1/8] Support data import to AMI platform DB --- trapdata/cli/export.py | 22 +++++++++++++++++----- trapdata/common/logs.py | 2 +- trapdata/db/models/images.py | 6 ++++++ trapdata/db/models/occurrences.py | 4 ++++ 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index 19684bb7..79128eaa 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -213,7 +213,7 @@ def sessions( @cli.command() def captures( - date: datetime.datetime, + date: Optional[datetime.datetime] = None, format: ExportFormat = ExportFormat.json, outfile: Optional[pathlib.Path] = None, ) -> Optional[str]: @@ -224,16 +224,28 @@ def captures( """ Session = get_session_class(settings.database_url) session = Session() + if date is not None: + event_dates = [date.date()] + else: + event_dates = [ + event.day + for event in get_monitoring_sessions_from_db( + db_path=settings.database_url, base_directory=settings.image_base_path + ) + ] events = get_monitoring_session_by_date( db_path=settings.database_url, base_directory=settings.image_base_path, - event_dates=[str(date.date())], + event_dates=event_dates, ) - if not len(events): + if date and not len(events): raise Exception(f"No Monitoring Event with date: {date.date()}") - event = events[0] - captures = get_monitoring_session_images(settings.database_url, event, limit=100) + captures = [] + for event in events: + captures += get_monitoring_session_images( + settings.database_url, event, limit=100 + ) [session.add(img) for img in captures] df = pd.DataFrame([img.report_detail().model_dump() for img in captures]) diff --git a/trapdata/common/logs.py b/trapdata/common/logs.py index e0c2f7cb..cf4b4e7e 100644 --- a/trapdata/common/logs.py +++ b/trapdata/common/logs.py @@ -3,7 +3,7 @@ import structlog structlog.configure( - wrapper_class=structlog.make_filtering_bound_logger(logging.INFO), + wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG), ) diff --git a/trapdata/db/models/images.py b/trapdata/db/models/images.py index fa3770f2..34dd3788 100644 --- a/trapdata/db/models/images.py +++ b/trapdata/db/models/images.py @@ -29,6 +29,9 @@ class CaptureListItem(BaseModel): class CaptureDetail(CaptureListItem): id: int event: object + url: Optional[str] = None + event: object + deployment: str notes: Optional[str] detections: list filesize: int @@ -121,11 +124,14 @@ def report_data(self) -> CaptureListItem: return CaptureListItem( id=self.id, source_image=f"{constants.IMAGE_BASE_URL}vermont/snapshots/{self.path}", + path=self.path, timestamp=self.timestamp, last_read=self.last_read, last_processed=self.last_processed, in_queue=self.in_queue, num_detections=self.num_detected_objects, + event=self.monitoring_session.day, + deployment=self.monitoring_session.deployment, ) def report_detail(self) -> CaptureDetail: diff --git a/trapdata/db/models/occurrences.py b/trapdata/db/models/occurrences.py index 561d3166..bc1b9cf0 100644 --- a/trapdata/db/models/occurrences.py +++ b/trapdata/db/models/occurrences.py @@ -159,11 +159,15 @@ def get_unique_species_by_track( models.DetectedObject.id, models.DetectedObject.image_id.label("source_image_id"), models.TrapImage.path.label("source_image_path"), + models.TrapImage.width.label("source_image_width"), + models.TrapImage.height.label("source_image_height"), + models.TrapImage.filesize.label("source_image_filesize"), 
models.DetectedObject.specific_label.label("label"), models.DetectedObject.specific_label_score.label("score"), models.DetectedObject.path.label("cropped_image_path"), models.DetectedObject.sequence_id, models.DetectedObject.timestamp, + models.DetectedObject.bbox, ) .where( (models.DetectedObject.monitoring_session_id == monitoring_session.id) From 2515e0be12b7f93e48eeecf1bff146f2dabd698f Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 18:38:28 -0700 Subject: [PATCH 2/8] feat: new exporter using PipelineResultsResponse schema for Antenna --- trapdata/api/export_utils.py | 353 ++++++++++++++++++++++++++++++ trapdata/api/schemas.py | 19 ++ trapdata/cli/export.py | 171 +++++++++++++++ trapdata/db/models/occurrences.py | 25 ++- 4 files changed, 567 insertions(+), 1 deletion(-) create mode 100644 trapdata/api/export_utils.py diff --git a/trapdata/api/export_utils.py b/trapdata/api/export_utils.py new file mode 100644 index 00000000..d11a6006 --- /dev/null +++ b/trapdata/api/export_utils.py @@ -0,0 +1,353 @@ +""" +Utilities for converting database models to API schemas for export functionality. +""" + +import datetime +from typing import Optional, Protocol + +from trapdata import ml +from trapdata.api.schemas import ( + AlgorithmConfigResponse, + AlgorithmReference, + BoundingBox, + ClassificationResponse, + DetectionResponse, + PipelineResultsResponse, + SourceImageResponse, +) +from trapdata.settings import read_settings + + +class DetectedObjectLike(Protocol): + """Protocol for objects that behave like DetectedObject for conversion.""" + + id: Optional[int] + specific_label: Optional[str] + specific_label_score: Optional[float] + bbox: Optional[list[int]] + path: Optional[str] + timestamp: Optional[datetime.datetime] + detection_algorithm: Optional[str] + classification_algorithm: Optional[str] + + +def create_algorithm_reference( + algorithm_name: Optional[str], task_type: str = "detection" +) -> AlgorithmReference: + """ + Create an AlgorithmReference from an algorithm name. 
+ + Args: + algorithm_name: Name of the algorithm, may be None for legacy data + task_type: Type of task (detection, classification) + + Returns: + AlgorithmReference object + """ + if not algorithm_name: + if task_type == "detection": + algorithm_name = "unknown_detector" + key = "unknown_detector" + else: + algorithm_name = "unknown_classifier" + key = "unknown_classifier" + return AlgorithmReference(name=algorithm_name, key=key) + + # Try to find the actual algorithm key from the model classes + current_settings = read_settings() + + if task_type == "detection": + detector_choice = current_settings.localization_model + detector_class = ml.models.object_detectors.get(detector_choice.value) + if detector_class and detector_class.name == algorithm_name: + key = detector_class.get_key() + else: + # Fallback to generated key + key = algorithm_name.lower().replace(" ", "_").replace("-", "_") + else: + # Check species classifier first + species_choice = current_settings.species_classification_model + species_class = ml.models.species_classifiers.get(species_choice.value) + if species_class and species_class.name == algorithm_name: + key = species_class.get_key() + else: + # Check binary classifier + binary_choice = current_settings.binary_classification_model + binary_class = ml.models.binary_classifiers.get(binary_choice.value) + if binary_class and binary_class.name == algorithm_name: + key = binary_class.get_key() + else: + # Fallback to generated key + key = algorithm_name.lower().replace(" ", "_").replace("-", "_") + + return AlgorithmReference(name=algorithm_name, key=key) + + +def convert_classification_to_classification_response( + detected_obj: DetectedObjectLike, + algorithm_name: Optional[str] = None, + timestamp: Optional[datetime.datetime] = None, +) -> ClassificationResponse: + """ + Convert classification data from a DetectedObject to ClassificationResponse. + + Args: + detected_obj: Database DetectedObject with classification data + algorithm_name: Name of classification algorithm used + timestamp: Timestamp for the classification + + Returns: + ClassificationResponse object + """ + if timestamp is None: + timestamp = detected_obj.timestamp or datetime.datetime.now() + + # Use the specific label and score from the detected object + classification = detected_obj.specific_label or "unknown" + score = detected_obj.specific_label_score or 0.0 + + # Create algorithm reference + algorithm = create_algorithm_reference( + algorithm_name or detected_obj.classification_algorithm, + task_type="classification", + ) + + return ClassificationResponse( + classification=classification, + labels=None, # Not available in database model + scores=[score], # Single score for the predicted class + logits=[], # Not stored in database + inference_time=None, # Not stored in database + algorithm=algorithm, + terminal=True, + timestamp=timestamp, + ) + + +def convert_detected_object_to_detection_response( + detected_obj: DetectedObjectLike, + source_image_id: str, + crop_image_url: Optional[str] = None, + detection_algorithm_name: Optional[str] = None, + classification_algorithm_name: Optional[str] = None, +) -> DetectionResponse: + """ + Convert a DetectedObject from database to DetectionResponse API schema. 
+ + Args: + detected_obj: Database DetectedObject + source_image_id: ID of the source image + crop_image_url: URL to the cropped image (optional) + detection_algorithm_name: Name of detection algorithm used + classification_algorithm_name: Name of classification algorithm used + + Returns: + DetectionResponse object with embedded ClassificationResponse + """ + # Convert bounding box from list to BoundingBox object + bbox_coords = detected_obj.bbox or [0, 0, 0, 0] + # Convert int coordinates to float for BoundingBox + bbox_coords_float = [float(coord) for coord in bbox_coords] + bbox = BoundingBox.from_coords(bbox_coords_float) + + # Create detection algorithm reference + detection_algorithm = create_algorithm_reference( + detection_algorithm_name or detected_obj.detection_algorithm, + task_type="detection", + ) + + # Create classification response if classification data exists + classifications = [] + if detected_obj.specific_label: + classification_response = convert_classification_to_classification_response( + detected_obj, + algorithm_name=classification_algorithm_name, + timestamp=detected_obj.timestamp, + ) + classifications.append(classification_response) + + # Use crop image path as URL if available + if not crop_image_url and detected_obj.path: + crop_image_url = str(detected_obj.path) + + return DetectionResponse( + source_image_id=source_image_id, + bbox=bbox, + inference_time=None, # Not stored in database + algorithm=detection_algorithm, + timestamp=detected_obj.timestamp or datetime.datetime.now(), + crop_image_url=crop_image_url, + classifications=classifications, + ) + + +def convert_occurrence_to_detection_responses( + occurrence_data: dict, + detection_algorithm_name: Optional[str] = None, + classification_algorithm_name: Optional[str] = None, +) -> list[DetectionResponse]: + """ + Convert occurrence data (with examples) to a list of DetectionResponse objects. 
+ + Args: + occurrence_data: Dictionary containing occurrence data with examples + detection_algorithm_name: Name of detection algorithm used + classification_algorithm_name: Name of classification algorithm used + + Returns: + List of DetectionResponse objects + """ + detection_responses = [] + + # Get current algorithm names from settings if not provided + if not detection_algorithm_name or not classification_algorithm_name: + current_settings = read_settings() + + if not detection_algorithm_name: + detector_choice = current_settings.localization_model + detector_class = ml.models.object_detectors.get(detector_choice.value) + if detector_class: + detection_algorithm_name = detector_class.name + + if not classification_algorithm_name: + species_choice = current_settings.species_classification_model + species_class = ml.models.species_classifiers.get(species_choice.value) + if species_class: + classification_algorithm_name = species_class.name + + examples = occurrence_data.get("examples", []) + for example in examples: + # Create a mock DetectedObject from the example data + class MockDetectedObject: + def __init__(self, example_data): + self.id = example_data.get("id") + self.specific_label = example_data.get("label") + self.specific_label_score = example_data.get("score") + self.bbox = example_data.get("bbox", [0, 0, 0, 0]) + self.path = example_data.get("cropped_image_path") + self.timestamp = example_data.get("timestamp") + self.detection_algorithm = detection_algorithm_name + self.classification_algorithm = classification_algorithm_name + + mock_obj = MockDetectedObject(example) + source_image_id = str(example.get("source_image_id", "unknown")) + + detection_response = convert_detected_object_to_detection_response( + mock_obj, + source_image_id=source_image_id, + detection_algorithm_name=detection_algorithm_name, + classification_algorithm_name=classification_algorithm_name, + ) + + detection_responses.append(detection_response) + + return detection_responses + + +def get_current_algorithms() -> dict[str, AlgorithmConfigResponse]: + """ + Get the currently configured algorithms from settings. 
+ + Returns: + Dictionary of algorithm configurations keyed by algorithm key + """ + current_settings = read_settings() + algorithms = {} + + # Get object detector + detector_choice = current_settings.localization_model + detector_class = ml.models.object_detectors.get(detector_choice.value) + if detector_class: + algorithms[detector_class.get_key()] = AlgorithmConfigResponse( + name=detector_class.name, + key=detector_class.get_key(), + task_type="detection", + description=getattr(detector_class, "description", None), + version=1, + ) + + # Get binary classifier + binary_choice = current_settings.binary_classification_model + binary_class = ml.models.binary_classifiers.get(binary_choice.value) + if binary_class: + algorithms[binary_class.get_key()] = AlgorithmConfigResponse( + name=binary_class.name, + key=binary_class.get_key(), + task_type="classification", + description=getattr(binary_class, "description", None), + version=1, + ) + + # Get species classifier + species_choice = current_settings.species_classification_model + species_class = ml.models.species_classifiers.get(species_choice.value) + if species_class: + algorithms[species_class.get_key()] = AlgorithmConfigResponse( + name=species_class.name, + key=species_class.get_key(), + task_type="classification", + description=getattr(species_class, "description", None), + version=1, + ) + + return algorithms + + +def get_source_images_from_occurrences(occurrences: list) -> list[SourceImageResponse]: + """ + Extract unique source images from occurrence data. + + Args: + occurrences: List of occurrence dictionaries with examples + + Returns: + List of SourceImageResponse objects + """ + source_images = {} + + for occurrence in occurrences: + examples = occurrence.get("examples", []) + for example in examples: + source_image_id = str(example.get("source_image_id", "unknown")) + source_image_path = example.get("source_image_path", "") + + if source_image_id not in source_images: + source_images[source_image_id] = SourceImageResponse( + id=source_image_id, + url=source_image_path, + ) + + return list(source_images.values()) + + +def create_pipeline_results_response( + occurrences: list, + detection_responses: list[DetectionResponse], + pipeline_name: str = "local_batch_processor", + total_time: float = 0.0, +) -> PipelineResultsResponse: + """ + Create a complete PipelineResultsResponse from occurrence data and responses. 
+
+    Args:
+        occurrences: List of occurrence dictionaries
+        detection_responses: List of DetectionResponse objects
+        pipeline_name: Name of the pipeline used
+        total_time: Total processing time
+
+    Returns:
+        Complete PipelineResultsResponse object
+    """
+    # Get current algorithms
+    algorithms = get_current_algorithms()
+
+    # Get source images
+    source_images = get_source_images_from_occurrences(occurrences)
+
+    return PipelineResultsResponse(
+        pipeline=pipeline_name,
+        algorithms=algorithms,
+        total_time=total_time,
+        source_images=source_images,
+        detections=detection_responses,
+    )
diff --git a/trapdata/api/schemas.py b/trapdata/api/schemas.py
index 7083d64b..b6ecb7a6 100644
--- a/trapdata/api/schemas.py
+++ b/trapdata/api/schemas.py
@@ -40,6 +40,7 @@ class SourceImage(pydantic.BaseModel):
     width: int | None = None
     height: int | None = None
     timestamp: datetime.datetime | None = None
+    deployment: "DeploymentReference | None" = None

     # Validate that there is at least one of the following fields
     @pydantic.model_validator(mode="after")
@@ -67,6 +68,23 @@ def open(self, raise_exception=False) -> PIL.Image.Image | None:
         return self._pil


+class DeploymentReference(pydantic.BaseModel):
+    """Reference to a deployment."""
+
+    name: str = pydantic.Field(
+        description="Name of the deployment, e.g. 'Vermont Moth Camera Station 1'.",
+        examples=["vermont-moth-camera-station-1"],
+    )
+    key: str = pydantic.Field(
+        description=(
+            "A unique key for the deployment, used to reference it in the API. "
+            "In practice the ADC's deployment key and name are the same and are "
+            "derived from the root folder name of the source image."
+        ),
+        examples=["vermont-moth-camera-station-1"],
+    )
+
+
 class AlgorithmReference(pydantic.BaseModel):
     name: str
     key: str
@@ -277,6 +295,7 @@ class PipelineResultsResponse(pydantic.BaseModel):
     total_time: float
     source_images: list[SourceImageResponse]
     detections: list[DetectionResponse]
+    deployments: list[DeploymentReference] | None = None
     config: PipelineConfigRequest = PipelineConfigRequest()


diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py
index 79128eaa..f75fbf44 100644
--- a/trapdata/cli/export.py
+++ b/trapdata/cli/export.py
@@ -11,6 +11,10 @@
 from rich import print

 from trapdata import logger
+from trapdata.api.export_utils import (
+    convert_occurrence_to_detection_responses,
+    create_pipeline_results_response,
+)
 from trapdata.cli import settings
 from trapdata.db import get_session_class
 from trapdata.db.models.deployments import list_deployments
@@ -266,3 +270,170 @@ def deployments(
     df = pd.DataFrame([d.model_dump() for d in deployments])

     return export(df=df, format=format, outfile=outfile)
+
+
+@cli.command(name="api-occurrences")
+def api_occurrences(
+    format: ExportFormat = ExportFormat.json,
+    num_examples: int = 3,
+    limit: Optional[int] = None,
+    offset: int = 0,
+    outfile: Optional[pathlib.Path] = None,
+    collect_images: bool = False,
+    absolute_paths: bool = False,
+    detection_algorithm: Optional[str] = None,
+    classification_algorithm: Optional[str] = None,
+) -> Optional[str]:
+    """
+    Export occurrences using API schemas (DetectionResponse/ClassificationResponse).
+
+    This exports the same occurrence data as the 'occurrences' command but uses
+    the new API schema format with DetectionResponse and ClassificationResponse
+    objects instead of the legacy Occurrence and ExportedDetection formats.
+ """ + events = get_monitoring_sessions_from_db( + db_path=settings.database_url, base_directory=settings.image_base_path + ) + + # Get occurrence data using existing logic + occurrences: list[Occurrence] = [] + tabular_formats = [ExportFormat.csv] + + if format in tabular_formats: + num_examples = 1 + + for event in events: + occurrences += list_occurrences( + settings.database_url, + monitoring_session=event, + classification_threshold=settings.classification_threshold, + num_examples=num_examples, + limit=limit, + offset=offset, + ) + + # Convert occurrences to DetectionResponse objects + all_detection_responses = [] + occurrence_dicts = [] + for occurrence in occurrences: + occurrence_dict = occurrence.model_dump() + occurrence_dicts.append(occurrence_dict) + detection_responses = convert_occurrence_to_detection_responses( + occurrence_dict, + detection_algorithm_name=detection_algorithm, + classification_algorithm_name=classification_algorithm, + ) + all_detection_responses.extend(detection_responses) + + # Create full pipeline results response + pipeline_response = create_pipeline_results_response( + occurrences=occurrence_dicts, + detection_responses=all_detection_responses, + pipeline_name="local_batch_processor", + total_time=0.0, + ) + + logger.info( + f"Preparing to export pipeline response with {len(all_detection_responses)} detection records as {format}" + ) + + if outfile: + destination_dir = outfile.parent + else: + destination_dir = settings.user_data_path / "exports" + destination_dir.mkdir(parents=True, exist_ok=True) + + if collect_images: + # Collect images for exported detections into a subdirectory + if outfile: + name = outfile.stem + else: + name = f"api_occurrences_{int(time.time())}" + destination_dir = destination_dir / f"{name}_images" + logger.info(f'Collecting images into "{destination_dir}"') + destination_dir.mkdir(parents=True, exist_ok=True) + + for detection in all_detection_responses: + if detection.crop_image_url: + source_path = pathlib.Path(detection.crop_image_url).resolve() + if source_path.exists(): + # Create a meaningful filename + classification = "unknown" + if detection.classifications: + classification = detection.classifications[0].classification + + destination = ( + destination_dir + / f"{classification}_{detection.source_image_id}_{source_path.name}" + ) + if not destination.exists(): + shutil.copy(source_path, destination) + + # Update the crop_image_url to point to the collected image + if absolute_paths: + detection.crop_image_url = str(destination.absolute()) + else: + detection.crop_image_url = str( + destination.relative_to(destination_dir) + ) + + # Convert to DataFrame for export based on format + if format in tabular_formats: + # For CSV, flatten the detection responses structure + detection_dicts = [ + detection.model_dump() for detection in all_detection_responses + ] + flattened_dicts = [] + for detection_dict in detection_dicts: + flat_dict = { + "source_image_id": detection_dict["source_image_id"], + "bbox_x1": detection_dict["bbox"]["x1"], + "bbox_y1": detection_dict["bbox"]["y1"], + "bbox_x2": detection_dict["bbox"]["x2"], + "bbox_y2": detection_dict["bbox"]["y2"], + "timestamp": detection_dict["timestamp"], + "crop_image_url": detection_dict.get("crop_image_url"), + "detection_algorithm_name": detection_dict["algorithm"]["name"], + "detection_algorithm_key": detection_dict["algorithm"]["key"], + } + + # Add classification data if available + if detection_dict["classifications"]: + classification = 
detection_dict["classifications"][0] + flat_dict.update( + { + "classification": classification["classification"], + "classification_score": ( + classification["scores"][0] + if classification["scores"] + else None + ), + "classification_algorithm_name": classification["algorithm"][ + "name" + ], + "classification_algorithm_key": classification["algorithm"][ + "key" + ], + "classification_timestamp": classification["timestamp"], + } + ) + else: + flat_dict.update( + { + "classification": None, + "classification_score": None, + "classification_algorithm_name": None, + "classification_algorithm_key": None, + "classification_timestamp": None, + } + ) + + flattened_dicts.append(flat_dict) + + df = pd.DataFrame(flattened_dicts) + else: + # For JSON/HTML, export the full pipeline response + pipeline_dict = pipeline_response.model_dump() + df = pd.DataFrame([pipeline_dict]) + + return export(df=df, format=format, outfile=outfile) diff --git a/trapdata/db/models/occurrences.py b/trapdata/db/models/occurrences.py index bc1b9cf0..100f1eea 100644 --- a/trapdata/db/models/occurrences.py +++ b/trapdata/db/models/occurrences.py @@ -18,6 +18,29 @@ from trapdata.db import models +class ExportedDetection(pydantic.BaseModel): + id: int + source_image_id: int + source_image_path: str + source_image_width: int + source_image_height: int + source_image_filesize: int + label: str + score: float + cropped_image_path: str | None = None + sequence_id: str | None = ( + None # This is the Occurrence ID on the ADC side (= detections in a sequence) + ) + timestamp: datetime.datetime + detection_algorithm: str | None = ( + None # Name of the object detection algorithm used + ) + classification_algorithm: str | None = ( + None # Classification algorithm used to generate the label & score + ) + bbox: list[int] # Bounding box in the format [x_min, y_min, x_max, y_max] + + class Occurrence(pydantic.BaseModel): id: str label: str @@ -30,7 +53,7 @@ class Occurrence(pydantic.BaseModel): num_frames: int # cropped_image_path: pathlib.Path # source_image_id: int - examples: list[dict] + examples: list[ExportedDetection] = [] example_crop: Optional[pathlib.Path] = None # detections: list[object] # deployment: object From 6e12c44fb4b26218a15ca03d63c695b539aa392c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 18:58:54 -0700 Subject: [PATCH 3/8] feat: add deployment data to api export format --- trapdata/api/export_utils.py | 14 +++++++++++++- trapdata/api/schemas.py | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/trapdata/api/export_utils.py b/trapdata/api/export_utils.py index d11a6006..dd37d788 100644 --- a/trapdata/api/export_utils.py +++ b/trapdata/api/export_utils.py @@ -303,9 +303,20 @@ def get_source_images_from_occurrences(occurrences: list) -> list[SourceImageRes Returns: List of SourceImageResponse objects """ + from trapdata.api.schemas import DeploymentReference + source_images = {} for occurrence in occurrences: + # Get deployment information from the occurrence + deployment_name = occurrence.get("deployment") + deployment = None + if deployment_name: + deployment = DeploymentReference( + name=deployment_name, + key=deployment_name, # Use same value for key as name + ) + examples = occurrence.get("examples", []) for example in examples: source_image_id = str(example.get("source_image_id", "unknown")) @@ -315,6 +326,7 @@ def get_source_images_from_occurrences(occurrences: list) -> list[SourceImageRes source_images[source_image_id] = SourceImageResponse( id=source_image_id, 
url=source_image_path, + deployment=deployment, ) return list(source_images.values()) @@ -341,7 +353,7 @@ def create_pipeline_results_response( # Get current algorithms algorithms = get_current_algorithms() - # Get source images + # Get source images with deployment information source_images = get_source_images_from_occurrences(occurrences) return PipelineResultsResponse( diff --git a/trapdata/api/schemas.py b/trapdata/api/schemas.py index b6ecb7a6..f0cc0c65 100644 --- a/trapdata/api/schemas.py +++ b/trapdata/api/schemas.py @@ -159,6 +159,7 @@ class SourceImageResponse(pydantic.BaseModel): id: str url: str + deployment: "DeploymentReference | None" = None class AlgorithmCategoryMapResponse(pydantic.BaseModel): From 078f200db6cb71ecce5a8489346ae8f3bc2c8219 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 18:59:17 -0700 Subject: [PATCH 4/8] fix: format of api export should match pipelineresultsresponse --- trapdata/cli/export.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index f75fbf44..b73cab83 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -431,9 +431,19 @@ def api_occurrences( flattened_dicts.append(flat_dict) df = pd.DataFrame(flattened_dicts) + return export(df=df, format=format, outfile=outfile) else: - # For JSON/HTML, export the full pipeline response + # For JSON/HTML, export the full pipeline response directly + import json + pipeline_dict = pipeline_response.model_dump() - df = pd.DataFrame([pipeline_dict]) - return export(df=df, format=format, outfile=outfile) + if outfile: + with open(outfile, "w") as f: + json.dump(pipeline_dict, f, indent=2, default=str) + logger.info(f'Exported pipeline response to "{outfile}"') + return str(outfile.absolute()) + else: + output = json.dumps(pipeline_dict, indent=2, default=str) + print(output) + return output From 036a687fd2dbcfade1397d2caaf8721c2df1601c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 19:13:38 -0700 Subject: [PATCH 5/8] feat: incomplete support for category maps in the api exports --- trapdata/api/export_utils.py | 38 +++++++++++++++++++++++++++++++++--- trapdata/cli/export.py | 4 +++- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/trapdata/api/export_utils.py b/trapdata/api/export_utils.py index dd37d788..45d42d22 100644 --- a/trapdata/api/export_utils.py +++ b/trapdata/api/export_utils.py @@ -244,10 +244,15 @@ def __init__(self, example_data): return detection_responses -def get_current_algorithms() -> dict[str, AlgorithmConfigResponse]: +def get_current_algorithms( + include_category_maps: bool = False, +) -> dict[str, AlgorithmConfigResponse]: """ Get the currently configured algorithms from settings. + Args: + include_category_maps: Whether to include category maps in algorithm configs + Returns: Dictionary of algorithm configurations keyed by algorithm key """ @@ -258,36 +263,61 @@ def get_current_algorithms() -> dict[str, AlgorithmConfigResponse]: detector_choice = current_settings.localization_model detector_class = ml.models.object_detectors.get(detector_choice.value) if detector_class: + category_map = None + if include_category_maps: + raise NotImplementedError( + "Category maps are not yet implemented for the batch export. 
" + ) + algorithms[detector_class.get_key()] = AlgorithmConfigResponse( name=detector_class.name, key=detector_class.get_key(), - task_type="detection", + task_type="localization", description=getattr(detector_class, "description", None), version=1, + category_map=category_map, ) # Get binary classifier binary_choice = current_settings.binary_classification_model binary_class = ml.models.binary_classifiers.get(binary_choice.value) if binary_class: + category_map = None + if include_category_maps: + # TODO: Implement category map loading for local models + raise NotImplementedError( + "Category maps for local models require model instantiation which " + "downloads large files. This feature needs optimization." + ) + algorithms[binary_class.get_key()] = AlgorithmConfigResponse( name=binary_class.name, key=binary_class.get_key(), task_type="classification", description=getattr(binary_class, "description", None), version=1, + category_map=category_map, ) # Get species classifier species_choice = current_settings.species_classification_model species_class = ml.models.species_classifiers.get(species_choice.value) if species_class: + category_map = None + if include_category_maps: + # TODO: Implement category map loading for local models + raise NotImplementedError( + "Category maps for local models require model instantiation which " + "downloads large files. This feature needs optimization." + ) + algorithms[species_class.get_key()] = AlgorithmConfigResponse( name=species_class.name, key=species_class.get_key(), task_type="classification", description=getattr(species_class, "description", None), version=1, + category_map=category_map, ) return algorithms @@ -337,6 +367,7 @@ def create_pipeline_results_response( detection_responses: list[DetectionResponse], pipeline_name: str = "local_batch_processor", total_time: float = 0.0, + include_category_maps: bool = False, ) -> PipelineResultsResponse: """ Create a complete PipelineResultsResponse from occurrence data and responses. @@ -346,12 +377,13 @@ def create_pipeline_results_response( detection_responses: List of DetectionResponse objects pipeline_name: Name of the pipeline used total_time: Total processing time + include_category_maps: Whether to include category maps in algorithm configs Returns: Complete PipelineResultsResponse object """ # Get current algorithms - algorithms = get_current_algorithms() + algorithms = get_current_algorithms(include_category_maps=include_category_maps) # Get source images with deployment information source_images = get_source_images_from_occurrences(occurrences) diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index b73cab83..6c7e8089 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -283,6 +283,7 @@ def api_occurrences( absolute_paths: bool = False, detection_algorithm: Optional[str] = None, classification_algorithm: Optional[str] = None, + include_category_maps: bool = False, ) -> Optional[str]: """ Export occurrences using API schemas (DetectionResponse/ClassificationResponse). 
@@ -331,6 +332,7 @@ def api_occurrences( detection_responses=all_detection_responses, pipeline_name="local_batch_processor", total_time=0.0, + include_category_maps=include_category_maps, ) logger.info( @@ -377,7 +379,7 @@ def api_occurrences( destination.relative_to(destination_dir) ) - # Convert to DataFrame for export based on format + # Handle export based on format if format in tabular_formats: # For CSV, flatten the detection responses structure detection_dicts = [ From 4a60a30fb1a60370fa0876d58604cb3ec0604bd3 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 19:15:07 -0700 Subject: [PATCH 6/8] chore: clean up --- trapdata/api/export_utils.py | 6 +++--- trapdata/cli/export.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/trapdata/api/export_utils.py b/trapdata/api/export_utils.py index 45d42d22..ef0b756f 100644 --- a/trapdata/api/export_utils.py +++ b/trapdata/api/export_utils.py @@ -18,7 +18,7 @@ from trapdata.settings import read_settings -class DetectedObjectLike(Protocol): +class DetectedObjectProtocol(Protocol): """Protocol for objects that behave like DetectedObject for conversion.""" id: Optional[int] @@ -84,7 +84,7 @@ def create_algorithm_reference( def convert_classification_to_classification_response( - detected_obj: DetectedObjectLike, + detected_obj: DetectedObjectProtocol, algorithm_name: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, ) -> ClassificationResponse: @@ -125,7 +125,7 @@ def convert_classification_to_classification_response( def convert_detected_object_to_detection_response( - detected_obj: DetectedObjectLike, + detected_obj: DetectedObjectProtocol, source_image_id: str, crop_image_url: Optional[str] = None, detection_algorithm_name: Optional[str] = None, diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index 6c7e8089..9379c4b1 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -275,7 +275,7 @@ def deployments( @cli.command(name="api-occurrences") def api_occurrences( format: ExportFormat = ExportFormat.json, - num_examples: int = 3, + num_examples: int = 9999, limit: Optional[int] = None, offset: int = 0, outfile: Optional[pathlib.Path] = None, From ef0d9ef20f4d767ef49fac8885a98877f653a780 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 6 Aug 2025 19:24:31 -0700 Subject: [PATCH 7/8] feat: require the user to specify a valid pipeline name for import --- trapdata/cli/export.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index 9379c4b1..5860b4d6 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -274,8 +274,9 @@ def deployments( @cli.command(name="api-occurrences") def api_occurrences( + pipeline_slug: str, format: ExportFormat = ExportFormat.json, - num_examples: int = 9999, + num_examples: int = 3, limit: Optional[int] = None, offset: int = 0, outfile: Optional[pathlib.Path] = None, @@ -291,7 +292,10 @@ def api_occurrences( This exports the same occurrence data as the 'occurrences' command but uses the new API schema format with DetectionResponse and ClassificationResponse objects instead of the legacy Occurrence and ExportedDetection formats. 
+ + Pipeline must be one of the valid choices from CLASSIFIER_CHOICES """ + # Validate pipeline choice events = get_monitoring_sessions_from_db( db_path=settings.database_url, base_directory=settings.image_base_path ) @@ -330,7 +334,7 @@ def api_occurrences( pipeline_response = create_pipeline_results_response( occurrences=occurrence_dicts, detection_responses=all_detection_responses, - pipeline_name="local_batch_processor", + pipeline_name=pipeline_slug, total_time=0.0, include_category_maps=include_category_maps, ) From 914136ed87c201e8dcf920242d0a03ba4a346117 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 7 Aug 2025 10:36:29 -0700 Subject: [PATCH 8/8] feat: split occurrence exports into multiple files --- trapdata/cli/export.py | 178 +++++++++++++++++++++++++++++++++++------ 1 file changed, 154 insertions(+), 24 deletions(-) diff --git a/trapdata/cli/export.py b/trapdata/cli/export.py index 5860b4d6..06dce10b 100644 --- a/trapdata/cli/export.py +++ b/trapdata/cli/export.py @@ -39,6 +39,144 @@ class ExportFormat(str, enum.Enum): csv = "csv" +def _export_batched_pipeline_responses( + all_detection_responses: list, + occurrence_dicts: list, + pipeline_slug: str, + include_category_maps: bool, + batch_size: Optional[int], + images_per_batch: int, + outfile: Optional[pathlib.Path], + destination_dir: pathlib.Path, +) -> str: + """ + Export pipeline responses split into multiple JSON files. + + Args: + all_detection_responses: All detection responses to split + occurrence_dicts: All occurrence dictionaries + pipeline_slug: Pipeline name + include_category_maps: Whether to include category maps + batch_size: Number of detections per batch (takes precedence) + images_per_batch: Number of source images per batch + outfile: Output file path (used for naming pattern) + destination_dir: Directory to save files + + Returns: + String describing the export results + """ + import json + from collections import defaultdict + + # Group detections by source image + detections_by_image = defaultdict(list) + for detection in all_detection_responses: + detections_by_image[detection.source_image_id].append(detection) + + # Group occurrences by source image for consistency + occurrences_by_image = defaultdict(list) + for occurrence in occurrence_dicts: + for example in occurrence.get("examples", []): + source_image_id = str(example.get("source_image_id", "unknown")) + occurrences_by_image[source_image_id].append(occurrence) + + # Create batches + batches = [] + if batch_size is not None: + # Batch by number of detections + current_batch_detections = [] + current_batch_occurrences = [] + + for detection in all_detection_responses: + current_batch_detections.append(detection) + + # Find corresponding occurrences for this detection + source_image_id = detection.source_image_id + for occurrence in occurrences_by_image[source_image_id]: + if occurrence not in current_batch_occurrences: + current_batch_occurrences.append(occurrence) + + if len(current_batch_detections) >= batch_size: + batches.append((current_batch_detections, current_batch_occurrences)) + current_batch_detections = [] + current_batch_occurrences = [] + + # Add remaining detections as final batch + if current_batch_detections: + batches.append((current_batch_detections, current_batch_occurrences)) + else: + # Batch by number of source images + source_image_ids = list(detections_by_image.keys()) + + for i in range(0, len(source_image_ids), images_per_batch): + batch_image_ids = source_image_ids[i : i + images_per_batch] + batch_detections = 
[] + batch_occurrences = [] + + for image_id in batch_image_ids: + batch_detections.extend(detections_by_image[image_id]) + batch_occurrences.extend(occurrences_by_image[image_id]) + + # Remove duplicate occurrences + unique_occurrences = [] + seen_occurrence_ids = set() + for occurrence in batch_occurrences: + occ_id = occurrence.get("id") + if occ_id not in seen_occurrence_ids: + unique_occurrences.append(occurrence) + seen_occurrence_ids.add(occ_id) + + batches.append((batch_detections, unique_occurrences)) + + # Export each batch + exported_files = [] + timestamp = int(time.time()) + + for batch_idx, (batch_detections, batch_occurrences) in enumerate(batches): + # Create pipeline response for this batch + pipeline_response = create_pipeline_results_response( + occurrences=batch_occurrences, + detection_responses=batch_detections, + pipeline_name=pipeline_slug, + total_time=0.0, + include_category_maps=include_category_maps, + ) + + # Determine output filename + if outfile: + base_name = outfile.stem + suffix = outfile.suffix + batch_filename = f"{base_name}_batch_{batch_idx + 1:03d}{suffix}" + else: + batch_filename = ( + f"api_occurrences_{timestamp}_batch_{batch_idx + 1:03d}.json" + ) + + batch_filepath = destination_dir / batch_filename + + # Write batch file + pipeline_dict = pipeline_response.model_dump() + with open(batch_filepath, "w") as f: + json.dump(pipeline_dict, f, indent=2, default=str) + + exported_files.append(str(batch_filepath.absolute())) + + logger.info( + f"Exported batch {batch_idx + 1}/{len(batches)} with " + f"{len(batch_detections)} detections from " + f"{len({d.source_image_id for d in batch_detections})} source images " + f'to "{batch_filepath}"' + ) + + summary = ( + f"Exported {len(all_detection_responses)} total detections across " + f"{len(batches)} batch files:\n" + "\n".join(f" - {f}" for f in exported_files) + ) + + logger.info(f"Batch export complete: {len(batches)} files created") + return summary + + def export( df: pd.DataFrame, format: ExportFormat = ExportFormat.json, @@ -285,6 +423,7 @@ def api_occurrences( detection_algorithm: Optional[str] = None, classification_algorithm: Optional[str] = None, include_category_maps: bool = False, + images_per_batch: int = 100, ) -> Optional[str]: """ Export occurrences using API schemas (DetectionResponse/ClassificationResponse). @@ -293,7 +432,10 @@ def api_occurrences( the new API schema format with DetectionResponse and ClassificationResponse objects instead of the legacy Occurrence and ExportedDetection formats. - Pipeline must be one of the valid choices from CLASSIFIER_CHOICES + Args: + pipeline_slug: The pipeline reference in Antenna, must be one of the valid + choices from CLASSIFIER_CHOICES. 
+ images_per_batch: Number of source images per exported file (default: 100) """ # Validate pipeline choice events = get_monitoring_sessions_from_db( @@ -330,15 +472,6 @@ def api_occurrences( ) all_detection_responses.extend(detection_responses) - # Create full pipeline results response - pipeline_response = create_pipeline_results_response( - occurrences=occurrence_dicts, - detection_responses=all_detection_responses, - pipeline_name=pipeline_slug, - total_time=0.0, - include_category_maps=include_category_maps, - ) - logger.info( f"Preparing to export pipeline response with {len(all_detection_responses)} detection records as {format}" ) @@ -439,17 +572,14 @@ def api_occurrences( df = pd.DataFrame(flattened_dicts) return export(df=df, format=format, outfile=outfile) else: - # For JSON/HTML, export the full pipeline response directly - import json - - pipeline_dict = pipeline_response.model_dump() - - if outfile: - with open(outfile, "w") as f: - json.dump(pipeline_dict, f, indent=2, default=str) - logger.info(f'Exported pipeline response to "{outfile}"') - return str(outfile.absolute()) - else: - output = json.dumps(pipeline_dict, indent=2, default=str) - print(output) - return output + # Always use batching with default of 1 image per batch + return _export_batched_pipeline_responses( + all_detection_responses=all_detection_responses, + occurrence_dicts=occurrence_dicts, + pipeline_slug=pipeline_slug, + include_category_maps=include_category_maps, + batch_size=None, + images_per_batch=images_per_batch, + outfile=outfile, + destination_dir=destination_dir, + )
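
For reference, a minimal sketch of reading back one of the batch files produced by the `api-occurrences` command added above. The file path is illustrative: it assumes the default export directory (`settings.user_data_path / "exports"`) and the `api_occurrences_<timestamp>_batch_<n>.json` naming pattern introduced in PATCH 8/8.

    import json
    import pathlib

    from trapdata.api.schemas import PipelineResultsResponse

    # Illustrative path; substitute an actual file from your export directory.
    batch_file = pathlib.Path("exports/api_occurrences_1723000000_batch_001.json")

    # Each batch file is a complete PipelineResultsResponse document, so it should
    # round-trip through the same pydantic schema that was used to produce it.
    data = json.loads(batch_file.read_text())
    results = PipelineResultsResponse.model_validate(data)
    print(results.pipeline, len(results.source_images), len(results.detections))

Because every batch file is a self-contained PipelineResultsResponse (pipeline, algorithms, source_images, detections, and optional deployments), each one can presumably be imported into Antenna independently of the others.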