diff --git a/examples/v3_reference_seller/README.md b/examples/v3_reference_seller/README.md
index 0f307bbbf..69809601b 100644
--- a/examples/v3_reference_seller/README.md
+++ b/examples/v3_reference_seller/README.md
@@ -147,15 +147,79 @@ exist; the reference seller wires the simpler defaults:
   in `src/app.py` for production durability. Both classes ship in the
   SDK; this seller's `app.py` uses the in-memory variants for fast
   iteration.
-- **Alembic migrations** — `Base.metadata.create_all` runs at boot
-  (idempotent on table existence — it does NOT detect column
-  renames or type changes on existing tables). Adopters who
-  prototyped against earlier branches and pulled new column
-  changes should drop and recreate the dev database; production
-  sellers wire Alembic and version their schema changes.
 - **Admin CRUD API** — separate Starlette app for tenant / agent
   CRUD. Patterns to come; for now use `seed.py` and direct SQL.
 
+## Migrations
+
+The app boots with `Base.metadata.create_all` — idempotent on table
+existence, but **blind to column renames, type changes, and new columns
+on existing tables**. For local fast iteration this is fine. Once you
+have production data, use Alembic to evolve the schema safely.
+
+> ⚠️ **`create_all` is unsafe for schema evolution once production data
+> exists.** Column renames and type changes applied after first boot
+> will not be detected and will silently leave the schema stale.
+
+### Install Alembic
+
+```bash
+pip install alembic
+# or, if using a requirements file:
+echo "alembic" >> requirements.txt && pip install -r requirements.txt
+```
+
+### Apply migrations
+
+```bash
+cd examples/v3_reference_seller
+
+# Apply all pending migrations (run after every git pull that touches models).
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp python -m migrate
+
+# Equivalent direct alembic invocation:
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic upgrade head
+```
+
+### Generate a new migration after changing models
+
+```bash
+cd examples/v3_reference_seller
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp \
+  alembic revision --autogenerate -m "describe your change"
+```
+
+Alembic compares the live database to `Base.metadata` and emits a
+migration file under `alembic/versions/`. **Always review the generated
+file before committing** — autogenerate misses some constructs (partial
+index predicates, custom CHECK constraints, server defaults).
+
+> ⚙️ **Adding a new model file?** Import it in `alembic/env.py` alongside
+> `src.models` and `src.audit`, or autogenerate will silently omit its
+> tables from the migration.
+
+### Roll back
+
+```bash
+# Roll back one step.
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic downgrade -1
+
+# Roll back to before any migrations (drops all tables defined in this schema).
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic downgrade base
+```
+
+> ⚠️ **`downgrade` in production is irreversible without a data backup.**
+> Take a snapshot before running downgrade against any database that
+> holds real data.
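+
+### Adopt Alembic on an existing dev database
+
+`alembic upgrade head` against a database that `create_all` already
+populated will fail on the existing tables. Assuming the live schema
+genuinely matches the models as of revision `0002` (verify before
+trusting this), a sketch of the baseline procedure:
+
+```bash
+# Record head as the current revision without running any migrations.
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic stamp head
+
+# Confirm which revision the database is now stamped at.
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic current
+```
+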
+### Run migration integration tests
+
+```bash
+# Uses a throw-away database (adcp_test) so the migration run starts clean.
+DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp_test \
+  pytest examples/v3_reference_seller/tests/test_migrations.py -m integration -v
+```
+
 ## Customization
 
 Adopters typically change:
diff --git a/examples/v3_reference_seller/alembic.ini b/examples/v3_reference_seller/alembic.ini
new file mode 100644
index 000000000..bf6bec661
--- /dev/null
+++ b/examples/v3_reference_seller/alembic.ini
@@ -0,0 +1,50 @@
+# Alembic configuration for the v3 reference seller example.
+#
+# Run from the examples/v3_reference_seller/ directory:
+#
+#   DATABASE_URL=postgresql+asyncpg://... alembic upgrade head
+#
+# When embedding this example inside a larger repo, update
+# script_location to an absolute path (e.g. /path/to/alembic) so
+# Alembic can find the migration scripts regardless of cwd.
+
+[alembic]
+script_location = alembic
+
+# DATABASE_URL is read from the environment in env.py — leave this
+# blank so it is never accidentally hardcoded in version control.
+sqlalchemy.url =
+
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/examples/v3_reference_seller/alembic/env.py b/examples/v3_reference_seller/alembic/env.py
new file mode 100644
index 000000000..5a7a1aee2
--- /dev/null
+++ b/examples/v3_reference_seller/alembic/env.py
@@ -0,0 +1,104 @@
+"""Alembic environment for the v3 reference seller.
+
+Uses SQLAlchemy's async engine (asyncpg) via the standard Alembic
+async pattern. Run from the examples/v3_reference_seller/ directory:
+
+    DATABASE_URL=postgresql+asyncpg://... alembic upgrade head
+
+For autogenerate to capture every table, both src.models and src.audit
+must be imported before target_metadata is read. Missing either import
+silently omits that module's tables from the generated migration.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+from logging.config import fileConfig
+from pathlib import Path
+
+from alembic import context
+from sqlalchemy.ext.asyncio import create_async_engine
+
+# ---------------------------------------------------------------------------
+# Path wiring — make ``src.*`` importable when env.py is executed from the
+# examples/v3_reference_seller/ directory by the ``alembic`` CLI.
+# ---------------------------------------------------------------------------
+_HERE = Path(__file__).resolve().parent.parent  # examples/v3_reference_seller/
+if str(_HERE) not in sys.path:
+    sys.path.insert(0, str(_HERE))
+
+# Import all ORM modules so their tables appear in Base.metadata.
+# Adding a new model file? Import it here or autogenerate will miss it.
+import src.audit  # noqa: E402, F401 — registers AuditEventRow on Base.metadata
+from src.models import Base  # noqa: E402
+
+target_metadata = Base.metadata
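+
+# Cheap guard (a sketch, not required by Alembic): empty metadata here
+# almost always means a model import above was removed or failed, and
+# autogenerate would then emit a migration that drops every table.
+if not target_metadata.tables:
+    raise RuntimeError("Base.metadata is empty; check the model imports above")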
+
+# ---------------------------------------------------------------------------
+# Alembic config
+# ---------------------------------------------------------------------------
+config = context.config
+
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+# DATABASE_URL comes from the environment; never hardcode it here.
+try:
+    _db_url: str = os.environ["DATABASE_URL"]
+except KeyError:
+    raise RuntimeError(
+        "DATABASE_URL environment variable is not set. "
+        "Example: DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp alembic upgrade head"
+    ) from None
+config.set_main_option("sqlalchemy.url", _db_url)
+
+
+# ---------------------------------------------------------------------------
+# Migration helpers
+# ---------------------------------------------------------------------------
+
+def run_migrations_offline() -> None:
+    """Emit SQL to stdout rather than connecting to the DB.
+
+    Useful for generating a migration script to review or apply manually.
+    """
+    url = config.get_main_option("sqlalchemy.url")
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+        compare_type=True,
+    )
+    with context.begin_transaction():
+        context.run_migrations()
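+
+
+# Usage sketch for offline mode: emit reviewable SQL instead of running it.
+#   DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp \
+#     alembic upgrade head --sql > review.sql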
+
+
+def do_run_migrations(connection) -> None:
+    context.configure(
+        connection=connection,
+        target_metadata=target_metadata,
+        compare_type=True,
+    )
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+async def run_migrations_online() -> None:
+    """Create an async engine and run migrations inside a sync wrapper.
+
+    Alembic's migration functions are synchronous; ``run_sync`` bridges
+    the gap so we can use an asyncpg engine end-to-end.
+    """
+    connectable = create_async_engine(_db_url, echo=False)
+    async with connectable.connect() as connection:
+        await connection.run_sync(do_run_migrations)
+    await connectable.dispose()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    asyncio.run(run_migrations_online())
diff --git a/examples/v3_reference_seller/alembic/script.py.mako b/examples/v3_reference_seller/alembic/script.py.mako
new file mode 100644
index 000000000..ee746cf6d
--- /dev/null
+++ b/examples/v3_reference_seller/alembic/script.py.mako
@@ -0,0 +1,28 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from __future__ import annotations
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+
+
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
diff --git a/examples/v3_reference_seller/alembic/versions/0001_initial_schema.py b/examples/v3_reference_seller/alembic/versions/0001_initial_schema.py
new file mode 100644
index 000000000..f1aa14083
--- /dev/null
+++ b/examples/v3_reference_seller/alembic/versions/0001_initial_schema.py
@@ -0,0 +1,163 @@
+"""initial schema
+
+Revision ID: 0001
+Revises:
+Create Date: 2026-05-03
+
+Generated from the dev schema — review before applying to production.
+Captures all five tables declared by the v3 reference seller:
+    tenants, buyer_agents, accounts, media_buys, audit_events.
+
+"""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+revision: str = "0001"
+down_revision: str | None = None
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "tenants",
+        sa.Column("id", sa.String(64), nullable=False),
+        sa.Column("host", sa.String(255), nullable=False),
+        sa.Column("display_name", sa.String(255), nullable=False),
+        sa.Column("status", sa.String(32), nullable=False),
+        sa.Column("ext", sa.JSON(), nullable=True),
+        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+        sa.CheckConstraint(
+            "status IN ('active', 'suspended', 'archived')",
+            name="tenants_status_ck",
+        ),
+        sa.PrimaryKeyConstraint("id"),
+        sa.UniqueConstraint("host"),
+    )
+    op.create_index("tenants_host_idx", "tenants", ["host"])
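+    # The UNIQUE constraint on host already creates a unique index in
+    # Postgres, so tenants_host_idx is arguably redundant; drop it if you
+    # prefer leaner DDL.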
sa.UniqueConstraint("tenant_id", "account_id", name="accounts_tenant_acct_uk"), + ) + op.create_index("accounts_tenant_idx", "accounts", ["tenant_id"]) + op.create_index("accounts_buyer_agent_idx", "accounts", ["buyer_agent_id"]) + + op.create_table( + "media_buys", + sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False), + sa.Column("tenant_id", sa.String(64), nullable=False), + sa.Column("account_id", sa.String(64), nullable=False), + sa.Column("media_buy_id", sa.String(64), nullable=False), + sa.Column("idempotency_key", sa.String(255), nullable=False), + sa.Column("status", sa.String(32), nullable=False), + sa.Column("brand_domain", sa.String(255), nullable=True), + sa.Column("total_budget", sa.Float(), nullable=True), + sa.Column("currency", sa.String(3), nullable=True), + sa.Column("start_time", sa.DateTime(timezone=True), nullable=True), + sa.Column("end_time", sa.DateTime(timezone=True), nullable=True), + sa.Column("request_snapshot", sa.JSON(), nullable=True), + sa.Column("response_snapshot", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["account_id"], ["accounts.id"], ondelete="RESTRICT"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("media_buy_id"), + sa.UniqueConstraint("tenant_id", "idempotency_key", name="media_buys_idem_uk"), + ) + op.create_index("media_buys_tenant_idx", "media_buys", ["tenant_id"]) + op.create_index("media_buys_account_idx", "media_buys", ["account_id"]) + + op.create_table( + "audit_events", + sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False), + sa.Column("occurred_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("tenant_id", sa.String(64), nullable=True), + sa.Column("request_id", sa.String(255), nullable=True), + sa.Column("operation", sa.String(64), nullable=False), + sa.Column("caller_identity", sa.String(512), nullable=True), + sa.Column("success", sa.Boolean(), nullable=False), + sa.Column("error_type", sa.String(128), nullable=True), + sa.Column("error_message", sa.String(255), nullable=True), + sa.Column("details", sa.JSON(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + "audit_events_tenant_idx", "audit_events", ["tenant_id", "occurred_at"] + ) + op.create_index( + "audit_events_operation_idx", "audit_events", ["operation", "occurred_at"] + ) + op.create_index( + "audit_events_caller_idx", "audit_events", ["caller_identity", "occurred_at"] + ) + + +def downgrade() -> None: + op.drop_table("audit_events") + op.drop_table("media_buys") + op.drop_table("accounts") + op.drop_table("buyer_agents") + op.drop_table("tenants") diff --git a/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py b/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py new file mode 100644 index 000000000..8db142ebb --- /dev/null +++ b/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py @@ -0,0 +1,95 @@ +"""broadening cycle: invoice_recipient, creatives, performance_feedback + +Revision ID: 0002 +Revises: 0001 +Create Date: 2026-05-03 + +Adds the three schema additions introduced alongside the v3 broadening +cycle (PR #408): + + - media_buys.invoice_recipient — JSON, nullable; per-buy invoice + override (bank details write-only + on response, durable on storage) + - creatives — seller-side creative registry; + 
+    op.drop_table("audit_events")
+    op.drop_table("media_buys")
+    op.drop_table("accounts")
+    op.drop_table("buyer_agents")
+    op.drop_table("tenants")
diff --git a/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py b/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py
new file mode 100644
index 000000000..8db142ebb
--- /dev/null
+++ b/examples/v3_reference_seller/alembic/versions/0002_broadening_cycle.py
@@ -0,0 +1,95 @@
+"""broadening cycle: invoice_recipient, creatives, performance_feedback
+
+Revision ID: 0002
+Revises: 0001
+Create Date: 2026-05-03
+
+Adds the three schema additions introduced alongside the v3 broadening
+cycle (PR #408):
+
+  - media_buys.invoice_recipient — JSON, nullable; per-buy invoice
+    override (bank details write-only on response, durable in storage)
+  - creatives — seller-side creative registry; idempotency-keyed on
+    (tenant_id, creative_id)
+  - performance_feedback — buyer-supplied performance signals FK'd to
+    media_buys.id
+
+Downgrade reverses all three steps safely (performance_feedback first
+to satisfy FK order, then creatives, then the column drop).
+
+"""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+revision: str = "0002"
+down_revision: str | None = "0001"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    # 1. Add invoice_recipient to media_buys.
+    op.add_column(
+        "media_buys",
+        sa.Column("invoice_recipient", sa.JSON(), nullable=True),
+    )
+
+    # 2. Create creatives table.
+    op.create_table(
+        "creatives",
+        sa.Column("id", sa.String(64), nullable=False),
+        sa.Column("tenant_id", sa.String(64), nullable=False),
+        sa.Column("account_id", sa.String(64), nullable=False),
+        sa.Column("creative_id", sa.String(255), nullable=False),
+        sa.Column("name", sa.String(255), nullable=False),
+        sa.Column("format_id", sa.JSON(), nullable=False),
+        sa.Column("status", sa.String(32), nullable=False),
+        sa.Column("manifest_json", sa.JSON(), nullable=True),
+        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+        sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"),
+        sa.ForeignKeyConstraint(["account_id"], ["accounts.id"], ondelete="CASCADE"),
+        sa.PrimaryKeyConstraint("id"),
+        sa.UniqueConstraint(
+            "tenant_id", "creative_id", name="creatives_tenant_creative_uk"
+        ),
+    )
+    op.create_index("creatives_tenant_idx", "creatives", ["tenant_id"])
+    op.create_index("creatives_account_idx", "creatives", ["account_id"])
+
+    # 3. Create performance_feedback table.
+    op.create_table(
+        "performance_feedback",
+        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
+        sa.Column("tenant_id", sa.String(64), nullable=False),
+        sa.Column("media_buy_id", sa.BigInteger(), nullable=False),
+        sa.Column("feedback_type", sa.String(64), nullable=False),
+        sa.Column("value", sa.JSON(), nullable=False),
+        sa.Column("occurred_at", sa.DateTime(timezone=True), nullable=False),
+        sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"),
+        sa.ForeignKeyConstraint(
+            ["media_buy_id"], ["media_buys.id"], ondelete="CASCADE"
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_index(
+        "performance_feedback_tenant_idx", "performance_feedback", ["tenant_id"]
+    )
+    op.create_index(
+        "performance_feedback_media_buy_idx",
+        "performance_feedback",
+        ["media_buy_id"],
+    )
+
+
+def downgrade() -> None:
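+    # performance_feedback goes first (it FKs media_buys), then creatives,
+    # then the column drop. Destructive: rows in both tables are lost, so
+    # snapshot first (see the README downgrade warning).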
+    op.drop_table("performance_feedback")
+    op.drop_table("creatives")
+    op.drop_column("media_buys", "invoice_recipient")
diff --git a/examples/v3_reference_seller/migrate.py b/examples/v3_reference_seller/migrate.py
new file mode 100644
index 000000000..61f53db51
--- /dev/null
+++ b/examples/v3_reference_seller/migrate.py
@@ -0,0 +1,60 @@
+"""Migration helper for the v3 reference seller.
+
+Runs ``alembic upgrade head`` programmatically using the DATABASE_URL
+environment variable. Intended as a standalone script for production
+deployments and CI — not the default app boot path.
+
+Usage::
+
+    DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp python -m migrate
+
+The app itself still uses ``Base.metadata.create_all`` for fast local
+iteration (see src/app.py). Switch to this script when you have
+production data you need to preserve across schema changes.
+
+Requires ``alembic`` to be installed::
+
+    pip install alembic
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+from pathlib import Path
+
+
+def main() -> None:
+    db_url = os.environ.get("DATABASE_URL")
+    if not db_url:
+        print(
+            "ERROR: DATABASE_URL is not set.\n"
+            "Example: DATABASE_URL=postgresql+asyncpg://postgres@localhost/adcp python -m migrate",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    try:
+        from alembic import command
+        from alembic.config import Config
+    except ImportError:
+        print(
+            "ERROR: alembic is not installed. Run: pip install alembic",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    ini_path = Path(__file__).parent / "alembic.ini"
+    alembic_cfg = Config(str(ini_path))
+    alembic_cfg.set_main_option("sqlalchemy.url", db_url)
+
+    from sqlalchemy.engine.url import make_url
+
+    safe_url = make_url(db_url).render_as_string(hide_password=True)
+    print(f"Running alembic upgrade head against {safe_url!r}...")
+    command.upgrade(alembic_cfg, "head")
+    print("Migrations complete.")
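+
+
+# Sketch (not wired into any deployment here): the same Config object can
+# drive other Alembic commands programmatically, e.g.
+#   command.downgrade(alembic_cfg, "-1")
+#   command.current(alembic_cfg)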
+# --------------------------------------------------------------------------- + +async def _wipe_schema(db_url: str) -> None: + from sqlalchemy import text + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine(db_url) + async with engine.begin() as conn: + await conn.execute(text("DROP SCHEMA public CASCADE")) + await conn.execute(text("CREATE SCHEMA public")) + await engine.dispose() + + +async def _get_table_names(db_url: str) -> set[str]: + from sqlalchemy import text + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine(db_url) + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT tablename FROM pg_tables WHERE schemaname = 'public'") + ) + tables = {row[0] for row in result} + await engine.dispose() + return tables + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +@pytest.mark.integration +def test_upgrade_head_creates_all_tables(db_url: str) -> None: + """Running ``alembic upgrade head`` on a clean database creates all eight tables.""" + from alembic import command + from alembic.config import Config + + ini_path = _HERE / "alembic.ini" + alembic_cfg = Config(str(ini_path)) + alembic_cfg.set_main_option("sqlalchemy.url", db_url) + + # Wipe schema so we always start clean. + asyncio.run(_wipe_schema(db_url)) + + # Run all migrations. + command.upgrade(alembic_cfg, "head") + + # Spot-check: all eight tables must exist. + table_names = asyncio.run(_get_table_names(db_url)) + expected = { + "tenants", + "buyer_agents", + "accounts", + "media_buys", + "audit_events", + "creatives", + "performance_feedback", + } + assert expected <= table_names, f"Missing tables: {expected - table_names}" + + +@pytest.mark.integration +def test_downgrade_base_removes_all_tables(db_url: str) -> None: + """Running ``alembic downgrade base`` drops every table cleanly.""" + from alembic import command + from alembic.config import Config + + ini_path = _HERE / "alembic.ini" + alembic_cfg = Config(str(ini_path)) + alembic_cfg.set_main_option("sqlalchemy.url", db_url) + + # Start from head so downgrade has something to remove. + asyncio.run(_wipe_schema(db_url)) + command.upgrade(alembic_cfg, "head") + + command.downgrade(alembic_cfg, "base") + + remaining = asyncio.run(_get_table_names(db_url)) + # Only the alembic_version bookkeeping table may remain. + assert remaining <= {"alembic_version"}, f"Unexpected tables remain: {remaining}"