Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ docs = [
"pdoc3>=0.10.0",
]
pg = [
# PostgreSQL-backed PgReplayStore (and future PgIdempotencyBackend).
# psycopg3 gives both sync + async client interfaces so the same dep
# serves the sync replay store today and an async one later.
# PostgreSQL-backed adcp.signing.PgReplayStore,
# adcp.decisioning.PostgresTaskRegistry (durable HITL task state), and
# future PgIdempotencyBackend. psycopg3 ships both sync + async pool
# interfaces so the single dep serves all three use cases.
"psycopg[binary]>=3.1.0",
"psycopg-pool>=3.2.0",
]
Expand All @@ -121,6 +122,7 @@ adcp = [
"py.typed",
"ADCP_VERSION",
"signing/pg/*.sql",
"decisioning/pg/*.sql",
# AdCP JSON schemas, mirrored from ``schemas/cache/`` by
# ``scripts/bundle_schemas.py`` so the wheel ships them for
# ``adcp.validation.schema_loader``.
Expand Down
27 changes: 27 additions & 0 deletions src/adcp/decisioning/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,32 @@ def create_media_buy(
WorkflowHandoff,
)

# Conditional import: PostgresTaskRegistry needs the [pg] extra. Always expose
# the name — when psycopg isn't installed we fall through to a stub class whose
# constructor raises ImportError with the install hint. Matches the pattern
# used by adcp.signing for PgReplayStore.
try:
from adcp.decisioning.pg import PostgresTaskRegistry # noqa: F401
except ImportError: # pragma: no cover — exercised by the [pg] extra tests
from typing import ClassVar as _ClassVar

class PostgresTaskRegistry: # type: ignore[no-redef]
"""Stub raised when ``adcp[pg]`` isn't installed.

Attempting to instantiate raises :class:`ImportError` with the
install-hint text from :mod:`adcp.decisioning.pg.task_registry`.
"""

is_durable: _ClassVar[bool] = True

def __init__(self, *args: object, **kwargs: object) -> None:
raise ImportError(
"PostgresTaskRegistry requires psycopg3 and psycopg-pool. "
"Install the 'pg' extra: `pip install 'adcp[pg]'` "
"(Poetry: `poetry add 'adcp[pg]'`)."
)


__all__ = [
"Account",
"AccountStore",
Expand Down Expand Up @@ -161,6 +187,7 @@ def create_media_buy(
"InMemoryTaskRegistry",
"MaybeAsync",
"OAuthCredential",
"PostgresTaskRegistry",
"Proposal",
"PropertyList",
"PropertyListReference",
Expand Down
13 changes: 11 additions & 2 deletions src/adcp/decisioning/pg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@
request to gate dispatch on the seller's commercial relationship
with the buyer agent (allowlist + onboarding state + billing
capabilities).
* :class:`PostgresTaskRegistry` — durable
:class:`~adcp.decisioning.TaskRegistry` for HITL task state. Survives
process restarts and is safe for multi-worker deployments sharing a
single Postgres database. Drop-in replacement for
:class:`~adcp.decisioning.InMemoryTaskRegistry` that satisfies the
production-mode durability gate.

The schema DDL ships alongside the Python code (e.g.
``adcp/decisioning/pg/buyer_agent_registry.sql``) so adopters can run
it through whatever migration tool they use (Alembic, Flyway, psql).
``adcp/decisioning/pg/buyer_agent_registry.sql``,
``adcp/decisioning/pg/decisioning_tasks.sql``) so adopters can run it
through whatever migration tool they use (Alembic, Flyway, psql).
"""

from __future__ import annotations
Expand All @@ -24,9 +31,11 @@
PG_AVAILABLE,
PgBuyerAgentRegistry,
)
from adcp.decisioning.pg.task_registry import PostgresTaskRegistry

__all__ = [
"DEFAULT_TABLE_NAME",
"PG_AVAILABLE",
"PgBuyerAgentRegistry",
"PostgresTaskRegistry",
]
32 changes: 32 additions & 0 deletions src/adcp/decisioning/pg/decisioning_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- AdCP decisioning task registry — durable HITL task state.
--
-- Run this once per deployment. Tracked by PostgresTaskRegistry;
-- see src/adcp/decisioning/pg/task_registry.py for the query shapes
-- the Python code executes.
--
-- COLLATE "C" on identifier columns avoids locale-dependent case
-- folding — on some locales "Task-A" and "task-a" compare equal,
-- which could collapse distinct task_ids or account_ids. "C" is the
-- byte-for-byte comparison we actually want.
--
-- Alternatively, call PostgresTaskRegistry.create_schema() from
-- application code — it runs the equivalent DDL idempotently on boot.

CREATE TABLE IF NOT EXISTS decisioning_tasks (
task_id TEXT COLLATE "C" NOT NULL PRIMARY KEY,
account_id TEXT COLLATE "C" NOT NULL,
state TEXT NOT NULL DEFAULT 'submitted',
task_type TEXT NOT NULL,
progress JSONB,
result JSONB,
error JSONB,
-- Unix epoch seconds (float), matches TaskRecord.created_at/updated_at
-- so Python round-trips the value without lossy TIMESTAMPTZ conversion.
created_at DOUBLE PRECISION NOT NULL,
updated_at DOUBLE PRECISION NOT NULL
);

-- Supports the cross-tenant get() query: WHERE task_id = $1 AND account_id = $2.
-- Without this index, every tasks/get is a full-table scan on account_id.
CREATE INDEX IF NOT EXISTS decisioning_tasks_account_idx
ON decisioning_tasks (account_id);
Loading
Loading