From bb68d7bc4bb9fe9052008aabfbbfd3a0c334746e Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Tue, 19 May 2026 13:11:37 -0700 Subject: [PATCH 1/4] feat(retention): add export/clean/rehydrate endpoints for task content MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an operational surface for bounded retention of task chat content in shared infrastructure. Callers can snapshot a task's content, delete it from the shared stores, and later restore it byte-identically from the snapshot — preserving message IDs and timestamps so tool-call and reasoning references remain valid. Three new endpoints under /tasks/{task_id}: - GET /export — returns a self-contained snapshot (messages + task_states) - POST /clean — deletes content across Mongo messages, Mongo task_states, Postgres events; resets agent_task_tracker cursors; sets tasks.cleaned_at - POST /rehydrate — restores content from a snapshot, clears cleaned_at Domain layer lives in TaskRetentionService so the eventual scheduled sweep workflow and the HTTP endpoints share the same code path. Cleanup uses a "Mongo deletes first, Postgres marker last" order so retries after partial failure converge correctly. The active-task, idle-threshold, and unprocessed-events guards refuse cleanup when the task isn't safe to drop. Schema: - New nullable tasks.cleaned_at column (TIMESTAMPTZ, metadata-only ALTER) - No new audit table — cleanup operations emit structured log lines Other changes: - adapter_mongodb.batch_create now translates pymongo BulkWriteError with all-duplicate-key sub-errors into DuplicateItemError (HTTP 400) instead of letting it surface as ServiceError (HTTP 500) - New EventRepository.delete_by_task_id and AgentTaskTrackerRepository.reset_cursors_for_task methods Tests: 13 integration tests covering happy paths, all precondition guards, and the byte-identical export → clean → rehydrate round-trip. --- ...929_adding_task_cleaned_at_6c942325c828.py | 26 + .../database/migrations/migration_history.txt | 4 +- agentex/openapi.yaml | 520 ++++++++++++++++++ .../adapters/crud_store/adapter_mongodb.py | 14 + agentex/src/adapters/orm.py | 1 + agentex/src/api/app.py | 2 + agentex/src/api/routes/task_retention.py | 92 ++++ agentex/src/api/schemas/task_retention.py | 37 ++ agentex/src/api/schemas/tasks.py | 4 + agentex/src/domain/entities/task_retention.py | 44 ++ agentex/src/domain/entities/tasks.py | 5 + .../agent_task_tracker_repository.py | 20 + .../domain/repositories/event_repository.py | 13 +- .../domain/services/task_retention_service.py | 354 ++++++++++++ .../use_cases/task_retention_use_case.py | 53 ++ .../api/task_retention/__init__.py | 0 .../task_retention/test_task_retention_api.py | 333 +++++++++++ .../fixtures/integration_client.py | 24 + 18 files changed, 1544 insertions(+), 2 deletions(-) create mode 100644 agentex/database/migrations/alembic/versions/2026_05_19_1929_adding_task_cleaned_at_6c942325c828.py create mode 100644 agentex/src/api/routes/task_retention.py create mode 100644 agentex/src/api/schemas/task_retention.py create mode 100644 agentex/src/domain/entities/task_retention.py create mode 100644 agentex/src/domain/services/task_retention_service.py create mode 100644 agentex/src/domain/use_cases/task_retention_use_case.py create mode 100644 agentex/tests/integration/api/task_retention/__init__.py create mode 100644 agentex/tests/integration/api/task_retention/test_task_retention_api.py diff --git a/agentex/database/migrations/alembic/versions/2026_05_19_1929_adding_task_cleaned_at_6c942325c828.py b/agentex/database/migrations/alembic/versions/2026_05_19_1929_adding_task_cleaned_at_6c942325c828.py new file mode 100644 index 00000000..2e7acaed --- /dev/null +++ b/agentex/database/migrations/alembic/versions/2026_05_19_1929_adding_task_cleaned_at_6c942325c828.py @@ -0,0 +1,26 @@ +"""adding task cleaned at + +Revision ID: 6c942325c828 +Revises: a9959ebcbe98 +Create Date: 2026-05-19 19:29:34.858692 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '6c942325c828' +down_revision: Union[str, None] = 'a9959ebcbe98' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column('tasks', sa.Column('cleaned_at', sa.DateTime(timezone=True), nullable=True)) + + +def downgrade() -> None: + op.drop_column('tasks', 'cleaned_at') diff --git a/agentex/database/migrations/migration_history.txt b/agentex/database/migrations/migration_history.txt index 25e97ddb..b18d86d8 100644 --- a/agentex/database/migrations/migration_history.txt +++ b/agentex/database/migrations/migration_history.txt @@ -1,4 +1,6 @@ -9ff3ee32c81b -> e9c4ff9e6542 (head), add_tasks_metadata_gin_index +a9959ebcbe98 -> 6c942325c828 (head), adding task cleaned at +e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id +9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index 57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels 4a9b7787ccd7 -> 57c5ed4f59ae, add_task_id_to_spans d1a6cde41b3f -> 4a9b7787ccd7, deployments diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index ac50e362..8c0a86eb 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -3533,6 +3533,112 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/export: + get: + tags: + - task-retention + summary: Export Task + description: 'Build a self-contained snapshot of a task''s content surfaces. + + + Returns the exact payload format that POST /rehydrate accepts, so + + export → clean → rehydrate is a round-trip-equivalent operation.' + operationId: export_task_tasks__task_id__export_get + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ExportTaskResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/clean: + post: + tags: + - task-retention + summary: Clean Task + description: 'Delete content-bearing rows for a stale task. + + + Refuses on active tasks, in-flight workflows, or unprocessed events + + regardless of `force`. The `force=true` flag only bypasses the + + idle-threshold check.' + operationId: clean_task_tasks__task_id__clean_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CleanTaskRequest' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/CleanTaskResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/rehydrate: + post: + tags: + - task-retention + summary: Rehydrate Task + description: 'Restore content-bearing rows from a snapshot. + + + Refuses if the task isn''t currently in a cleaned state, or if any supplied + + message/state ID already exists in Mongo (catches double-rehydrate).' + operationId: rehydrate_task_tasks__task_id__rehydrate_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RehydrateTaskRequest' + responses: + '204': + description: Successful Response + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: ACPType: @@ -3970,6 +4076,48 @@ components: - checkpoint - metadata title: CheckpointTupleResponse + CleanTaskRequest: + properties: + force: + type: boolean + title: Force + description: Skip the idle-threshold check. Active-workflow and unprocessed-events + checks still apply. Admin use only. + default: false + idle_days: + type: integer + minimum: 1.0 + title: Idle Days + description: Idle threshold in days (ignored when force=true). + default: 7 + type: object + title: CleanTaskRequest + CleanTaskResponse: + properties: + task_id: + type: string + title: Task Id + cleaned_at: + type: string + format: date-time + title: Cleaned At + messages_deleted: + type: integer + title: Messages Deleted + task_states_deleted: + type: integer + title: Task States Deleted + events_deleted: + type: integer + title: Events Deleted + type: object + required: + - task_id + - cleaned_at + - messages_deleted + - task_states_deleted + - events_deleted + title: CleanTaskResponse CreateAPIKeyRequest: properties: agent_id: @@ -4326,6 +4474,33 @@ components: - author - data title: DataContent + DataContentEntity: + properties: + type: + type: string + const: data + title: Type + description: The type of the message, in this case `data`. + default: data + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + data: + additionalProperties: true + type: object + title: Data + description: The contents of the data message. + type: object + required: + - author + - data + title: DataContentEntity DataDelta: properties: type: @@ -4531,6 +4706,26 @@ components: - task_id - agent_id title: Event + ExportTaskResponse: + properties: + task_id: + type: string + title: Task Id + messages: + items: + $ref: '#/components/schemas/TaskMessageEntity' + type: array + title: Messages + task_states: + items: + $ref: '#/components/schemas/StateEntity' + type: array + title: Task States + type: object + required: + - task_id + title: ExportTaskResponse + description: Wire format mirrors the entity directly — schema parity is intentional. FileAttachment: properties: file_id: @@ -4557,6 +4752,32 @@ components: - type title: FileAttachment description: Represents a file attachment in messages. + FileAttachmentEntity: + properties: + file_id: + type: string + title: File Id + description: The unique ID of the attached file + name: + type: string + title: Name + description: The name of the file + size: + type: integer + title: Size + description: The size of the file in bytes + type: + type: string + title: Type + description: The MIME type or content type of the file + type: object + required: + - file_id + - name + - size + - type + title: FileAttachmentEntity + description: Represents a file attachment in messages. GetCheckpointTupleRequest: properties: thread_id: @@ -4799,6 +5020,42 @@ components: - content_index title: ReasoningContentDelta description: Delta for reasoning content updates + ReasoningContentEntity: + properties: + type: + type: string + const: reasoning + title: Type + description: The type of the message, in this case `reasoning`. + default: reasoning + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + summary: + items: + type: string + type: array + title: Summary + description: A list of short reasoning summaries + content: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Content + description: The reasoning content or chain-of-thought text + type: object + required: + - author + - summary + title: ReasoningContentEntity ReasoningSummaryDelta: properties: type: @@ -4949,6 +5206,26 @@ components: - updated_at title: RegisterAgentResponse description: Response model for registering an agent. + RehydrateTaskRequest: + properties: + task_id: + type: string + title: Task Id + messages: + items: + $ref: '#/components/schemas/TaskMessageEntity' + type: array + title: Messages + task_states: + items: + $ref: '#/components/schemas/StateEntity' + type: array + title: Task States + type: object + required: + - task_id + title: RehydrateTaskRequest + description: Same shape as the export response — round-trip parity. ScheduleActionInfo: properties: workflow_name: @@ -5315,6 +5592,60 @@ components: agent_id is globally unique. + The state is a dictionary of arbitrary data.' + StateEntity: + properties: + id: + anyOf: + - type: string + - type: 'null' + title: Id + description: The task state's unique id + task_id: + type: string + title: Task Id + description: ID of the task this state belongs to. The combination of task_id + and agent_id is globally unique. + agent_id: + type: string + title: Agent Id + description: ID of the agent this state belongs to. The combination of task_id + and agent_id is globally unique. + state: + additionalProperties: true + type: object + title: State + description: The state object that contains arbitrary data + created_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Created At + description: The timestamp when the state was created + updated_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Updated At + description: The timestamp when the state was last updated + type: object + required: + - task_id + - agent_id + - state + title: StateEntity + description: 'Represents a state in the agent system. A state is associated + uniquely with a task and an agent. + + + This entity is used to store states in MongoDB, with each state + + associated with a specific task and agent. The combination of task_id and + agent_id is globally unique. + + The state is a dictionary of arbitrary data.' StreamTaskMessageDelta: properties: @@ -5436,6 +5767,13 @@ components: format: date-time - type: 'null' title: The timestamp when the task was last updated + cleaned_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: The timestamp when the task's content was cleaned for retention compliance; + null when active params: anyOf: - additionalProperties: true @@ -5536,6 +5874,70 @@ components: text: '#/components/schemas/TextDelta' tool_request: '#/components/schemas/ToolRequestDelta' tool_response: '#/components/schemas/ToolResponseDelta' + TaskMessageEntity: + properties: + id: + anyOf: + - type: string + - type: 'null' + title: Id + description: The task message's unique id + task_id: + type: string + title: Task Id + description: ID of the task this message belongs to + content: + oneOf: + - $ref: '#/components/schemas/TextContentEntity' + - $ref: '#/components/schemas/DataContentEntity' + - $ref: '#/components/schemas/ToolRequestContentEntity' + - $ref: '#/components/schemas/ToolResponseContentEntity' + - $ref: '#/components/schemas/ReasoningContentEntity' + title: Content + description: The content of the message. This content is not OpenAI compatible. + These are messages that are meant to be displayed to the user. + discriminator: + propertyName: type + mapping: + data: '#/components/schemas/DataContentEntity' + reasoning: '#/components/schemas/ReasoningContentEntity' + text: '#/components/schemas/TextContentEntity' + tool_request: '#/components/schemas/ToolRequestContentEntity' + tool_response: '#/components/schemas/ToolResponseContentEntity' + streaming_status: + anyOf: + - type: string + enum: + - IN_PROGRESS + - DONE + - type: 'null' + title: In case of streaming, this indicates whether the message is still + being streamed or has been completed + created_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Created At + description: The timestamp when the message was created + updated_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Updated At + description: The timestamp when the message was last updated + type: object + required: + - task_id + - content + title: TaskMessageEntity + description: 'Represents a message in the agent system. + + + This entity is used to store messages in MongoDB, with each message + + associated with a specific task.' TaskMessageUpdate: oneOf: - $ref: '#/components/schemas/StreamTaskMessageStart' @@ -5588,6 +5990,13 @@ components: format: date-time - type: 'null' title: The timestamp when the task was last updated + cleaned_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: The timestamp when the task's content was cleaned for retention compliance; + null when active params: anyOf: - additionalProperties: true @@ -5672,6 +6081,45 @@ components: - author - content title: TextContent + TextContentEntity: + properties: + type: + type: string + const: text + title: Type + description: The type of the message, in this case `text`. + default: text + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + format: + $ref: '#/components/schemas/TextFormat' + description: The format of the message. This is used by the client to determine + how to display the message. + default: plain + content: + type: string + title: Content + description: The contents of the text message. + attachments: + anyOf: + - items: + $ref: '#/components/schemas/FileAttachmentEntity' + type: array + - type: 'null' + title: Attachments + description: Optional list of file attachments with structured metadata. + type: object + required: + - author + - content + title: TextContentEntity TextDelta: properties: type: @@ -5732,6 +6180,43 @@ components: - name - arguments title: ToolRequestContent + ToolRequestContentEntity: + properties: + type: + type: string + const: tool_request + title: Type + description: The type of the message, in this case `tool_request`. + default: tool_request + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + tool_call_id: + type: string + title: Tool Call Id + description: The ID of the tool call that is being requested. + name: + type: string + title: Name + description: The name of the tool that is being requested. + arguments: + additionalProperties: true + type: object + title: Arguments + description: The arguments to the tool. + type: object + required: + - author + - tool_call_id + - name + - arguments + title: ToolRequestContentEntity ToolRequestDelta: properties: type: @@ -5792,6 +6277,41 @@ components: - name - content title: ToolResponseContent + ToolResponseContentEntity: + properties: + type: + type: string + const: tool_response + title: Type + description: The type of the message, in this case `tool_response`. + default: tool_response + author: + $ref: '#/components/schemas/MessageAuthor' + description: The role of the messages author, in this case `system`, `user`, + `assistant`, or `tool`. + style: + $ref: '#/components/schemas/MessageStyle' + description: The style of the message. This is used by the client to determine + how to display the message. + default: static + tool_call_id: + type: string + title: Tool Call Id + description: The ID of the tool call that is being responded to. + name: + type: string + title: Name + description: The name of the tool that is being responded to. + content: + title: Content + description: The result of the tool. + type: object + required: + - author + - tool_call_id + - name + - content + title: ToolResponseContentEntity ToolResponseDelta: properties: type: diff --git a/agentex/src/adapters/crud_store/adapter_mongodb.py b/agentex/src/adapters/crud_store/adapter_mongodb.py index 7a1eb6e0..0d497e30 100644 --- a/agentex/src/adapters/crud_store/adapter_mongodb.py +++ b/agentex/src/adapters/crud_store/adapter_mongodb.py @@ -298,6 +298,20 @@ async def batch_create(self, items: list[T]) -> list[T]: message="One or more items have duplicate id values. IDs must be unique.", detail=str(e), ) from e + except pymongo.errors.BulkWriteError as e: + # insert_many raises BulkWriteError wrapping individual write errors. + # Translate to DuplicateItemError only when all underlying errors are + # duplicate-key (11000); otherwise surface as a generic ServiceError + # so callers can distinguish "fix your IDs" from "transient failure". + write_errors = e.details.get("writeErrors", []) if e.details else [] + if write_errors and all(we.get("code") == 11000 for we in write_errors): + raise DuplicateItemError( + message="One or more items have duplicate id values. IDs must be unique.", + detail=str(e), + ) from e + raise ServiceError( + message=f"Failed to batch create items in MongoDB: {e}", detail=str(e) + ) from e except ClientError: raise except Exception as e: diff --git a/agentex/src/adapters/orm.py b/agentex/src/adapters/orm.py index ac5ee39a..42a66c1a 100644 --- a/agentex/src/adapters/orm.py +++ b/agentex/src/adapters/orm.py @@ -72,6 +72,7 @@ class TaskORM(BaseORM): updated_at = Column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now() ) + cleaned_at = Column(DateTime(timezone=True), nullable=True) params = Column(JSONB, nullable=True) task_metadata = Column(JSONB, nullable=True) # Many-to-Many relationship with agents diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index f325b43b..35d4c7f4 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -27,6 +27,7 @@ schedules, spans, states, + task_retention, tasks, ) from src.config import dependencies @@ -187,6 +188,7 @@ async def handle_unexpected(request, exc): fastapi_app.include_router(deployments.router) fastapi_app.include_router(schedules.router) fastapi_app.include_router(checkpoints.router) +fastapi_app.include_router(task_retention.router) # Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses. # This must be the outermost layer to bypass all middleware. diff --git a/agentex/src/api/routes/task_retention.py b/agentex/src/api/routes/task_retention.py new file mode 100644 index 00000000..ee7abbe6 --- /dev/null +++ b/agentex/src/api/routes/task_retention.py @@ -0,0 +1,92 @@ +""" +Task retention endpoints — export / clean / rehydrate. + +These power both local-dev testing of the round-trip and the long-term +admin / external-caller integration surface. The scheduled Temporal cleanup +workflow calls the same use case (TaskRetentionUseCase.clean_task), not +these endpoints. + +OPEN DESIGN DECISION: auth. +The clean endpoint is destructive. It should require elevated privilege +(admin role, internal service account header, or similar). The export endpoint +is read-only but exposes content — gate similarly. Rehydrate is the contract +external callers integrate against — needs a stable auth model. + +For the first cut, gate behind task ownership (same as today's /tasks routes) +plus a feature flag on the backend. Revisit before external integration. +""" + +from fastapi import APIRouter + +from src.api.schemas.task_retention import ( + CleanTaskRequest, + CleanTaskResponse, + ExportTaskResponse, + RehydrateTaskRequest, +) +from src.domain.use_cases.task_retention_use_case import DTaskRetentionUseCase + +router = APIRouter(prefix="/tasks", tags=["task-retention"]) + + +@router.get( + "/{task_id}/export", + response_model=ExportTaskResponse, +) +async def export_task( + task_id: str, + use_case: DTaskRetentionUseCase, + # TODO: auth dep — task ownership + admin/elevated role +) -> ExportTaskResponse: + """ + Build a self-contained snapshot of a task's content surfaces. + + Returns the exact payload format that POST /rehydrate accepts, so + export → clean → rehydrate is a round-trip-equivalent operation. + """ + snapshot = await use_case.export_task(task_id) + return ExportTaskResponse.model_validate(snapshot) + + +@router.post( + "/{task_id}/clean", + response_model=CleanTaskResponse, +) +async def clean_task( + task_id: str, + request: CleanTaskRequest, + use_case: DTaskRetentionUseCase, + # TODO: auth dep — admin only +) -> CleanTaskResponse: + """ + Delete content-bearing rows for a stale task. + + Refuses on active tasks, in-flight workflows, or unprocessed events + regardless of `force`. The `force=true` flag only bypasses the + idle-threshold check. + """ + audit = await use_case.clean_task( + task_id=task_id, + force=request.force, + idle_days=request.idle_days, + ) + return CleanTaskResponse.model_validate(audit) + + +@router.post( + "/{task_id}/rehydrate", + status_code=204, +) +async def rehydrate_task( + task_id: str, + request: RehydrateTaskRequest, + use_case: DTaskRetentionUseCase, + # TODO: auth dep — task ownership; this is the caller-facing endpoint +) -> None: + """ + Restore content-bearing rows from a snapshot. + + Refuses if the task isn't currently in a cleaned state, or if any supplied + message/state ID already exists in Mongo (catches double-rehydrate). + """ + await use_case.rehydrate_task(task_id=task_id, snapshot=request) diff --git a/agentex/src/api/schemas/task_retention.py b/agentex/src/api/schemas/task_retention.py new file mode 100644 index 00000000..e27a3360 --- /dev/null +++ b/agentex/src/api/schemas/task_retention.py @@ -0,0 +1,37 @@ +from pydantic import BaseModel, Field + +from src.domain.entities.task_retention import ( + TaskCleanupResultEntity, + TaskSnapshotEntity, +) + + +class ExportTaskResponse(TaskSnapshotEntity): + """Wire format mirrors the entity directly — schema parity is intentional.""" + + pass + + +class CleanTaskRequest(BaseModel): + force: bool = Field( + default=False, + description=( + "Skip the idle-threshold check. Active-workflow and " + "unprocessed-events checks still apply. Admin use only." + ), + ) + idle_days: int = Field( + default=7, + ge=1, + description="Idle threshold in days (ignored when force=true).", + ) + + +class CleanTaskResponse(TaskCleanupResultEntity): + pass + + +class RehydrateTaskRequest(TaskSnapshotEntity): + """Same shape as the export response — round-trip parity.""" + + pass diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index 4055cadc..9a8a53dd 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -49,6 +49,10 @@ class Task(BaseModel): None, title="The timestamp when the task was last updated", ) + cleaned_at: datetime | None = Field( + None, + title="The timestamp when the task's content was cleaned for retention compliance; null when active", + ) params: dict[str, Any] | None = Field( None, title="Task parameters", diff --git a/agentex/src/domain/entities/task_retention.py b/agentex/src/domain/entities/task_retention.py new file mode 100644 index 00000000..c3aa440b --- /dev/null +++ b/agentex/src/domain/entities/task_retention.py @@ -0,0 +1,44 @@ +from datetime import datetime + +from pydantic import Field + +from src.domain.entities.states import StateEntity +from src.domain.entities.task_messages import TaskMessageEntity +from src.utils.model_utils import BaseModel + + +class TaskSnapshotEntity(BaseModel): + """ + Self-contained, restorable snapshot of a task's content surfaces. + + Used as the response body of GET /tasks/{id}/export and the request body of + POST /tasks/{id}/rehydrate. Schema parity between the two directions is the + invariant that makes export → clean → rehydrate a round-trip-equivalent + operation. + + Scope note: tasks.params (JSONB) is intentionally NOT part of the snapshot + or the cleanup surface for v1. It may carry initial-message content for + some agents; if that becomes a compliance gap, add it as a follow-up. + Events are also NOT included — they are a transient delivery surface; + consumed events have no live readers (see agent_task_tracker cursor). + """ + + task_id: str + messages: list[TaskMessageEntity] = Field(default_factory=list) + task_states: list[StateEntity] = Field(default_factory=list) + + +class TaskCleanupResultEntity(BaseModel): + """ + Per-invocation result of a cleanup operation. + + Returned to callers of POST /tasks/{id}/clean and emitted as a structured + log line (forensic record). Not persisted to a dedicated table in v1 — + Datadog log search is the audit trail. + """ + + task_id: str + cleaned_at: datetime + messages_deleted: int + task_states_deleted: int + events_deleted: int diff --git a/agentex/src/domain/entities/tasks.py b/agentex/src/domain/entities/tasks.py index bdccc23e..6a1ffce7 100644 --- a/agentex/src/domain/entities/tasks.py +++ b/agentex/src/domain/entities/tasks.py @@ -50,6 +50,10 @@ class TaskEntity(BaseModel): None, title="The timestamp when the task was last updated", ) + cleaned_at: datetime | None = Field( + None, + title="The timestamp when the task's content was cleaned for retention compliance; null when active", + ) params: dict[str, Any] | None = Field( None, title="Task parameters", @@ -73,6 +77,7 @@ def convert_task_to_entity(task: Task) -> TaskEntity: status_reason=task.status_reason, created_at=task.created_at, updated_at=task.updated_at, + cleaned_at=task.cleaned_at, params=task.params, task_metadata=task.task_metadata, ) diff --git a/agentex/src/domain/repositories/agent_task_tracker_repository.py b/agentex/src/domain/repositories/agent_task_tracker_repository.py index 9ab84222..76b58821 100644 --- a/agentex/src/domain/repositories/agent_task_tracker_repository.py +++ b/agentex/src/domain/repositories/agent_task_tracker_repository.py @@ -1,6 +1,7 @@ from typing import Annotated from fastapi import Depends +from sqlalchemy import update from sqlalchemy.future import select from src.adapters.crud_store.adapter_postgres import ( PostgresCRUDRepository, @@ -120,6 +121,25 @@ async def update_agent_task_tracker( return result + async def reset_cursors_for_task(self, task_id: str) -> int: + """ + Reset last_processed_event_id to NULL for every tracker tied to a + task. Used during retention cleanup so that any future events arriving + after rehydration are processed from scratch. Returns rows updated. + """ + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + stmt = ( + update(AgentTaskTrackerORM) + .where(AgentTaskTrackerORM.task_id == task_id) + .values(last_processed_event_id=None) + ) + result = await session.execute(stmt) + await session.commit() + return result.rowcount or 0 + DAgentTaskTrackerRepository = Annotated[ AgentTaskTrackerRepository, Depends(AgentTaskTrackerRepository) diff --git a/agentex/src/domain/repositories/event_repository.py b/agentex/src/domain/repositories/event_repository.py index 1aa750c6..e69e9ff8 100644 --- a/agentex/src/domain/repositories/event_repository.py +++ b/agentex/src/domain/repositories/event_repository.py @@ -1,7 +1,7 @@ from typing import Annotated from fastapi import Depends -from sqlalchemy import and_ +from sqlalchemy import and_, delete from sqlalchemy.dialects.postgresql import insert from sqlalchemy.future import select from src.adapters.crud_store.adapter_postgres import ( @@ -115,5 +115,16 @@ async def list_events_after_last_processed( return [EventEntity.model_validate(orm) for orm in event_orms] + async def delete_by_task_id(self, task_id: str) -> int: + """Delete all events for a task. Idempotent. Returns rows deleted.""" + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + stmt = delete(EventORM).where(EventORM.task_id == task_id) + result = await session.execute(stmt) + await session.commit() + return result.rowcount or 0 + DEventRepository = Annotated[EventRepository, Depends(EventRepository)] diff --git a/agentex/src/domain/services/task_retention_service.py b/agentex/src/domain/services/task_retention_service.py new file mode 100644 index 00000000..e1231f55 --- /dev/null +++ b/agentex/src/domain/services/task_retention_service.py @@ -0,0 +1,354 @@ +""" +TaskRetentionService — exports, cleans, and rehydrates task content. + +The same service methods back both the HTTP endpoints (admin / external +caller integration) and the Temporal cleanup activity (scheduled sweep). +Domain logic lives here; api/routes/ and src/temporal/ are thin wrappers. + +Cross-database operation ordering (see clean_task for details): +- Mongo deletes first (each delete-by-task-id is naturally idempotent). +- Postgres transaction last (carries the cleaned_at marker that gates re-cleaning). +- Temporal workflow termination outside the DB transaction (best-effort; history + retention will eventually expire it anyway). +""" + +from datetime import UTC, datetime, timedelta +from typing import Annotated + +from fastapi import Depends + +from src.adapters.temporal.adapter_temporal import DTemporalAdapter +from src.domain.entities.task_retention import ( + TaskCleanupResultEntity, + TaskSnapshotEntity, +) +from src.domain.entities.tasks import TaskStatus +from src.domain.exceptions import ClientError +from src.domain.repositories.agent_task_tracker_repository import ( + DAgentTaskTrackerRepository, +) +from src.domain.repositories.event_repository import DEventRepository +from src.domain.repositories.task_message_repository import DTaskMessageRepository +from src.domain.repositories.task_repository import DTaskRepository +from src.domain.repositories.task_state_repository import DTaskStateRepository +from src.domain.services.task_message_service import DTaskMessageService +from src.utils.logging import make_logger + +logger = make_logger(__name__) + +# Page size for paginated reads during export. Big enough to make most tasks +# fit in a single round-trip; small enough that a single oversized task can't +# blow the request budget. Revisit if export latency becomes an issue. +EXPORT_PAGE_SIZE = 500 + + +class TaskRetentionService: + def __init__( + self, + task_repository: DTaskRepository, + task_message_service: DTaskMessageService, + task_message_repository: DTaskMessageRepository, + task_state_repository: DTaskStateRepository, + event_repository: DEventRepository, + agent_task_tracker_repository: DAgentTaskTrackerRepository, + temporal_adapter: DTemporalAdapter, + ): + self.task_repository = task_repository + self.task_message_service = task_message_service + self.task_message_repository = task_message_repository + self.task_state_repository = task_state_repository + self.event_repository = event_repository + self.agent_task_tracker_repository = agent_task_tracker_repository + self.temporal_adapter = temporal_adapter + + async def export_task(self, task_id: str) -> TaskSnapshotEntity: + """ + Build a self-contained snapshot of all content-bearing rows for a task. + + Works for both active and cleaned tasks: + - Active: returns current live state (useful for debugging, ops snapshots). + - Cleaned: returns the (empty) state — primarily a no-op safety check that + tells the caller "nothing to export." + + Raises if the task does not exist. + """ + # 1. Load the task. task_repository.get raises if missing. + await self.task_repository.get(id=task_id) + + # 2. Page through messages, ordered chronologically so the snapshot + # replays cleanly on rehydrate. Pagination is 1-based in this codebase. + messages = [] + page_number = 1 + while True: + page = await self.task_message_service.get_messages( + task_id=task_id, + limit=EXPORT_PAGE_SIZE, + page_number=page_number, + order_by="created_at", + order_direction="asc", + ) + messages.extend(page) + if len(page) < EXPORT_PAGE_SIZE: + break + page_number += 1 + + # 3. Page through task_states. + task_states = [] + page_number = 1 + while True: + page = await self.task_state_repository.find_by_field( + "task_id", + task_id, + limit=EXPORT_PAGE_SIZE, + page_number=page_number, + sort_by={"created_at": 1}, + ) + task_states.extend(page) + if len(page) < EXPORT_PAGE_SIZE: + break + page_number += 1 + + return TaskSnapshotEntity( + task_id=task_id, + messages=messages, + task_states=task_states, + ) + + async def clean_task( + self, + task_id: str, + *, + enforce_idle_threshold: bool = True, + idle_days: int = 7, + ) -> TaskCleanupResultEntity: + """ + Delete content-bearing rows for a stale task. Idempotent: re-running on a + partially-cleaned or fully-cleaned task is safe. + + Args: + task_id: The task to clean. + enforce_idle_threshold: When True (default), refuses to clean a task + whose last interaction is more recent than `idle_days`. The + scheduled Temporal sweep always sets True. The admin endpoint + accepts a force=true flag that flips this to False. + idle_days: Idle threshold in days (when enforce_idle_threshold=True). + + Refuses (raises) if: + - task is currently active (status == RUNNING). + - enforce_idle_threshold=True and the task is not idle long enough. + - unprocessed events exist past agent_task_tracker cursors. + + If cleaned_at IS NOT NULL (already cleaned), returns an empty result + (zero rows deleted) rather than raising. + + Returns the result record describing what was deleted; the same record + is emitted as a structured log line for forensics. + + ORDER OF OPERATIONS (load-bearing for retry safety): + 1. Reload task fresh. Bail with empty-result if cleaned_at is set. + 2. Verify task status and (if enforced) idle threshold. + 3. Verify no events past agent_task_tracker.last_processed_event_id. + OPTIMISTIC: no row locks. Race window: a new EVENT_SEND between + this check and step 6c will be deleted with the rest. Acceptable + because (a) it can only happen on a task that's been idle ≥7d and + then suddenly receives an event — rare; (b) the structured log + below surfaces any cleanup with events_deleted > 0 on an + idle-checked task, giving forensic signal. + 4. Mongo: delete messages by task_id (idempotent). + 5. Mongo: delete task_states by task_id (idempotent). + 6. Postgres (separate operations, each idempotent): + a. delete events by task_id + b. reset agent_task_tracker cursors for task_id + c. update tasks.cleaned_at = now() + 7. Emit structured log with the TaskCleanupResultEntity payload. + + Note on Temporal workflows: Agentex doesn't own workflow IDs for agent + tasks (the agent's ACP server creates them). By the time a task is + idle ≥7d, any associated workflow should already be terminal. + The active-status guard in step 2 catches the case where it isn't. + """ + # 1. Reload task; bail if already cleaned. + task = await self.task_repository.get(id=task_id) + if task.cleaned_at is not None: + return TaskCleanupResultEntity( + task_id=task_id, + cleaned_at=task.cleaned_at, + messages_deleted=0, + task_states_deleted=0, + events_deleted=0, + ) + + # 2. Status + idle threshold guards. + if task.status == TaskStatus.RUNNING: + raise ClientError( + f"Cannot clean task {task_id}: status is RUNNING (active)" + ) + if enforce_idle_threshold and not await self._is_task_idle(task, idle_days): + raise ClientError( + f"Cannot clean task {task_id}: not idle for {idle_days} days " + f"(use force=true to override)" + ) + + # 3. Unprocessed-events guard. + if await self._has_unprocessed_events(task_id): + raise ClientError(f"Cannot clean task {task_id}: unprocessed events remain") + + # 4-5. Mongo deletes. + messages_deleted = await self.task_message_service.delete_all_messages(task_id) + task_states_deleted = await self.task_state_repository.delete_by_field( + "task_id", task_id + ) + + # 6a-b. Postgres deletes / resets. + events_deleted = await self.event_repository.delete_by_task_id(task_id) + await self.agent_task_tracker_repository.reset_cursors_for_task(task_id) + + # 6c. Mark task as cleaned. + cleaned_at = datetime.now(UTC) + task.cleaned_at = cleaned_at + await self.task_repository.update(task) + + result = TaskCleanupResultEntity( + task_id=task_id, + cleaned_at=cleaned_at, + messages_deleted=messages_deleted, + task_states_deleted=task_states_deleted, + events_deleted=events_deleted, + ) + + # 7. Forensic log line. Structured extras so Datadog can facet on them. + logger.info( + "task_cleanup_completed", + extra={ + "task_id": result.task_id, + "cleaned_at": result.cleaned_at.isoformat(), + "messages_deleted": result.messages_deleted, + "task_states_deleted": result.task_states_deleted, + "events_deleted": result.events_deleted, + }, + ) + + return result + + async def rehydrate_task( + self, + task_id: str, + snapshot: TaskSnapshotEntity, + ) -> None: + """ + Restore content-bearing rows from a snapshot. Inverse of clean_task. + + Refuses (raises) if: + - snapshot.task_id != task_id (catch payload misuse). + - cleaned_at IS NULL on the task (would clobber live data). + - any supplied message.id or task_state.id already exists in Mongo + (collision → DuplicateItemError surfaced from the adapter). + + Order of operations (mirror of clean_task): + 1. Reload task; verify cleaned_at IS NOT NULL. + 2. Mongo: batch insert messages with caller-supplied IDs. + 3. Mongo: batch insert task_states with caller-supplied IDs. + 4. Postgres: update tasks set cleaned_at = NULL. + (Events are not restored; cursors stay NULL from clean_task. + tasks.params is not touched — out of scope for v1.) + + Partial-insert hazard: insert_many is ordered, so a duplicate ID in + the middle of the batch leaves prior inserts committed. Acceptable + for v1 — the typical "double rehydrate" case has the collision in + position 0 (no prior commits). Operators can recover by manually + deleting the partial inserts and retrying. + + Note: ID preservation requires the caller to capture original Agentex IDs + at write time and store them alongside content in their external system. + This is a contract on the caller's integration, not enforced by Agentex. + """ + # Validate payload before touching anything. + if snapshot.task_id != task_id: + raise ClientError( + f"Snapshot task_id ({snapshot.task_id}) does not match " + f"path task_id ({task_id})" + ) + + # 1. Reload task; refuse if not in cleaned state. + task = await self.task_repository.get(id=task_id) + if task.cleaned_at is None: + raise ClientError( + f"Cannot rehydrate task {task_id}: task is not in cleaned state " + f"(cleaned_at is NULL)" + ) + + # 2. Insert messages with caller-supplied IDs. + if snapshot.messages: + await self.task_message_repository.batch_create(snapshot.messages) + + # 3. Insert task_states with caller-supplied IDs. + if snapshot.task_states: + await self.task_state_repository.batch_create(snapshot.task_states) + + # 4. Clear cleaned_at on the task row. + task.cleaned_at = None + await self.task_repository.update(task) + + logger.info( + "task_rehydrate_completed", + extra={ + "task_id": task_id, + "messages_restored": len(snapshot.messages), + "task_states_restored": len(snapshot.task_states), + }, + ) + + # ---- internal helpers ---- + + async def _is_task_idle(self, task, idle_days: int) -> bool: + """ + True iff the task has no interaction within the idle window. + + Last-interaction = max(task.updated_at, latest message created_at). + `task.updated_at` alone would miss tasks where the only recent + activity is Mongo message writes (which don't bump the Postgres row). + """ + cutoff = datetime.now(UTC) - timedelta(days=idle_days) + last_interaction = task.updated_at + + latest_messages = await self.task_message_service.get_messages( + task_id=task.id, + limit=1, + page_number=1, + order_by="created_at", + order_direction="desc", + ) + if latest_messages and latest_messages[0].created_at is not None: + # Mongo timestamps come back as naive datetimes; treat as UTC so + # they compare cleanly with Postgres TIMESTAMPTZ values. + latest_at = latest_messages[0].created_at + if latest_at.tzinfo is None: + latest_at = latest_at.replace(tzinfo=UTC) + if last_interaction is None or latest_at > last_interaction: + last_interaction = latest_at + + if last_interaction is None: + return True + return last_interaction < cutoff + + async def _has_unprocessed_events(self, task_id: str) -> bool: + """ + True iff any events exist past agent_task_tracker.last_processed_event_id + for any (task, agent) pair tied to this task. + """ + trackers = await self.agent_task_tracker_repository.find_by_field( + "task_id", task_id + ) + for tracker in trackers: + pending = await self.event_repository.list_events_after_last_processed( + task_id=task_id, + agent_id=tracker.agent_id, + last_processed_event_id=tracker.last_processed_event_id, + limit=1, + ) + if pending: + return True + return False + + +DTaskRetentionService = Annotated[TaskRetentionService, Depends(TaskRetentionService)] diff --git a/agentex/src/domain/use_cases/task_retention_use_case.py b/agentex/src/domain/use_cases/task_retention_use_case.py new file mode 100644 index 00000000..6544e098 --- /dev/null +++ b/agentex/src/domain/use_cases/task_retention_use_case.py @@ -0,0 +1,53 @@ +from typing import Annotated + +from fastapi import Depends + +from src.domain.entities.task_retention import ( + TaskCleanupResultEntity, + TaskSnapshotEntity, +) +from src.domain.services.task_retention_service import DTaskRetentionService + + +class TaskRetentionUseCase: + """ + Orchestrates export / clean / rehydrate operations for retention compliance. + Backs both the HTTP admin endpoints and the Temporal scheduled cleanup + activity; keep this layer thin so both callers exercise identical logic. + """ + + def __init__(self, retention_service: DTaskRetentionService): + self.retention_service = retention_service + + async def export_task(self, task_id: str) -> TaskSnapshotEntity: + return await self.retention_service.export_task(task_id) + + async def clean_task( + self, + task_id: str, + force: bool = False, + idle_days: int = 7, + ) -> TaskCleanupResultEntity: + """ + force=True is the admin escape hatch; it bypasses the idle-threshold + check (but NOT the active-workflow / unprocessed-events checks, which + protect correctness, not policy). + """ + return await self.retention_service.clean_task( + task_id=task_id, + enforce_idle_threshold=not force, + idle_days=idle_days, + ) + + async def rehydrate_task( + self, + task_id: str, + snapshot: TaskSnapshotEntity, + ) -> None: + await self.retention_service.rehydrate_task( + task_id=task_id, + snapshot=snapshot, + ) + + +DTaskRetentionUseCase = Annotated[TaskRetentionUseCase, Depends(TaskRetentionUseCase)] diff --git a/agentex/tests/integration/api/task_retention/__init__.py b/agentex/tests/integration/api/task_retention/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentex/tests/integration/api/task_retention/test_task_retention_api.py b/agentex/tests/integration/api/task_retention/test_task_retention_api.py new file mode 100644 index 00000000..31496398 --- /dev/null +++ b/agentex/tests/integration/api/task_retention/test_task_retention_api.py @@ -0,0 +1,333 @@ +""" +Integration tests for task retention endpoints: export / clean / rehydrate. + +Covers the round-trip invariant (export → clean → rehydrate → export yields a +byte-identical snapshot), each precondition guard, and the cross-store cleanup +surfaces (Mongo messages, Mongo task_states, Postgres events, Postgres +agent_task_tracker cursor, Postgres tasks.cleaned_at). +""" + +import pytest +import pytest_asyncio +from src.domain.entities.agents import ACPType, AgentEntity +from src.domain.entities.states import StateEntity +from src.domain.entities.task_messages import TaskMessageEntity, TextContentEntity +from src.domain.entities.tasks import TaskEntity, TaskStatus +from src.utils.ids import orm_id + + +@pytest.mark.asyncio +class TestTaskRetentionAPIIntegration: + """Integration tests for /tasks/{id}/{export,clean,rehydrate}.""" + + @pytest_asyncio.fixture + async def test_agent(self, isolated_repositories): + agent_repo = isolated_repositories["agent_repository"] + return await agent_repo.create( + AgentEntity( + id=orm_id(), + name="test-retention-agent", + description="Agent for retention testing", + acp_url="http://test-acp:8000", + acp_type=ACPType.SYNC, + ) + ) + + @pytest_asyncio.fixture + async def stale_task(self, isolated_repositories, test_agent): + """Non-RUNNING task — eligible for cleanup.""" + task_repo = isolated_repositories["task_repository"] + return await task_repo.create( + agent_id=test_agent.id, + task=TaskEntity( + id=orm_id(), + name="stale-task", + status=TaskStatus.FAILED, + status_reason="test fixture", + ), + ) + + @pytest_asyncio.fixture + async def running_task(self, isolated_repositories, test_agent): + """RUNNING task — should be refused by clean.""" + task_repo = isolated_repositories["task_repository"] + return await task_repo.create( + agent_id=test_agent.id, + task=TaskEntity( + id=orm_id(), + name="running-task", + status=TaskStatus.RUNNING, + status_reason="test fixture", + ), + ) + + async def _seed_messages(self, isolated_repositories, task_id, count): + message_repo = isolated_repositories["task_message_repository"] + messages = [] + for i in range(count): + messages.append( + await message_repo.create( + TaskMessageEntity( + id=orm_id(), + task_id=task_id, + content=TextContentEntity( + type="text", author="user", content=f"msg {i}" + ), + streaming_status="DONE", + ) + ) + ) + return messages + + async def _seed_state(self, isolated_repositories, task_id, agent_id): + state_repo = isolated_repositories["task_state_repository"] + return await state_repo.create( + StateEntity( + id=orm_id(), + task_id=task_id, + agent_id=agent_id, + state={"counter": 1, "nested": {"k": "v"}}, + ) + ) + + # ---- export ---- + + async def test_export_returns_full_snapshot( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + await self._seed_messages(isolated_repositories, stale_task.id, 3) + await self._seed_state(isolated_repositories, stale_task.id, test_agent.id) + + response = await isolated_client.get(f"/tasks/{stale_task.id}/export") + + assert response.status_code == 200 + snapshot = response.json() + assert snapshot["task_id"] == stale_task.id + assert len(snapshot["messages"]) == 3 + assert len(snapshot["task_states"]) == 1 + # Messages ordered chronologically (asc by created_at) + contents = [m["content"]["content"] for m in snapshot["messages"]] + assert contents == ["msg 0", "msg 1", "msg 2"] + + async def test_export_empty_task_returns_empty_collections( + self, isolated_client, stale_task + ): + response = await isolated_client.get(f"/tasks/{stale_task.id}/export") + + assert response.status_code == 200 + snapshot = response.json() + assert snapshot["messages"] == [] + assert snapshot["task_states"] == [] + + async def test_export_nonexistent_task_returns_404(self, isolated_client): + response = await isolated_client.get( + "/tasks/00000000-0000-0000-0000-000000000000/export" + ) + + assert response.status_code == 404 + + # ---- clean ---- + + async def test_clean_force_succeeds_and_clears_all_surfaces( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + await self._seed_messages(isolated_repositories, stale_task.id, 3) + await self._seed_state(isolated_repositories, stale_task.id, test_agent.id) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + assert response.status_code == 200 + result = response.json() + assert result["task_id"] == stale_task.id + assert result["messages_deleted"] == 3 + assert result["task_states_deleted"] == 1 + assert result["events_deleted"] == 0 + assert result["cleaned_at"] is not None + + # Verify Mongo surfaces are empty. + export_after = ( + await isolated_client.get(f"/tasks/{stale_task.id}/export") + ).json() + assert export_after["messages"] == [] + assert export_after["task_states"] == [] + + # Verify tasks.cleaned_at is set. + task_after = (await isolated_client.get(f"/tasks/{stale_task.id}")).json() + assert task_after["cleaned_at"] is not None + + async def test_clean_resets_agent_task_tracker_cursor( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + # Plant a cursor on the auto-created tracker so we can prove the reset. + tracker_repo = isolated_repositories["agent_task_tracker_repository"] + trackers = await tracker_repo.find_by_field("task_id", stale_task.id) + assert len(trackers) == 1 + # task_repository.create creates a tracker with cursor=None already; the + # property we care about is that the reset path runs idempotently. + + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + trackers_after = await tracker_repo.find_by_field("task_id", stale_task.id) + assert len(trackers_after) == 1 + assert trackers_after[0].last_processed_event_id is None + + async def test_clean_running_task_returns_400(self, isolated_client, running_task): + response = await isolated_client.post( + f"/tasks/{running_task.id}/clean", json={"force": True} + ) + + assert response.status_code == 400 + assert "RUNNING" in response.json()["message"] + + async def test_clean_already_cleaned_task_returns_empty_result( + self, isolated_client, isolated_repositories, stale_task + ): + await self._seed_messages(isolated_repositories, stale_task.id, 2) + # First clean — does the work. + first = await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + assert first.status_code == 200 + first_cleaned_at = first.json()["cleaned_at"] + + # Second clean — should be a no-op returning the prior cleaned_at. + second = await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + assert second.status_code == 200 + result = second.json() + assert result["messages_deleted"] == 0 + assert result["task_states_deleted"] == 0 + assert result["events_deleted"] == 0 + assert result["cleaned_at"] == first_cleaned_at + + async def test_clean_unprocessed_events_returns_400( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + # Plant an event with no cursor advancement; _has_unprocessed_events will + # see the event past the (null) cursor and refuse. + event_repo = isolated_repositories["event_repository"] + await event_repo.create( + id=orm_id(), + task_id=stale_task.id, + agent_id=test_agent.id, + content=None, + ) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + assert response.status_code == 400 + assert "unprocessed events" in response.json()["message"] + + async def test_clean_nonexistent_task_returns_404(self, isolated_client): + response = await isolated_client.post( + "/tasks/00000000-0000-0000-0000-000000000000/clean", + json={"force": True}, + ) + + assert response.status_code == 404 + + # ---- rehydrate ---- + + async def test_round_trip_is_byte_identical( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + """ + The load-bearing invariant: export → clean → rehydrate → export + yields the same snapshot down to IDs and timestamps. ID preservation + is what makes rehydrated tasks indistinguishable from the original. + """ + await self._seed_messages(isolated_repositories, stale_task.id, 3) + await self._seed_state(isolated_repositories, stale_task.id, test_agent.id) + + snapshot_before = ( + await isolated_client.get(f"/tasks/{stale_task.id}/export") + ).json() + + clean = await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + assert clean.status_code == 200 + + rehydrate = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=snapshot_before + ) + assert rehydrate.status_code == 204 + + snapshot_after = ( + await isolated_client.get(f"/tasks/{stale_task.id}/export") + ).json() + + assert snapshot_after == snapshot_before + + # Sanity: task is back to active (cleaned_at=None). + task_after = (await isolated_client.get(f"/tasks/{stale_task.id}")).json() + assert task_after["cleaned_at"] is None + + async def test_rehydrate_active_task_returns_400(self, isolated_client, stale_task): + # Task was never cleaned; rehydrate must refuse. + payload = {"task_id": stale_task.id, "messages": [], "task_states": []} + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=payload + ) + + assert response.status_code == 400 + assert "not in cleaned state" in response.json()["message"] + + async def test_rehydrate_task_id_mismatch_returns_400( + self, isolated_client, stale_task + ): + # Clean first so we'd otherwise pass the cleaned_at guard. + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + payload = { + "task_id": "00000000-0000-0000-0000-000000000000", + "messages": [], + "task_states": [], + } + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=payload + ) + + assert response.status_code == 400 + assert "does not match" in response.json()["message"] + + async def test_rehydrate_id_collision_returns_400( + self, isolated_client, isolated_repositories, stale_task + ): + # Seed, snapshot, clean, then re-insert one message so the rehydrate + # collides on _id. + await self._seed_messages(isolated_repositories, stale_task.id, 2) + snapshot = (await isolated_client.get(f"/tasks/{stale_task.id}/export")).json() + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + # Plant a colliding doc with the same _id the rehydrate would use. + first = snapshot["messages"][0] + message_repo = isolated_repositories["task_message_repository"] + await message_repo.create( + TaskMessageEntity( + id=first["id"], + task_id=stale_task.id, + content=TextContentEntity( + type="text", author="user", content="planted collision" + ), + streaming_status="DONE", + ) + ) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=snapshot + ) + + assert response.status_code == 400 + assert "duplicate id" in response.json()["message"].lower() diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py index b715d223..4ba0ceb2 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -383,6 +383,7 @@ async def isolated_integration_app( from src.domain.use_cases.messages_use_case import MessagesUseCase from src.domain.use_cases.spans_use_case import SpanUseCase from src.domain.use_cases.states_use_case import StatesUseCase + from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase from src.domain.use_cases.tasks_use_case import TasksUseCase # Create use case factory functions with isolated repositories @@ -466,6 +467,28 @@ def create_messages_use_case(): return MessagesUseCase(task_message_service=task_message_service) + def create_task_retention_use_case(): + """Create TaskRetentionUseCase wired to isolated repos for retention tests.""" + from src.domain.services.task_message_service import TaskMessageService + from src.domain.services.task_retention_service import TaskRetentionService + from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase + + task_message_service = TaskMessageService( + message_repository=isolated_repositories["task_message_repository"] + ) + retention_service = TaskRetentionService( + task_repository=isolated_repositories["task_repository"], + task_message_service=task_message_service, + task_message_repository=isolated_repositories["task_message_repository"], + task_state_repository=isolated_repositories["task_state_repository"], + event_repository=isolated_repositories["event_repository"], + agent_task_tracker_repository=isolated_repositories[ + "agent_task_tracker_repository" + ], + temporal_adapter=isolated_temporal_adapter, + ) + return TaskRetentionUseCase(retention_service=retention_service) + # Import dependency types and repository classes that need to be overridden from src.adapters.streams.adapter_redis import RedisStreamRepository from src.config.dependencies import ( @@ -510,6 +533,7 @@ def create_messages_use_case(): AgentTaskTrackerUseCase: create_agent_task_tracker_use_case, TasksUseCase: create_tasks_use_case, MessagesUseCase: create_messages_use_case, + TaskRetentionUseCase: create_task_retention_use_case, AgentAPIKeysUseCase: create_agent_api_keys_use_case, DeploymentHistoryUseCase: create_deployment_history_use_case, # Repositories - these ensure consistent isolated instances From 9530759f98909aaf3da8e4165fae489bdbc70cc0 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Tue, 19 May 2026 13:30:03 -0700 Subject: [PATCH 2/4] address review: per-task authz on retention routes, validate embedded task_ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two P1 issues from review. **Authorization (security)** The three retention endpoints were inheriting only the global auth middleware, not the resource-level authorization that every other /tasks/{task_id}/* route enforces. Any authenticated principal could export, clean, or rehydrate a task they don't own. Adds DAuthorizedId to all three handlers matching the existing pattern: - export → AuthorizedOperationType.read - clean → AuthorizedOperationType.delete - rehydrate → AuthorizedOperationType.update **Per-entity task_id validation** snapshot.task_id was checked against the path task_id, but each embedded TaskMessageEntity and StateEntity carries its own task_id field that batch_create forwards straight to MongoDB. A caller could pass snapshot.task_id = "A" with messages whose task_id = "B" and pollute task B's collection — Mongo has no FK to reject it. Adds explicit per-item validation in rehydrate_task before any insert. Returns 400 with the offending index in the message so the caller can find the bad entry. Tests: 2 new integration tests covering the mismatched-task_id cases for both messages and task_states. Full suite (15 tests) still passes. --- agentex/src/api/routes/task_retention.py | 26 ++++---- .../domain/services/task_retention_service.py | 16 +++++ .../task_retention/test_task_retention_api.py | 59 +++++++++++++++++++ 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/agentex/src/api/routes/task_retention.py b/agentex/src/api/routes/task_retention.py index ee7abbe6..378bd8de 100644 --- a/agentex/src/api/routes/task_retention.py +++ b/agentex/src/api/routes/task_retention.py @@ -6,18 +6,18 @@ workflow calls the same use case (TaskRetentionUseCase.clean_task), not these endpoints. -OPEN DESIGN DECISION: auth. -The clean endpoint is destructive. It should require elevated privilege -(admin role, internal service account header, or similar). The export endpoint -is read-only but exposes content — gate similarly. Rehydrate is the contract -external callers integrate against — needs a stable auth model. - -For the first cut, gate behind task ownership (same as today's /tasks routes) -plus a feature flag on the backend. Revisit before external integration. +Authorization mirrors the existing /tasks routes via DAuthorizedId: +- export → read (returns content) +- rehydrate → update (writes content for a task the caller owns) +- clean → delete (destructive) """ from fastapi import APIRouter +from src.api.schemas.authorization_types import ( + AgentexResourceType, + AuthorizedOperationType, +) from src.api.schemas.task_retention import ( CleanTaskRequest, CleanTaskResponse, @@ -25,6 +25,7 @@ RehydrateTaskRequest, ) from src.domain.use_cases.task_retention_use_case import DTaskRetentionUseCase +from src.utils.authorization_shortcuts import DAuthorizedId router = APIRouter(prefix="/tasks", tags=["task-retention"]) @@ -34,9 +35,8 @@ response_model=ExportTaskResponse, ) async def export_task( - task_id: str, + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read), use_case: DTaskRetentionUseCase, - # TODO: auth dep — task ownership + admin/elevated role ) -> ExportTaskResponse: """ Build a self-contained snapshot of a task's content surfaces. @@ -53,10 +53,9 @@ async def export_task( response_model=CleanTaskResponse, ) async def clean_task( - task_id: str, + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.delete), request: CleanTaskRequest, use_case: DTaskRetentionUseCase, - # TODO: auth dep — admin only ) -> CleanTaskResponse: """ Delete content-bearing rows for a stale task. @@ -78,10 +77,9 @@ async def clean_task( status_code=204, ) async def rehydrate_task( - task_id: str, + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update), request: RehydrateTaskRequest, use_case: DTaskRetentionUseCase, - # TODO: auth dep — task ownership; this is the caller-facing endpoint ) -> None: """ Restore content-bearing rows from a snapshot. diff --git a/agentex/src/domain/services/task_retention_service.py b/agentex/src/domain/services/task_retention_service.py index e1231f55..825cdbcd 100644 --- a/agentex/src/domain/services/task_retention_service.py +++ b/agentex/src/domain/services/task_retention_service.py @@ -269,6 +269,22 @@ async def rehydrate_task( f"path task_id ({task_id})" ) + # Reject any embedded entity whose task_id disagrees with the path. + # Mongo has no foreign key to tasks, so an unchecked batch_create here + # would write rows that get tagged to a task the caller may not own. + for i, message in enumerate(snapshot.messages): + if message.task_id != task_id: + raise ClientError( + f"Snapshot message[{i}] task_id ({message.task_id}) does not " + f"match path task_id ({task_id})" + ) + for i, state in enumerate(snapshot.task_states): + if state.task_id != task_id: + raise ClientError( + f"Snapshot task_states[{i}] task_id ({state.task_id}) does " + f"not match path task_id ({task_id})" + ) + # 1. Reload task; refuse if not in cleaned state. task = await self.task_repository.get(id=task_id) if task.cleaned_at is None: diff --git a/agentex/tests/integration/api/task_retention/test_task_retention_api.py b/agentex/tests/integration/api/task_retention/test_task_retention_api.py index 31496398..2381fc26 100644 --- a/agentex/tests/integration/api/task_retention/test_task_retention_api.py +++ b/agentex/tests/integration/api/task_retention/test_task_retention_api.py @@ -300,6 +300,65 @@ async def test_rehydrate_task_id_mismatch_returns_400( assert response.status_code == 400 assert "does not match" in response.json()["message"] + async def test_rehydrate_rejects_mismatched_message_task_id( + self, isolated_client, isolated_repositories, stale_task + ): + """ + Defense in depth: even when snapshot.task_id matches the path, each + embedded message's task_id must also match — otherwise a caller could + smuggle messages into a different task's collection through rehydrate. + """ + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + foreign_task_id = "00000000-0000-0000-0000-000000000000" + payload = { + "task_id": stale_task.id, + "messages": [ + { + "id": orm_id(), + "task_id": foreign_task_id, + "content": {"type": "text", "author": "user", "content": "x"}, + "streaming_status": "DONE", + } + ], + "task_states": [], + } + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=payload + ) + + assert response.status_code == 400 + assert "message[0]" in response.json()["message"] + + async def test_rehydrate_rejects_mismatched_task_state_task_id( + self, isolated_client, isolated_repositories, stale_task, test_agent + ): + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + foreign_task_id = "00000000-0000-0000-0000-000000000000" + payload = { + "task_id": stale_task.id, + "messages": [], + "task_states": [ + { + "id": orm_id(), + "task_id": foreign_task_id, + "agent_id": test_agent.id, + "state": {"k": "v"}, + } + ], + } + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=payload + ) + + assert response.status_code == 400 + assert "task_states[0]" in response.json()["message"] + async def test_rehydrate_id_collision_returns_400( self, isolated_client, isolated_repositories, stale_task ): From b7efc01bc9b383763259fc0829c0d7424e88e5fd Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Tue, 26 May 2026 13:00:41 -0700 Subject: [PATCH 3/4] feat(retention): support presigned URLs for large snapshots MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For very long conversations (deep reasoning content, extensive tool traces, many attachments), the rehydrate JSON body can exceed proxy / ALB body-size limits. Same risk on the export response. Adds an opt-in URL path to both operations: - POST /tasks/{task_id}/export (new) — Agentex builds the snapshot and PUTs it as JSON to a caller-supplied presigned upload URL. Returns a small ack with byte/item counts. - POST /tasks/{task_id}/rehydrate (extended) — accepts an optional snapshot_url field. When set, Agentex GETs the URL and parses the body as a TaskSnapshotEntity. Mutually exclusive with inline content; the Pydantic validator rejects mixed payloads. The existing inline paths (GET /export, POST /rehydrate with inline content) are unchanged. Calls that fit in a JSON body still take the direct route — only oversized callers need to use object storage. SSRF guard (utils/url_validation.py): deny-known-bad. Requires https, resolves the hostname, and rejects any address that is_private, is_loopback, is_link_local (covers 169.254.169.254 cloud metadata), is_reserved, is_multicast, or is_unspecified. Uses async DNS so it doesn't block the event loop. Format: plain JSON for both directions (round-trip-symmetric with the inline path). Gzipped/streaming are future possibilities. Caps: no explicit byte/timeout limits in v1. If real callers hit issues we'll add them then. Sync: callers wait for upload/download to complete. Async-with-status is a follow-up if needed. Tests: 6 new integration tests covering happy paths (mock httpx PUT/GET, verify body == inline export, verify byte-identical round-trip through URL form), SSRF rejection (127.0.0.1), scheme rejection (http), and the inline-vs-URL mutual-exclusion validator. Total suite 21 passing. Other: - TaskRetentionService now depends on DHttpxClient (already a global singleton via GlobalDependencies) - New TaskExportToUrlResultEntity carries the upload ack shape --- agentex/openapi.yaml | 108 ++++++++++++- agentex/src/api/routes/task_retention.py | 43 ++++- agentex/src/api/schemas/task_retention.py | 53 ++++++- agentex/src/domain/entities/task_retention.py | 14 ++ .../domain/services/task_retention_service.py | 90 ++++++++++- .../use_cases/task_retention_use_case.py | 15 +- agentex/src/utils/url_validation.py | 55 +++++++ .../task_retention/test_task_retention_api.py | 148 ++++++++++++++++++ .../fixtures/integration_client.py | 1 + 9 files changed, 518 insertions(+), 9 deletions(-) create mode 100644 agentex/src/utils/url_validation.py diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index 8c0a86eb..891b0337 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -3565,6 +3565,46 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + post: + tags: + - task-retention + summary: Export Task To Url + description: 'Build the task snapshot and PUT it to a caller-supplied presigned + URL. + + + Use this when the snapshot is too large for a JSON response body (long + + conversations, deep reasoning content, many attachments). The upload URL + + must be https and resolve to a public address — see SSRF guard.' + operationId: export_task_to_url_tasks__task_id__export_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ExportTaskToUrlRequest' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ExportTaskToUrlResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}/clean: post: tags: @@ -3613,6 +3653,13 @@ paths: description: 'Restore content-bearing rows from a snapshot. + Two modes: + + - Inline: caller provides messages and task_states in the request body. + + - URL: caller provides snapshot_url; Agentex downloads and parses it. + + Refuses if the task isn''t currently in a cleaned state, or if any supplied message/state ID already exists in Mongo (catches double-rehydrate).' @@ -4726,6 +4773,45 @@ components: - task_id title: ExportTaskResponse description: Wire format mirrors the entity directly — schema parity is intentional. + ExportTaskToUrlRequest: + properties: + upload_url: + type: string + maxLength: 2083 + minLength: 1 + format: uri + title: Upload Url + description: Presigned PUT URL where Agentex will upload the task snapshot + as JSON. Must be https; must resolve to a public address. + type: object + required: + - upload_url + title: ExportTaskToUrlRequest + ExportTaskToUrlResponse: + properties: + task_id: + type: string + title: Task Id + upload_url: + type: string + title: Upload Url + uploaded_bytes: + type: integer + title: Uploaded Bytes + messages_count: + type: integer + title: Messages Count + task_states_count: + type: integer + title: Task States Count + type: object + required: + - task_id + - upload_url + - uploaded_bytes + - messages_count + - task_states_count + title: ExportTaskToUrlResponse FileAttachment: properties: file_id: @@ -5221,11 +5307,31 @@ components: $ref: '#/components/schemas/StateEntity' type: array title: Task States + snapshot_url: + anyOf: + - type: string + maxLength: 2083 + minLength: 1 + format: uri + - type: 'null' + title: Snapshot Url + description: Presigned GET URL whose body is a JSON-encoded TaskSnapshotEntity. + Must be https; must resolve to a public address. When set, messages/task_states + must be empty. type: object required: - task_id title: RehydrateTaskRequest - description: Same shape as the export response — round-trip parity. + description: 'Either provide inline content (messages + task_states) or a snapshot_url + + pointing at a presigned JSON download. Mixing both is rejected. + + + The inline form is the canonical shape used by export''s GET response, so + + snapshot → clean → rehydrate round-trips cleanly without serialization + + changes.' ScheduleActionInfo: properties: workflow_name: diff --git a/agentex/src/api/routes/task_retention.py b/agentex/src/api/routes/task_retention.py index 378bd8de..165c2800 100644 --- a/agentex/src/api/routes/task_retention.py +++ b/agentex/src/api/routes/task_retention.py @@ -22,8 +22,11 @@ CleanTaskRequest, CleanTaskResponse, ExportTaskResponse, + ExportTaskToUrlRequest, + ExportTaskToUrlResponse, RehydrateTaskRequest, ) +from src.domain.entities.task_retention import TaskSnapshotEntity from src.domain.use_cases.task_retention_use_case import DTaskRetentionUseCase from src.utils.authorization_shortcuts import DAuthorizedId @@ -48,6 +51,29 @@ async def export_task( return ExportTaskResponse.model_validate(snapshot) +@router.post( + "/{task_id}/export", + response_model=ExportTaskToUrlResponse, +) +async def export_task_to_url( + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read), + request: ExportTaskToUrlRequest, + use_case: DTaskRetentionUseCase, +) -> ExportTaskToUrlResponse: + """ + Build the task snapshot and PUT it to a caller-supplied presigned URL. + + Use this when the snapshot is too large for a JSON response body (long + conversations, deep reasoning content, many attachments). The upload URL + must be https and resolve to a public address — see SSRF guard. + """ + result = await use_case.export_task_to_url( + task_id=task_id, + upload_url=str(request.upload_url), + ) + return ExportTaskToUrlResponse.model_validate(result) + + @router.post( "/{task_id}/clean", response_model=CleanTaskResponse, @@ -84,7 +110,22 @@ async def rehydrate_task( """ Restore content-bearing rows from a snapshot. + Two modes: + - Inline: caller provides messages and task_states in the request body. + - URL: caller provides snapshot_url; Agentex downloads and parses it. + Refuses if the task isn't currently in a cleaned state, or if any supplied message/state ID already exists in Mongo (catches double-rehydrate). """ - await use_case.rehydrate_task(task_id=task_id, snapshot=request) + if request.snapshot_url is not None: + await use_case.rehydrate_task( + task_id=task_id, + snapshot_url=str(request.snapshot_url), + ) + else: + snapshot = TaskSnapshotEntity( + task_id=request.task_id, + messages=request.messages, + task_states=request.task_states, + ) + await use_case.rehydrate_task(task_id=task_id, snapshot=snapshot) diff --git a/agentex/src/api/schemas/task_retention.py b/agentex/src/api/schemas/task_retention.py index e27a3360..f6341e1e 100644 --- a/agentex/src/api/schemas/task_retention.py +++ b/agentex/src/api/schemas/task_retention.py @@ -1,7 +1,10 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, HttpUrl, model_validator +from src.domain.entities.states import StateEntity +from src.domain.entities.task_messages import TaskMessageEntity from src.domain.entities.task_retention import ( TaskCleanupResultEntity, + TaskExportToUrlResultEntity, TaskSnapshotEntity, ) @@ -12,6 +15,20 @@ class ExportTaskResponse(TaskSnapshotEntity): pass +class ExportTaskToUrlRequest(BaseModel): + upload_url: HttpUrl = Field( + ..., + description=( + "Presigned PUT URL where Agentex will upload the task snapshot as " + "JSON. Must be https; must resolve to a public address." + ), + ) + + +class ExportTaskToUrlResponse(TaskExportToUrlResultEntity): + pass + + class CleanTaskRequest(BaseModel): force: bool = Field( default=False, @@ -31,7 +48,35 @@ class CleanTaskResponse(TaskCleanupResultEntity): pass -class RehydrateTaskRequest(TaskSnapshotEntity): - """Same shape as the export response — round-trip parity.""" +class RehydrateTaskRequest(BaseModel): + """ + Either provide inline content (messages + task_states) or a snapshot_url + pointing at a presigned JSON download. Mixing both is rejected. - pass + The inline form is the canonical shape used by export's GET response, so + snapshot → clean → rehydrate round-trips cleanly without serialization + changes. + """ + + task_id: str + messages: list[TaskMessageEntity] = Field(default_factory=list) + task_states: list[StateEntity] = Field(default_factory=list) + snapshot_url: HttpUrl | None = Field( + default=None, + description=( + "Presigned GET URL whose body is a JSON-encoded TaskSnapshotEntity. " + "Must be https; must resolve to a public address. When set, " + "messages/task_states must be empty." + ), + ) + + @model_validator(mode="after") + def _exactly_one_source(self): + has_inline = bool(self.messages or self.task_states) + has_url = self.snapshot_url is not None + if has_inline and has_url: + raise ValueError( + "Provide inline content (messages/task_states) OR snapshot_url, " + "not both." + ) + return self diff --git a/agentex/src/domain/entities/task_retention.py b/agentex/src/domain/entities/task_retention.py index c3aa440b..2907449c 100644 --- a/agentex/src/domain/entities/task_retention.py +++ b/agentex/src/domain/entities/task_retention.py @@ -42,3 +42,17 @@ class TaskCleanupResultEntity(BaseModel): messages_deleted: int task_states_deleted: int events_deleted: int + + +class TaskExportToUrlResultEntity(BaseModel): + """ + Per-invocation result of an export-to-URL operation. Returned to callers + of POST /tasks/{id}/export so they can verify what was uploaded without + having to re-download and parse. + """ + + task_id: str + upload_url: str + uploaded_bytes: int + messages_count: int + task_states_count: int diff --git a/agentex/src/domain/services/task_retention_service.py b/agentex/src/domain/services/task_retention_service.py index 825cdbcd..8a0bc3e5 100644 --- a/agentex/src/domain/services/task_retention_service.py +++ b/agentex/src/domain/services/task_retention_service.py @@ -18,12 +18,14 @@ from fastapi import Depends from src.adapters.temporal.adapter_temporal import DTemporalAdapter +from src.config.dependencies import DHttpxClient from src.domain.entities.task_retention import ( TaskCleanupResultEntity, + TaskExportToUrlResultEntity, TaskSnapshotEntity, ) from src.domain.entities.tasks import TaskStatus -from src.domain.exceptions import ClientError +from src.domain.exceptions import ClientError, ServiceError from src.domain.repositories.agent_task_tracker_repository import ( DAgentTaskTrackerRepository, ) @@ -33,6 +35,7 @@ from src.domain.repositories.task_state_repository import DTaskStateRepository from src.domain.services.task_message_service import DTaskMessageService from src.utils.logging import make_logger +from src.utils.url_validation import validate_external_url logger = make_logger(__name__) @@ -52,6 +55,7 @@ def __init__( event_repository: DEventRepository, agent_task_tracker_repository: DAgentTaskTrackerRepository, temporal_adapter: DTemporalAdapter, + httpx_client: DHttpxClient, ): self.task_repository = task_repository self.task_message_service = task_message_service @@ -60,6 +64,7 @@ def __init__( self.event_repository = event_repository self.agent_task_tracker_repository = agent_task_tracker_repository self.temporal_adapter = temporal_adapter + self.httpx_client = httpx_client async def export_task(self, task_id: str) -> TaskSnapshotEntity: """ @@ -114,6 +119,58 @@ async def export_task(self, task_id: str) -> TaskSnapshotEntity: task_states=task_states, ) + async def export_task_to_url( + self, + task_id: str, + upload_url: str, + ) -> TaskExportToUrlResultEntity: + """ + Build the snapshot the same way export_task does, then PUT the JSON + body to a caller-supplied presigned URL. Useful when the snapshot is + too large to fit comfortably in a JSON response body. + + The upload URL is validated against the SSRF guard before any request + is issued (see utils.url_validation). + """ + await validate_external_url(upload_url) + + snapshot = await self.export_task(task_id) + body = snapshot.model_dump_json().encode("utf-8") + + try: + response = await self.httpx_client.put( + upload_url, + content=body, + headers={"Content-Type": "application/json"}, + ) + response.raise_for_status() + except Exception as e: + raise ServiceError( + message=f"Failed to upload snapshot for task {task_id}", + detail=str(e), + ) from e + + result = TaskExportToUrlResultEntity( + task_id=task_id, + upload_url=upload_url, + uploaded_bytes=len(body), + messages_count=len(snapshot.messages), + task_states_count=len(snapshot.task_states), + ) + + logger.info( + "task_export_to_url_completed", + extra={ + "task_id": result.task_id, + "upload_url": result.upload_url, + "uploaded_bytes": result.uploaded_bytes, + "messages_count": result.messages_count, + "task_states_count": result.task_states_count, + }, + ) + + return result + async def clean_task( self, task_id: str, @@ -233,18 +290,27 @@ async def clean_task( async def rehydrate_task( self, task_id: str, - snapshot: TaskSnapshotEntity, + snapshot: TaskSnapshotEntity | None = None, + snapshot_url: str | None = None, ) -> None: """ Restore content-bearing rows from a snapshot. Inverse of clean_task. + Source of the snapshot is mutually exclusive: caller supplies either an + inline `snapshot` or a `snapshot_url` (presigned GET URL whose body is + the JSON-encoded snapshot). URL form is for cases where the snapshot is + larger than is comfortable as a JSON request body. + Refuses (raises) if: + - both/neither of snapshot and snapshot_url are provided. - snapshot.task_id != task_id (catch payload misuse). - cleaned_at IS NULL on the task (would clobber live data). - any supplied message.id or task_state.id already exists in Mongo (collision → DuplicateItemError surfaced from the adapter). Order of operations (mirror of clean_task): + 0. If snapshot_url is set, validate (SSRF) and download → parse to + TaskSnapshotEntity. 1. Reload task; verify cleaned_at IS NOT NULL. 2. Mongo: batch insert messages with caller-supplied IDs. 3. Mongo: batch insert task_states with caller-supplied IDs. @@ -262,6 +328,26 @@ async def rehydrate_task( at write time and store them alongside content in their external system. This is a contract on the caller's integration, not enforced by Agentex. """ + if (snapshot is None) == (snapshot_url is None): + raise ClientError("Provide exactly one of snapshot or snapshot_url.") + + if snapshot_url is not None: + await validate_external_url(snapshot_url) + try: + response = await self.httpx_client.get(snapshot_url) + response.raise_for_status() + except Exception as e: + raise ServiceError( + message=f"Failed to download snapshot from URL for task {task_id}", + detail=str(e), + ) from e + try: + snapshot = TaskSnapshotEntity.model_validate_json(response.content) + except Exception as e: + raise ClientError( + f"Downloaded snapshot is not a valid TaskSnapshotEntity: {e}" + ) from e + # Validate payload before touching anything. if snapshot.task_id != task_id: raise ClientError( diff --git a/agentex/src/domain/use_cases/task_retention_use_case.py b/agentex/src/domain/use_cases/task_retention_use_case.py index 6544e098..ef8ec620 100644 --- a/agentex/src/domain/use_cases/task_retention_use_case.py +++ b/agentex/src/domain/use_cases/task_retention_use_case.py @@ -4,6 +4,7 @@ from src.domain.entities.task_retention import ( TaskCleanupResultEntity, + TaskExportToUrlResultEntity, TaskSnapshotEntity, ) from src.domain.services.task_retention_service import DTaskRetentionService @@ -22,6 +23,16 @@ def __init__(self, retention_service: DTaskRetentionService): async def export_task(self, task_id: str) -> TaskSnapshotEntity: return await self.retention_service.export_task(task_id) + async def export_task_to_url( + self, + task_id: str, + upload_url: str, + ) -> TaskExportToUrlResultEntity: + return await self.retention_service.export_task_to_url( + task_id=task_id, + upload_url=upload_url, + ) + async def clean_task( self, task_id: str, @@ -42,11 +53,13 @@ async def clean_task( async def rehydrate_task( self, task_id: str, - snapshot: TaskSnapshotEntity, + snapshot: TaskSnapshotEntity | None = None, + snapshot_url: str | None = None, ) -> None: await self.retention_service.rehydrate_task( task_id=task_id, snapshot=snapshot, + snapshot_url=snapshot_url, ) diff --git a/agentex/src/utils/url_validation.py b/agentex/src/utils/url_validation.py new file mode 100644 index 00000000..5be1598b --- /dev/null +++ b/agentex/src/utils/url_validation.py @@ -0,0 +1,55 @@ +""" +SSRF guard for caller-supplied URLs that Agentex fetches/uploads to server-side. + +Used by task retention endpoints that accept presigned URLs for export upload +and rehydrate download. A malicious or careless caller could otherwise point us +at internal services (databases, metadata IPs, private networks) — the guard +resolves the hostname and rejects any IP that's not publicly routable. +""" + +import asyncio +import ipaddress +from urllib.parse import urlparse + +from src.domain.exceptions import ClientError + + +async def validate_external_url(url: str) -> None: + """ + Reject the URL if it would cause Agentex to issue a request to non-public + infrastructure. Raises ClientError; returns None on success. + + Resolution happens here, but the subsequent request uses the hostname — DNS + rebinding is possible (returning a public IP here and a private one at the + actual request). The mitigation is acceptable for v1 because the realistic + threat model is operator misconfiguration, not active attack from a + privileged caller. + """ + parsed = urlparse(url) + + if parsed.scheme != "https": + raise ClientError(f"URL scheme must be 'https'; got '{parsed.scheme}'.") + if not parsed.hostname: + raise ClientError("URL must include a hostname.") + + loop = asyncio.get_event_loop() + try: + infos = await loop.getaddrinfo(parsed.hostname, parsed.port or 443) + except OSError as e: + raise ClientError(f"Could not resolve hostname '{parsed.hostname}': {e}") from e + + for info in infos: + addr_str = info[4][0] + addr = ipaddress.ip_address(addr_str) + if ( + addr.is_private + or addr.is_loopback + or addr.is_link_local + or addr.is_reserved + or addr.is_multicast + or addr.is_unspecified + ): + raise ClientError( + f"URL '{url}' resolves to a non-public address ({addr}); " + f"refusing to issue a server-side request." + ) diff --git a/agentex/tests/integration/api/task_retention/test_task_retention_api.py b/agentex/tests/integration/api/task_retention/test_task_retention_api.py index 2381fc26..b2014422 100644 --- a/agentex/tests/integration/api/task_retention/test_task_retention_api.py +++ b/agentex/tests/integration/api/task_retention/test_task_retention_api.py @@ -7,6 +7,8 @@ agent_task_tracker cursor, Postgres tasks.cleaned_at). """ +from unittest.mock import AsyncMock, MagicMock + import pytest import pytest_asyncio from src.domain.entities.agents import ACPType, AgentEntity @@ -390,3 +392,149 @@ async def test_rehydrate_id_collision_returns_400( assert response.status_code == 400 assert "duplicate id" in response.json()["message"].lower() + + # ---- export-to-url ---- + + async def test_export_to_url_uploads_snapshot_via_put( + self, + isolated_client, + isolated_repositories, + isolated_api_key_http_client, + stale_task, + test_agent, + ): + """ + Verify the snapshot we'd return inline is what gets PUT to the upload URL. + The mock httpx client captures the request body so we can diff. + """ + await self._seed_messages(isolated_repositories, stale_task.id, 2) + await self._seed_state(isolated_repositories, stale_task.id, test_agent.id) + expected = (await isolated_client.get(f"/tasks/{stale_task.id}/export")).json() + + put_response = MagicMock() + put_response.raise_for_status = MagicMock() + isolated_api_key_http_client.put = AsyncMock(return_value=put_response) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/export", + json={"upload_url": "https://example.com/upload"}, + ) + + assert response.status_code == 200 + result = response.json() + assert result["task_id"] == stale_task.id + assert result["messages_count"] == 2 + assert result["task_states_count"] == 1 + assert result["uploaded_bytes"] > 0 + + # Verify the PUT body matches the inline export byte-for-byte. + put_kwargs = isolated_api_key_http_client.put.call_args.kwargs + import json as _json + + uploaded = _json.loads(put_kwargs["content"].decode("utf-8")) + assert uploaded == expected + + async def test_export_to_url_rejects_non_https_scheme( + self, isolated_client, stale_task + ): + response = await isolated_client.post( + f"/tasks/{stale_task.id}/export", + json={"upload_url": "http://example.com/upload"}, + ) + # Pydantic-level URL validation may reject before reaching the SSRF guard. + assert response.status_code in (400, 422) + + async def test_export_to_url_rejects_private_address( + self, isolated_client, stale_task + ): + # 127.0.0.1 resolves to itself; the SSRF guard's is_loopback check fires. + response = await isolated_client.post( + f"/tasks/{stale_task.id}/export", + json={"upload_url": "https://127.0.0.1/upload"}, + ) + assert response.status_code == 400 + assert "non-public" in response.json()["message"] + + # ---- rehydrate-from-url ---- + + async def test_rehydrate_from_url_downloads_and_restores( + self, + isolated_client, + isolated_repositories, + isolated_api_key_http_client, + stale_task, + test_agent, + ): + # Seed, snapshot, clean — then rehydrate via URL whose body is the snapshot. + await self._seed_messages(isolated_repositories, stale_task.id, 2) + await self._seed_state(isolated_repositories, stale_task.id, test_agent.id) + snapshot = (await isolated_client.get(f"/tasks/{stale_task.id}/export")).json() + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + import json as _json + + get_response = MagicMock() + get_response.raise_for_status = MagicMock() + get_response.content = _json.dumps(snapshot).encode("utf-8") + isolated_api_key_http_client.get = AsyncMock(return_value=get_response) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", + json={ + "task_id": stale_task.id, + "snapshot_url": "https://example.com/snapshot.json", + }, + ) + assert response.status_code == 204 + + # Round-trip identity holds: re-exported snapshot matches the original. + restored = (await isolated_client.get(f"/tasks/{stale_task.id}/export")).json() + assert restored == snapshot + + async def test_rehydrate_rejects_both_inline_and_url( + self, isolated_client, stale_task + ): + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", + json={ + "task_id": stale_task.id, + "messages": [ + { + "id": orm_id(), + "task_id": stale_task.id, + "content": { + "type": "text", + "author": "user", + "content": "x", + }, + "streaming_status": "DONE", + } + ], + "snapshot_url": "https://example.com/snapshot.json", + }, + ) + # Pydantic validator rejects with 422 before reaching the handler. + assert response.status_code == 422 + + async def test_rehydrate_url_rejects_private_address( + self, isolated_client, stale_task + ): + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", + json={ + "task_id": stale_task.id, + "snapshot_url": "https://127.0.0.1/snapshot.json", + }, + ) + assert response.status_code == 400 + assert "non-public" in response.json()["message"] diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py index 4ba0ceb2..f1396d57 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -486,6 +486,7 @@ def create_task_retention_use_case(): "agent_task_tracker_repository" ], temporal_adapter=isolated_temporal_adapter, + httpx_client=isolated_api_key_http_client, ) return TaskRetentionUseCase(retention_service=retention_service) From 243924b63734ad416927c47f170fbe608ee2d647 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Tue, 26 May 2026 15:16:07 -0700 Subject: [PATCH 4/4] address review: reject empty rehydrate payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Greptile P1: the _exactly_one_source validator rejected "both inline AND url" but allowed "neither". A request with just task_id, empty messages/task_states, no snapshot_url passed validation, then rehydrate_task would silently skip all inserts and clear cleaned_at — leaving the task active with no content. Whatever was deleted at clean time is permanently gone. Validator now rejects empty payloads with the same shape of error as the "both" case. Two existing tests adjusted to send a real message so they reach the cleaned_at / task_id checks they were meant to exercise. New test_rehydrate_rejects_empty_payload codifies the empty-rejection so a future refactor can't silently regress. The "rehydrate an empty cleaned task" case is now unsupported. If that's ever needed, an explicit allow_empty flag is the right way to opt in; we have no use case for it today. --- agentex/src/api/schemas/task_retention.py | 8 ++++ .../task_retention/test_task_retention_api.py | 39 ++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/agentex/src/api/schemas/task_retention.py b/agentex/src/api/schemas/task_retention.py index f6341e1e..ed25f431 100644 --- a/agentex/src/api/schemas/task_retention.py +++ b/agentex/src/api/schemas/task_retention.py @@ -79,4 +79,12 @@ def _exactly_one_source(self): "Provide inline content (messages/task_states) OR snapshot_url, " "not both." ) + if not has_inline and not has_url: + # Empty payloads would silently clear cleaned_at without restoring + # anything — the task ends up active with no content, an + # inconsistent state. Force the caller to be explicit about source. + raise ValueError( + "Provide exactly one of: inline content (messages/task_states) " + "or snapshot_url." + ) return self diff --git a/agentex/tests/integration/api/task_retention/test_task_retention_api.py b/agentex/tests/integration/api/task_retention/test_task_retention_api.py index b2014422..9b90838f 100644 --- a/agentex/tests/integration/api/task_retention/test_task_retention_api.py +++ b/agentex/tests/integration/api/task_retention/test_task_retention_api.py @@ -274,7 +274,19 @@ async def test_round_trip_is_byte_identical( async def test_rehydrate_active_task_returns_400(self, isolated_client, stale_task): # Task was never cleaned; rehydrate must refuse. - payload = {"task_id": stale_task.id, "messages": [], "task_states": []} + # Payload needs a real message to pass the "exactly one source" validator. + payload = { + "task_id": stale_task.id, + "messages": [ + { + "id": orm_id(), + "task_id": stale_task.id, + "content": {"type": "text", "author": "user", "content": "x"}, + "streaming_status": "DONE", + } + ], + "task_states": [], + } response = await isolated_client.post( f"/tasks/{stale_task.id}/rehydrate", json=payload ) @@ -282,6 +294,22 @@ async def test_rehydrate_active_task_returns_400(self, isolated_client, stale_ta assert response.status_code == 400 assert "not in cleaned state" in response.json()["message"] + async def test_rehydrate_rejects_empty_payload(self, isolated_client, stale_task): + """ + An empty rehydrate (no inline content, no snapshot_url) would silently + clear cleaned_at without restoring anything — caught by the validator. + """ + await isolated_client.post( + f"/tasks/{stale_task.id}/clean", json={"force": True} + ) + + payload = {"task_id": stale_task.id, "messages": [], "task_states": []} + response = await isolated_client.post( + f"/tasks/{stale_task.id}/rehydrate", json=payload + ) + + assert response.status_code == 422 + async def test_rehydrate_task_id_mismatch_returns_400( self, isolated_client, stale_task ): @@ -292,7 +320,14 @@ async def test_rehydrate_task_id_mismatch_returns_400( payload = { "task_id": "00000000-0000-0000-0000-000000000000", - "messages": [], + "messages": [ + { + "id": orm_id(), + "task_id": "00000000-0000-0000-0000-000000000000", + "content": {"type": "text", "author": "user", "content": "x"}, + "streaming_status": "DONE", + } + ], "task_states": [], } response = await isolated_client.post(