From 8e8043debe011c97ee0bd4f7528bb05523fd14bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:18:44 -0300 Subject: [PATCH 1/6] feat: criar runner generico de migrations com auditoria e rollback Runner unico (scripts/migrate.py) com CLI via typer para gerenciar migrations SQL e Python. Substitui abordagem ad-hoc de scripts individuais por sistema padronizado. Funcionalidades: - Discovery automatico de migrations por convencao de nome (NNN_desc.sql/py) - Tabela migration_history com auditoria completa (quem, quando, duracao) - Bootstrap automatico: importa schema_version na primeira execucao - Commit atomico: migration + registro de historico na mesma transacao - Comandos CLI: status, migrate, rollback, history, validate - Suporte a --dry-run em todos os comandos destrutivos - Rollback via _rollback.sql (SQL) ou rollback() (Python) 26 testes unitarios cobrindo: discovery, bootstrap, pending, execucao SQL/Python, rollback, validate. Ref: #109 Co-Authored-By: Claude Opus 4.6 --- scripts/migrate.py | 601 ++++++++++++++++++++++++++++++ tests/unit/test_migrate_runner.py | 519 ++++++++++++++++++++++++++ 2 files changed, 1120 insertions(+) create mode 100644 scripts/migrate.py create mode 100644 tests/unit/test_migrate_runner.py diff --git a/scripts/migrate.py b/scripts/migrate.py new file mode 100644 index 0000000..e814d37 --- /dev/null +++ b/scripts/migrate.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python3 +""" +Generic database migration runner for destaquesgovbr/data-platform. + +Supports SQL (.sql) and Python (.py) migrations discovered by naming convention. +Provides audit history, dry-run, rollback, and validation. + +Usage: + python scripts/migrate.py status + python scripts/migrate.py migrate [--dry-run] [--target VERSION] + python scripts/migrate.py rollback VERSION [--dry-run] + python scripts/migrate.py history + python scripts/migrate.py validate +""" + +import importlib.util +import json +import os +import re +import sys +import time +from dataclasses import dataclass +from pathlib import Path + +from loguru import logger + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +MIGRATION_PATTERN = re.compile(r"^(\d{3})_(.+)\.(sql|py)$") +ROLLBACK_SUFFIX = "_rollback.sql" + +MIGRATIONS_DIR = Path(__file__).parent / "migrations" + +CREATE_MIGRATION_HISTORY_SQL = """ +CREATE TABLE IF NOT EXISTS migration_history ( + id SERIAL PRIMARY KEY, + version VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL, + migration_type VARCHAR(10) NOT NULL CHECK (migration_type IN ('sql', 'python')), + operation VARCHAR(10) NOT NULL CHECK (operation IN ('migrate', 'rollback', 'dry_run')), + status VARCHAR(20) NOT NULL CHECK (status IN ('success', 'failed', 'unavailable')), + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ, + duration_ms INTEGER, + applied_by TEXT NOT NULL, + run_id TEXT, + description TEXT, + execution_details JSONB, + error_message TEXT +); + +CREATE INDEX IF NOT EXISTS idx_mh_version ON migration_history(version); +CREATE INDEX IF NOT EXISTS idx_mh_started_at ON migration_history(started_at DESC); +""" + +CREATE_MIGRATION_STATUS_VIEW_SQL = """ +CREATE OR REPLACE VIEW migration_status AS +SELECT DISTINCT ON (version) + version, name, migration_type, operation, status, applied_by, started_at, duration_ms +FROM migration_history +WHERE status = 'success' +ORDER BY version, started_at DESC; +""" + +TABLE_EXISTS_SQL = """ +SELECT EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = %s +) +""" + +IMPORT_SCHEMA_VERSION_SQL = """ +INSERT INTO migration_history (version, name, migration_type, operation, status, applied_by, description) +SELECT + sv.version, + 'schema_version_import', + 'sql', + 'migrate', + 'success', + 'bootstrap', + sv.description +FROM schema_version sv +WHERE NOT EXISTS ( + SELECT 1 FROM migration_history mh + WHERE mh.applied_by = 'bootstrap' AND mh.version = sv.version +) +""" + +RECORD_HISTORY_SQL = """ +INSERT INTO migration_history + (version, name, migration_type, operation, status, started_at, finished_at, + duration_ms, applied_by, run_id, description, execution_details, error_message) +VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, %s, %s, %s, %s, %s) +""" + +GET_APPLIED_VERSIONS_SQL = """ +SELECT DISTINCT version FROM migration_status WHERE operation = 'migrate' +""" + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class MigrationInfo: + version: str + name: str + path: Path + migration_type: str # 'sql' or 'python' + rollback_path: Path | None + + +# --------------------------------------------------------------------------- +# Discovery +# --------------------------------------------------------------------------- + +def discover_migrations(migrations_dir: Path) -> list[MigrationInfo]: + """Discover migration files in a directory by naming convention.""" + if not migrations_dir.exists(): + return [] + + migrations = {} + rollbacks = {} + + for f in sorted(migrations_dir.iterdir()): + name = f.name + + # Collect rollback files separately + if name.endswith(ROLLBACK_SUFFIX): + match = re.match(r"^(\d{3})_", name) + if match: + rollbacks[match.group(1)] = f + continue + + match = MIGRATION_PATTERN.match(name) + if match: + version = match.group(1) + desc = match.group(2) + mtype = "python" if match.group(3) == "py" else "sql" + migrations[version] = MigrationInfo( + version=version, + name=desc, + path=f, + migration_type=mtype, + rollback_path=None, + ) + + # Associate rollback files + for version, rollback_path in rollbacks.items(): + if version in migrations: + migrations[version].rollback_path = rollback_path + + return sorted(migrations.values(), key=lambda m: m.version) + + +# --------------------------------------------------------------------------- +# Bootstrap +# --------------------------------------------------------------------------- + +def bootstrap(conn) -> None: + """Create migration_history table and import schema_version if present.""" + cursor = conn.cursor() + try: + # Check if migration_history already exists + cursor.execute(TABLE_EXISTS_SQL, ("migration_history",)) + exists = cursor.fetchone()[0] + if exists: + return + + # Create table and view + cursor.execute(CREATE_MIGRATION_HISTORY_SQL) + cursor.execute(CREATE_MIGRATION_STATUS_VIEW_SQL) + + # Import from schema_version if it exists + cursor.execute(TABLE_EXISTS_SQL, ("schema_version",)) + sv_exists = cursor.fetchone()[0] + if sv_exists: + cursor.execute(IMPORT_SCHEMA_VERSION_SQL) + logger.info("Imported schema_version entries into migration_history") + + conn.commit() + logger.info("Bootstrap complete: migration_history table created") + except Exception: + conn.rollback() + raise + finally: + cursor.close() + + +# --------------------------------------------------------------------------- +# Status / Pending +# --------------------------------------------------------------------------- + +def get_pending( + conn, migrations: list[MigrationInfo], target: str | None = None +) -> list[MigrationInfo]: + """Return migrations that haven't been applied yet.""" + cursor = conn.cursor() + try: + cursor.execute(GET_APPLIED_VERSIONS_SQL) + applied = {row[0] for row in cursor.fetchall()} + finally: + cursor.close() + + pending = [m for m in migrations if m.version not in applied] + + if target: + pending = [m for m in pending if m.version <= target] + + return pending + + +# --------------------------------------------------------------------------- +# Record history +# --------------------------------------------------------------------------- + +def _record_history( + conn, + migration: MigrationInfo, + operation: str, + status: str, + started_at: float, + applied_by: str, + run_id: str | None, + description: str | None = None, + execution_details: dict | None = None, + error_message: str | None = None, +) -> None: + """Insert a record into migration_history.""" + duration_ms = int((time.time() - started_at) * 1000) + cursor = conn.cursor() + try: + from datetime import datetime, timezone + + started_dt = datetime.fromtimestamp(started_at, tz=timezone.utc) + cursor.execute( + RECORD_HISTORY_SQL, + ( + migration.version, + migration.name, + migration.migration_type, + operation, + status, + started_dt, + duration_ms, + applied_by, + run_id, + description, + json.dumps(execution_details) if execution_details else None, + error_message, + ), + ) + finally: + cursor.close() + + +# --------------------------------------------------------------------------- +# Execute migration +# --------------------------------------------------------------------------- + +def _load_python_module(path: Path): + """Dynamically load a Python migration module.""" + module_name = f"migration_{path.stem}" + spec = importlib.util.spec_from_file_location(module_name, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def execute_migration( + conn, + migration: MigrationInfo, + dry_run: bool, + applied_by: str, + run_id: str | None, +) -> None: + """Execute a single migration (SQL or Python) with atomic commit.""" + operation = "dry_run" if dry_run else "migrate" + started_at = time.time() + description = None + execution_details = None + + logger.info(f"{'[DRY RUN] ' if dry_run else ''}Executing {migration.version}_{migration.name} ({migration.migration_type})") + + try: + if migration.migration_type == "sql": + sql_content = migration.path.read_text() + cursor = conn.cursor() + try: + cursor.execute(sql_content) + finally: + cursor.close() + else: + # Python migration + module = _load_python_module(migration.path) + if not hasattr(module, "describe"): + raise AttributeError( + f"Python migration {migration.path.name} must define describe()" + ) + description = module.describe() + result = module.migrate(conn, dry_run=dry_run) + execution_details = result if isinstance(result, dict) else None + + if dry_run: + _record_history( + conn, migration, operation, "success", started_at, + applied_by, run_id, description, execution_details, + ) + conn.rollback() + logger.info(f"[DRY RUN] {migration.version} previewed (rolled back)") + else: + _record_history( + conn, migration, operation, "success", started_at, + applied_by, run_id, description, execution_details, + ) + conn.commit() + logger.info(f"{migration.version}_{migration.name} applied successfully") + + except Exception as e: + conn.rollback() + # Record failure in a separate transaction + try: + _record_history( + conn, migration, operation, "failed", started_at, + applied_by, run_id, description, error_message=str(e), + ) + conn.commit() + except Exception: + logger.warning("Could not record failure in migration_history") + raise + + +# --------------------------------------------------------------------------- +# Execute rollback +# --------------------------------------------------------------------------- + +def execute_rollback( + conn, + migration: MigrationInfo, + dry_run: bool, + applied_by: str, + run_id: str | None, +) -> None: + """Execute rollback for a single migration.""" + operation = "rollback" + started_at = time.time() + description = None + execution_details = None + + logger.info(f"{'[DRY RUN] ' if dry_run else ''}Rolling back {migration.version}_{migration.name}") + + try: + if migration.migration_type == "sql": + if not migration.rollback_path or not migration.rollback_path.exists(): + raise FileNotFoundError( + f"No rollback file for SQL migration {migration.version}_{migration.name}. " + f"Expected: {migration.version}_{migration.name}_rollback.sql" + ) + sql_content = migration.rollback_path.read_text() + cursor = conn.cursor() + try: + cursor.execute(sql_content) + finally: + cursor.close() + else: + # Python migration + module = _load_python_module(migration.path) + try: + result = module.rollback(conn, dry_run=dry_run) + execution_details = result if isinstance(result, dict) else None + except NotImplementedError as nie: + _record_history( + conn, migration, operation, "unavailable", started_at, + applied_by, run_id, description, + error_message=str(nie), + ) + conn.commit() + logger.warning(f"{migration.version} rollback unavailable: {nie}") + return + + if dry_run: + conn.rollback() + logger.info(f"[DRY RUN] {migration.version} rollback previewed") + else: + _record_history( + conn, migration, operation, "success", started_at, + applied_by, run_id, description, execution_details, + ) + conn.commit() + logger.info(f"{migration.version}_{migration.name} rolled back successfully") + + except (FileNotFoundError, ValueError): + raise + except Exception as e: + conn.rollback() + try: + _record_history( + conn, migration, operation, "failed", started_at, + applied_by, run_id, description, error_message=str(e), + ) + conn.commit() + except Exception: + logger.warning("Could not record rollback failure in migration_history") + raise + + +# --------------------------------------------------------------------------- +# Validate +# --------------------------------------------------------------------------- + +def validate_migrations(migrations: list[MigrationInfo]) -> list[str]: + """Check for sequence gaps and other issues.""" + issues = [] + if not migrations: + return issues + + versions = [int(m.version) for m in migrations] + for i in range(len(versions) - 1): + if versions[i + 1] - versions[i] > 1: + for gap in range(versions[i] + 1, versions[i + 1]): + issues.append(f"Sequence gap: migration {gap:03d} is missing") + + return issues + + +# --------------------------------------------------------------------------- +# CLI (typer) +# --------------------------------------------------------------------------- + +def _get_applied_by() -> str: + """Determine who is running the migration.""" + return os.getenv("GITHUB_ACTOR", os.getenv("USER", "unknown")) + + +def _get_run_id() -> str | None: + """Get GitHub Actions run ID if available.""" + return os.getenv("GITHUB_RUN_ID") + + +def main(): + try: + import typer + except ImportError: + print("typer is required. Install with: pip install typer") + sys.exit(1) + + app = typer.Typer(help="Database migration runner for destaquesgovbr/data-platform") + + def _connect(db_url: str): + import psycopg2 + + conn = psycopg2.connect(db_url) + conn.autocommit = False + return conn + + @app.command() + def status( + db_url: str = typer.Option(None, "--db-url", envvar="DATABASE_URL"), + migrations_path: str = typer.Option(str(MIGRATIONS_DIR), "--migrations-dir"), + ): + """Show status of all migrations.""" + conn = _connect(db_url) + try: + bootstrap(conn) + migrations = discover_migrations(Path(migrations_path)) + pending = get_pending(conn, migrations) + applied = [m for m in migrations if m not in pending] + + typer.echo(f"Total migrations: {len(migrations)}") + typer.echo(f"Applied: {len(applied)}") + typer.echo(f"Pending: {len(pending)}") + typer.echo("") + for m in migrations: + marker = "PENDING" if m in pending else "APPLIED" + typer.echo(f" [{marker}] {m.version}_{m.name} ({m.migration_type})") + finally: + conn.close() + + @app.command(name="migrate") + def migrate_cmd( + db_url: str = typer.Option(None, "--db-url", envvar="DATABASE_URL"), + migrations_path: str = typer.Option(str(MIGRATIONS_DIR), "--migrations-dir"), + dry_run: bool = typer.Option(False, "--dry-run"), + target: str = typer.Option(None, "--target"), + yes: bool = typer.Option(False, "--yes", "-y"), + ): + """Apply pending migrations.""" + conn = _connect(db_url) + try: + bootstrap(conn) + migrations = discover_migrations(Path(migrations_path)) + pending = get_pending(conn, migrations, target=target) + + if not pending: + typer.echo("No pending migrations.") + return + + typer.echo(f"Pending migrations ({len(pending)}):") + for m in pending: + typer.echo(f" {m.version}_{m.name} ({m.migration_type})") + + if not dry_run and not yes: + typer.confirm("Apply these migrations?", abort=True) + + applied_by = _get_applied_by() + run_id = _get_run_id() + + for m in pending: + execute_migration(conn, m, dry_run=dry_run, applied_by=applied_by, run_id=run_id) + + typer.echo(f"\n{'[DRY RUN] ' if dry_run else ''}Done: {len(pending)} migration(s) processed.") + finally: + conn.close() + + @app.command() + def rollback( + version: str = typer.Argument(..., help="Migration version to rollback (e.g. 006)"), + db_url: str = typer.Option(None, "--db-url", envvar="DATABASE_URL"), + migrations_path: str = typer.Option(str(MIGRATIONS_DIR), "--migrations-dir"), + dry_run: bool = typer.Option(False, "--dry-run"), + yes: bool = typer.Option(False, "--yes", "-y"), + ): + """Rollback a specific migration.""" + conn = _connect(db_url) + try: + bootstrap(conn) + migrations = discover_migrations(Path(migrations_path)) + target = next((m for m in migrations if m.version == version), None) + if not target: + typer.echo(f"Migration {version} not found.") + raise typer.Exit(1) + + if not dry_run and not yes: + typer.confirm(f"Rollback migration {version}_{target.name}?", abort=True) + + applied_by = _get_applied_by() + run_id = _get_run_id() + execute_rollback(conn, target, dry_run=dry_run, applied_by=applied_by, run_id=run_id) + + typer.echo(f"\n{'[DRY RUN] ' if dry_run else ''}Rollback of {version} complete.") + finally: + conn.close() + + @app.command() + def history( + db_url: str = typer.Option(None, "--db-url", envvar="DATABASE_URL"), + limit: int = typer.Option(20, "--limit"), + ): + """Show migration history.""" + conn = _connect(db_url) + try: + bootstrap(conn) + cursor = conn.cursor() + cursor.execute( + "SELECT version, name, migration_type, operation, status, " + "applied_by, started_at, duration_ms, error_message " + "FROM migration_history ORDER BY started_at DESC LIMIT %s", + (limit,), + ) + rows = cursor.fetchall() + cursor.close() + + if not rows: + typer.echo("No migration history.") + return + + typer.echo(f"{'Ver':>5} {'Name':<30} {'Type':<8} {'Op':<10} {'Status':<12} {'By':<15} {'Duration':>10}") + typer.echo("-" * 95) + for row in rows: + ver, name, mtype, op, st, by, at, dur, err = row + dur_str = f"{dur}ms" if dur else "-" + typer.echo(f"{ver:>5} {name:<30} {mtype:<8} {op:<10} {st:<12} {by:<15} {dur_str:>10}") + if err: + typer.echo(f" Error: {err[:80]}") + finally: + conn.close() + + @app.command() + def validate( + migrations_path: str = typer.Option(str(MIGRATIONS_DIR), "--migrations-dir"), + ): + """Validate migration files for consistency.""" + migrations = discover_migrations(Path(migrations_path)) + issues = validate_migrations(migrations) + + if issues: + typer.echo(f"Found {len(issues)} issue(s):") + for issue in issues: + typer.echo(f" - {issue}") + raise typer.Exit(1) + else: + typer.echo(f"All {len(migrations)} migrations are consistent.") + + app() + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_migrate_runner.py b/tests/unit/test_migrate_runner.py new file mode 100644 index 0000000..4051b5e --- /dev/null +++ b/tests/unit/test_migrate_runner.py @@ -0,0 +1,519 @@ +"""Unit tests for scripts/migrate.py — generic migration runner.""" + +import json +import sys +from dataclasses import dataclass +from pathlib import Path +from unittest.mock import MagicMock, call, patch + +import pytest + +# Add scripts/ to path so we can import the migration runner +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "scripts")) + + +# --------------------------------------------------------------------------- +# Discovery +# --------------------------------------------------------------------------- +class TestDiscoverMigrations: + def test_discovers_sql_and_py_in_order(self, tmp_path): + (tmp_path / "001_first.sql").write_text("SELECT 1;") + (tmp_path / "002_second.py").write_text("def migrate(conn, dry_run=False): pass") + (tmp_path / "003_third.sql").write_text("SELECT 3;") + + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path) + assert [m.version for m in migrations] == ["001", "002", "003"] + assert [m.migration_type for m in migrations] == ["sql", "python", "sql"] + + def test_ignores_rollback_files(self, tmp_path): + (tmp_path / "001_create.sql").write_text("CREATE TABLE t;") + (tmp_path / "001_create_rollback.sql").write_text("DROP TABLE t;") + + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path) + assert len(migrations) == 1 + assert migrations[0].version == "001" + + def test_associates_rollback_file(self, tmp_path): + (tmp_path / "001_create.sql").write_text("CREATE TABLE t;") + (tmp_path / "001_create_rollback.sql").write_text("DROP TABLE t;") + + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path) + assert migrations[0].rollback_path is not None + assert "rollback" in str(migrations[0].rollback_path) + + def test_empty_directory(self, tmp_path): + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path) + assert migrations == [] + + def test_ignores_non_migration_files(self, tmp_path): + (tmp_path / "README.md").write_text("# Docs") + (tmp_path / "helper.py").write_text("x = 1") + (tmp_path / "001_real.sql").write_text("SELECT 1;") + + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path) + assert len(migrations) == 1 + + def test_nonexistent_directory(self, tmp_path): + from migrate import discover_migrations + + migrations = discover_migrations(tmp_path / "nonexistent") + assert migrations == [] + + +# --------------------------------------------------------------------------- +# Bootstrap +# --------------------------------------------------------------------------- +class TestBootstrap: + def _mock_conn(self, table_exists=False, schema_version_exists=False, schema_rows=None): + """Create a mock connection with configurable behavior.""" + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + # fetchone responses for EXISTS checks + responses = [] + # 1st call: check if migration_history exists + responses.append((table_exists,)) + if not table_exists: + # 2nd call: check if schema_version exists + responses.append((schema_version_exists,)) + if schema_version_exists: + # fetchall for schema_version rows + cursor.fetchall.return_value = schema_rows or [] + + cursor.fetchone.side_effect = responses + return conn, cursor + + def test_creates_migration_history_table(self): + conn, cursor = self._mock_conn(table_exists=False, schema_version_exists=False) + + from migrate import bootstrap + + bootstrap(conn) + + # Should have executed CREATE TABLE + executed_sqls = [c[0][0] for c in cursor.execute.call_args_list] + create_calls = [s for s in executed_sqls if "CREATE TABLE" in s and "migration_history" in s] + assert len(create_calls) >= 1 + + conn.commit.assert_called() + + def test_imports_schema_version_entries(self): + schema_rows = [ + ("1.0", "2024-12-24 14:00:00+00", "Initial schema"), + ("1.3", "2025-03-10 17:00:00+00", "Alter unique_id"), + ] + conn, cursor = self._mock_conn( + table_exists=False, schema_version_exists=True, schema_rows=schema_rows + ) + + from migrate import bootstrap + + bootstrap(conn) + + # Should have INSERT ... migration_history for each schema_version row + executed_sqls = [c[0][0] for c in cursor.execute.call_args_list] + insert_calls = [s for s in executed_sqls if "INSERT" in s and "migration_history" in s] + assert len(insert_calls) >= 1 + + def test_skips_if_already_bootstrapped(self): + conn, cursor = self._mock_conn(table_exists=True) + + from migrate import bootstrap + + bootstrap(conn) + + # Should NOT execute CREATE TABLE + executed_sqls = [c[0][0] for c in cursor.execute.call_args_list] + create_calls = [s for s in executed_sqls if "CREATE TABLE" in s] + assert len(create_calls) == 0 + + def test_works_without_schema_version(self): + conn, cursor = self._mock_conn(table_exists=False, schema_version_exists=False) + + from migrate import bootstrap + + bootstrap(conn) + # Should not raise; commit should be called + conn.commit.assert_called() + + +# --------------------------------------------------------------------------- +# Get Pending +# --------------------------------------------------------------------------- +class TestGetPending: + def test_returns_pending_migrations(self): + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + # Already applied: 001 + cursor.fetchall.return_value = [("001",)] + + from migrate import MigrationInfo, get_pending + + migrations = [ + MigrationInfo(version="001", name="first", path=Path("001.sql"), migration_type="sql", rollback_path=None), + MigrationInfo(version="002", name="second", path=Path("002.sql"), migration_type="sql", rollback_path=None), + MigrationInfo(version="003", name="third", path=Path("003.py"), migration_type="python", rollback_path=None), + ] + pending = get_pending(conn, migrations) + assert [m.version for m in pending] == ["002", "003"] + + def test_none_pending_when_all_applied(self): + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + cursor.fetchall.return_value = [("001",), ("002",)] + + from migrate import MigrationInfo, get_pending + + migrations = [ + MigrationInfo(version="001", name="first", path=Path("001.sql"), migration_type="sql", rollback_path=None), + MigrationInfo(version="002", name="second", path=Path("002.sql"), migration_type="sql", rollback_path=None), + ] + pending = get_pending(conn, migrations) + assert pending == [] + + def test_respects_target_version(self): + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + cursor.fetchall.return_value = [] # none applied + + from migrate import MigrationInfo, get_pending + + migrations = [ + MigrationInfo(version="001", name="first", path=Path("001.sql"), migration_type="sql", rollback_path=None), + MigrationInfo(version="002", name="second", path=Path("002.sql"), migration_type="sql", rollback_path=None), + MigrationInfo(version="003", name="third", path=Path("003.sql"), migration_type="sql", rollback_path=None), + ] + pending = get_pending(conn, migrations, target="002") + assert [m.version for m in pending] == ["001", "002"] + + +# --------------------------------------------------------------------------- +# Execute Migration (SQL) +# --------------------------------------------------------------------------- +class TestExecuteMigrationSQL: + def test_executes_sql_and_records_history(self, tmp_path): + sql_file = tmp_path / "001_test.sql" + sql_file.write_text("CREATE TABLE test_table (id INT);") + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + conn.autocommit = False + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="test", path=sql_file, + migration_type="sql", rollback_path=None, + ) + execute_migration(conn, migration, dry_run=False, applied_by="test", run_id=None) + + # SQL content should have been executed + executed_sqls = [c[0][0] for c in cursor.execute.call_args_list] + sql_calls = [s for s in executed_sqls if "CREATE TABLE test_table" in s] + assert len(sql_calls) >= 1 + + # History should have been recorded + history_calls = [s for s in executed_sqls if "migration_history" in s] + assert len(history_calls) >= 1 + + conn.commit.assert_called() + + def test_dry_run_does_not_commit(self, tmp_path): + sql_file = tmp_path / "001_test.sql" + sql_file.write_text("CREATE TABLE test_table (id INT);") + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="test", path=sql_file, + migration_type="sql", rollback_path=None, + ) + execute_migration(conn, migration, dry_run=True, applied_by="test", run_id=None) + + conn.commit.assert_not_called() + conn.rollback.assert_called() + + def test_failure_records_failed_status(self, tmp_path): + sql_file = tmp_path / "001_bad.sql" + sql_file.write_text("INVALID SQL;") + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + # Make SQL execution fail + execute_calls = [0] + + def side_effect(sql, *args, **kwargs): + execute_calls[0] += 1 + if execute_calls[0] == 1: # First execute is the migration SQL + raise Exception("syntax error") + + cursor.execute.side_effect = side_effect + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="bad", path=sql_file, + migration_type="sql", rollback_path=None, + ) + with pytest.raises(Exception, match="syntax error"): + execute_migration(conn, migration, dry_run=False, applied_by="test", run_id=None) + + conn.rollback.assert_called() + + +# --------------------------------------------------------------------------- +# Execute Migration (Python) +# --------------------------------------------------------------------------- +class TestExecuteMigrationPython: + def test_imports_and_calls_migrate(self, tmp_path): + py_file = tmp_path / "001_test_migration.py" + py_file.write_text( + 'def describe(): return "Test migration"\n' + 'def migrate(conn, dry_run=False): return {"rows_affected": 42}\n' + 'def rollback(conn, dry_run=False): return {}\n' + ) + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="test_migration", path=py_file, + migration_type="python", rollback_path=None, + ) + execute_migration(conn, migration, dry_run=False, applied_by="test", run_id=None) + conn.commit.assert_called() + + def test_stores_execution_details(self, tmp_path): + py_file = tmp_path / "001_test_migration.py" + py_file.write_text( + 'def describe(): return "Test"\n' + 'def migrate(conn, dry_run=False): return {"rows_affected": 42, "collisions": 0}\n' + 'def rollback(conn, dry_run=False): return {}\n' + ) + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="test_migration", path=py_file, + migration_type="python", rollback_path=None, + ) + execute_migration(conn, migration, dry_run=False, applied_by="test", run_id=None) + + # Check that execution_details with rows_affected was passed to INSERT + executed_sqls = [c for c in cursor.execute.call_args_list] + history_calls = [c for c in executed_sqls if "migration_history" in str(c)] + assert len(history_calls) >= 1 + # The JSONB value should contain rows_affected + history_args = str(history_calls[-1]) + assert "rows_affected" in history_args or "42" in history_args + + def test_module_without_describe_raises(self, tmp_path): + py_file = tmp_path / "001_bad.py" + py_file.write_text( + 'def migrate(conn, dry_run=False): return {}\n' + ) + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_migration + + migration = MigrationInfo( + version="001", name="bad", path=py_file, + migration_type="python", rollback_path=None, + ) + with pytest.raises((AttributeError, Exception)): + execute_migration(conn, migration, dry_run=False, applied_by="test", run_id=None) + + +# --------------------------------------------------------------------------- +# Execute Rollback +# --------------------------------------------------------------------------- +class TestExecuteRollback: + def test_sql_rollback_executes_file(self, tmp_path): + migration_file = tmp_path / "001_create.sql" + migration_file.write_text("CREATE TABLE t (id INT);") + rollback_file = tmp_path / "001_create_rollback.sql" + rollback_file.write_text("DROP TABLE t;") + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_rollback + + migration = MigrationInfo( + version="001", name="create", path=migration_file, + migration_type="sql", rollback_path=rollback_file, + ) + execute_rollback(conn, migration, dry_run=False, applied_by="test", run_id=None) + + executed_sqls = [c[0][0] for c in cursor.execute.call_args_list] + drop_calls = [s for s in executed_sqls if "DROP TABLE t" in s] + assert len(drop_calls) >= 1 + conn.commit.assert_called() + + def test_python_rollback_calls_rollback_function(self, tmp_path): + py_file = tmp_path / "001_data.py" + py_file.write_text( + 'def describe(): return "Test"\n' + 'def migrate(conn, dry_run=False): return {}\n' + 'def rollback(conn, dry_run=False): return {"restored": 10}\n' + ) + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_rollback + + migration = MigrationInfo( + version="001", name="data", path=py_file, + migration_type="python", rollback_path=None, + ) + execute_rollback(conn, migration, dry_run=False, applied_by="test", run_id=None) + conn.commit.assert_called() + + def test_sql_rollback_errors_if_no_file(self, tmp_path): + migration_file = tmp_path / "001_create.sql" + migration_file.write_text("CREATE TABLE t;") + + conn = MagicMock() + + from migrate import MigrationInfo, execute_rollback + + migration = MigrationInfo( + version="001", name="create", path=migration_file, + migration_type="sql", rollback_path=None, + ) + with pytest.raises((FileNotFoundError, ValueError)): + execute_rollback(conn, migration, dry_run=False, applied_by="test", run_id=None) + + def test_python_not_implemented_records_unavailable(self, tmp_path): + py_file = tmp_path / "001_data.py" + py_file.write_text( + 'def describe(): return "Test"\n' + 'def migrate(conn, dry_run=False): return {}\n' + 'def rollback(conn, dry_run=False): raise NotImplementedError("Cannot rollback")\n' + ) + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_rollback + + migration = MigrationInfo( + version="001", name="data", path=py_file, + migration_type="python", rollback_path=None, + ) + # Should not raise — records unavailable instead + execute_rollback(conn, migration, dry_run=False, applied_by="test", run_id=None) + + executed_sqls = str(cursor.execute.call_args_list) + assert "unavailable" in executed_sqls + + def test_rollback_records_operation_in_history(self, tmp_path): + rollback_file = tmp_path / "001_create_rollback.sql" + rollback_file.write_text("DROP TABLE t;") + migration_file = tmp_path / "001_create.sql" + migration_file.write_text("CREATE TABLE t;") + + conn = MagicMock() + cursor = MagicMock() + conn.cursor.return_value = cursor + cursor.__enter__ = MagicMock(return_value=cursor) + cursor.__exit__ = MagicMock(return_value=False) + + from migrate import MigrationInfo, execute_rollback + + migration = MigrationInfo( + version="001", name="create", path=migration_file, + migration_type="sql", rollback_path=rollback_file, + ) + execute_rollback(conn, migration, dry_run=False, applied_by="test", run_id=None) + + executed_sqls = str(cursor.execute.call_args_list) + assert "rollback" in executed_sqls + assert "migration_history" in executed_sqls + + +# --------------------------------------------------------------------------- +# Validate +# --------------------------------------------------------------------------- +class TestValidateMigrations: + def test_detects_sequence_gap(self, tmp_path): + (tmp_path / "001_first.sql").write_text("SELECT 1;") + (tmp_path / "003_third.sql").write_text("SELECT 3;") + + from migrate import discover_migrations, validate_migrations + + migrations = discover_migrations(tmp_path) + issues = validate_migrations(migrations) + assert any("gap" in issue.lower() or "002" in issue for issue in issues) + + def test_returns_empty_when_consistent(self, tmp_path): + (tmp_path / "001_first.sql").write_text("SELECT 1;") + (tmp_path / "002_second.sql").write_text("SELECT 2;") + (tmp_path / "003_third.sql").write_text("SELECT 3;") + + from migrate import discover_migrations, validate_migrations + + migrations = discover_migrations(tmp_path) + issues = validate_migrations(migrations) + assert issues == [] From f9b965a68d5c550c9dae34bb9e532495aa36bed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:19:00 -0300 Subject: [PATCH 2/6] feat: adaptar migrate_unique_ids.py para interface do runner (migration 006) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cria scripts/migrations/006_migrate_unique_ids.py seguindo a interface padrao do runner (describe/migrate/rollback). Extrai logica de negocio do script original (slugify, build_id_mapping, batch update) sem argparse ou conexao propria — tudo gerenciado pelo runner. - describe(): retorna descricao humana para logs e auditoria - migrate(conn, dry_run): executa migracao de ~300k unique_ids - rollback(conn, dry_run): restaura IDs MD5 via legacy_unique_id - Funcoes de geracao de ID preservadas identicas ao original Script original (scripts/migrate_unique_ids.py) marcado como DEPRECATED, preservado para backward compatibility com testes existentes. 18 testes unitarios: interface, slugify, suffix, generate_id, migrate, rollback, dry-run, paridade com script original. Ref: #109 Co-Authored-By: Claude Opus 4.6 --- scripts/migrate_unique_ids.py | 4 + scripts/migrations/006_migrate_unique_ids.py | 293 +++++++++++++++++++ tests/unit/test_migration_006.py | 189 ++++++++++++ 3 files changed, 486 insertions(+) create mode 100644 scripts/migrations/006_migrate_unique_ids.py create mode 100644 tests/unit/test_migration_006.py diff --git a/scripts/migrate_unique_ids.py b/scripts/migrate_unique_ids.py index 64ac57c..a497d96 100644 --- a/scripts/migrate_unique_ids.py +++ b/scripts/migrate_unique_ids.py @@ -2,6 +2,10 @@ """ Migrate news unique_ids from MD5 hashes to readable slugs. +DEPRECATED: Use `python scripts/migrate.py migrate` instead. +This script is preserved for backward compatibility with existing tests. +The canonical implementation is now in scripts/migrations/006_migrate_unique_ids.py. + Phase 4 of issue #43: https://github.com/destaquesgovbr/data-platform/issues/43 Usage: diff --git a/scripts/migrations/006_migrate_unique_ids.py b/scripts/migrations/006_migrate_unique_ids.py new file mode 100644 index 0000000..2062695 --- /dev/null +++ b/scripts/migrations/006_migrate_unique_ids.py @@ -0,0 +1,293 @@ +""" +Migrate ~300k unique_ids from MD5 hashes to readable slugs. + +Python migration following the runner interface (describe/migrate/rollback). +Adapted from scripts/migrate_unique_ids.py (issue #43). + +Canonical source for ID generation: scraper/src/govbr_scraper/scrapers/unique_id.py +""" + +import hashlib +import re +import time +import unicodedata +from datetime import date + + +# ============================================================================= +# ID Generation Functions (inline copy from scraper) +# ============================================================================= + + +def slugify(text: str, max_length: int = 100) -> str: + """Convert text to a URL-friendly slug.""" + text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii") + text = text.lower() + text = re.sub(r"[^a-z0-9]+", "-", text) + text = re.sub(r"-+", "-", text).strip("-") + if len(text) > max_length: + truncated = text[:max_length] + if "-" in truncated: + truncated = truncated.rsplit("-", 1)[0] + text = truncated + return text + + +def generate_suffix(agency: str, published_at_value, title: str) -> str: + """Generate a deterministic 6-char hex suffix from article attributes.""" + date_str = ( + published_at_value.isoformat() + if isinstance(published_at_value, date) + else str(published_at_value) + ) + hash_input = f"{agency}_{date_str}_{title}".encode("utf-8") + return hashlib.md5(hash_input).hexdigest()[:6] + + +def generate_readable_unique_id(agency: str, published_at_value, title: str) -> str: + """Generate a readable unique ID in the format: {slug}_{suffix}.""" + slug = slugify(title) + suffix = generate_suffix(agency, published_at_value, title) + if slug: + return f"{slug}_{suffix}" + return f"sem-titulo_{suffix}" + + +def _generate_id_with_extended_suffix(agency, published_at, title, extra_chars): + """Generate ID with a longer suffix to resolve collisions.""" + slug = slugify(title) + date_str = ( + published_at.isoformat() if isinstance(published_at, date) else str(published_at) + ) + hash_input = f"{agency}_{date_str}_{title}".encode("utf-8") + suffix = hashlib.md5(hash_input).hexdigest()[: 6 + extra_chars] + if slug: + return f"{slug}_{suffix}" + return f"sem-titulo_{suffix}" + + +# ============================================================================= +# Database helpers +# ============================================================================= + + +def _fetch_all_news(conn): + """Fetch all news rows needed for migration using server-side cursor.""" + cursor = conn.cursor(name="fetch_news_for_migration") + cursor.itersize = 5000 + cursor.execute( + "SELECT unique_id, agency_key, published_at, title, legacy_unique_id " + "FROM news ORDER BY unique_id" + ) + rows = cursor.fetchall() + cursor.close() + return rows + + +def _build_id_mapping(rows): + """Build mapping {old_unique_id: new_unique_id} from news rows.""" + mapping = {} + seen_new_ids = {} + collision_count = 0 + + for unique_id, agency_key, published_at, title, _legacy in rows: + new_id = generate_readable_unique_id(agency_key, published_at, title) + + if unique_id == new_id: + seen_new_ids[new_id] = unique_id + continue + + if new_id in seen_new_ids: + for extra in range(1, 27): + new_id = _generate_id_with_extended_suffix( + agency_key, published_at, title, extra + ) + if new_id not in seen_new_ids: + break + if new_id in seen_new_ids: + raise ValueError( + f"Failed to resolve collision after 26 attempts for '{unique_id}'" + ) + collision_count += 1 + + mapping[unique_id] = new_id + seen_new_ids[new_id] = unique_id + + new_ids = list(mapping.values()) + if len(new_ids) != len(set(new_ids)): + raise ValueError("Mapping contains duplicate new_ids after resolution") + + return mapping, collision_count + + +def _has_news_features_table(conn): + """Check if news_features table exists.""" + cursor = conn.cursor() + cursor.execute( + "SELECT EXISTS (" + " SELECT 1 FROM information_schema.tables " + " WHERE table_schema = 'public' AND table_name = 'news_features'" + ")" + ) + exists = cursor.fetchone()[0] + cursor.close() + return exists + + +def _get_fk_constraint_name(conn): + """Get the FK constraint name on news_features referencing news.""" + cursor = conn.cursor() + cursor.execute( + "SELECT tc.constraint_name " + "FROM information_schema.table_constraints tc " + "JOIN information_schema.key_column_usage kcu " + " ON tc.constraint_name = kcu.constraint_name " + "WHERE tc.table_name = 'news_features' " + " AND tc.constraint_type = 'FOREIGN KEY' " + " AND kcu.column_name = 'unique_id'" + ) + row = cursor.fetchone() + cursor.close() + return row[0] if row else None + + +# ============================================================================= +# Runner interface +# ============================================================================= + + +def describe() -> str: + """Human description for logs and audit.""" + return "Migrar ~300k unique_ids de MD5 para slug legivel (issue #43)" + + +def migrate(conn, dry_run: bool = False) -> dict: + """Execute the migration. conn is psycopg2 without autocommit.""" + rows = _fetch_all_news(conn) + mapping, collision_count = _build_id_mapping(rows) + + if not mapping: + return {"rows_migrated": 0, "already_migrated": len(rows), "collisions": 0} + + if dry_run: + return { + "rows_migrated": 0, + "to_migrate": len(mapping), + "collisions": collision_count, + "preview": True, + } + + from psycopg2.extras import execute_batch + + cursor = conn.cursor() + t0 = time.time() + + # Backfill legacy_unique_id + cursor.execute( + "UPDATE news SET legacy_unique_id = unique_id WHERE legacy_unique_id IS NULL" + ) + backfilled = cursor.rowcount + + # Handle news_features FK + has_features = _has_news_features_table(conn) + fk_name = None + if has_features: + fk_name = _get_fk_constraint_name(conn) + if fk_name: + cursor.execute(f"ALTER TABLE news_features DROP CONSTRAINT {fk_name}") + + # Update news_features + if has_features: + params = [(new_id, old_id) for old_id, new_id in mapping.items()] + execute_batch( + cursor, + "UPDATE news_features SET unique_id = %s WHERE unique_id = %s", + params, + page_size=1000, + ) + + # Update news + params = [(new_id, old_id) for old_id, new_id in mapping.items()] + execute_batch( + cursor, + "UPDATE news SET unique_id = %s WHERE unique_id = %s", + params, + page_size=1000, + ) + + # Re-add FK + if has_features and fk_name: + cursor.execute( + f"ALTER TABLE news_features ADD CONSTRAINT {fk_name} " + f"FOREIGN KEY (unique_id) REFERENCES news(unique_id) ON DELETE CASCADE" + ) + + cursor.close() + elapsed = time.time() - t0 + + return { + "rows_migrated": len(mapping), + "backfilled_legacy": backfilled, + "collisions": collision_count, + "elapsed_seconds": round(elapsed, 2), + } + + +def rollback(conn, dry_run: bool = False) -> dict: + """Revert migration: restore MD5 unique_ids from legacy_unique_id.""" + cursor = conn.cursor() + + # Check legacy_unique_id populated + cursor.execute("SELECT COUNT(*) FROM news WHERE legacy_unique_id IS NULL") + null_count = cursor.fetchone()[0] + if null_count > 0: + cursor.close() + raise ValueError(f"{null_count} rows have NULL legacy_unique_id. Cannot rollback.") + + cursor.execute("SELECT COUNT(*) FROM news WHERE unique_id != legacy_unique_id") + to_rollback = cursor.fetchone()[0] + + if to_rollback == 0: + cursor.close() + return {"rows_rolled_back": 0, "message": "All records already have MD5 unique_ids"} + + if dry_run: + cursor.close() + return {"rows_rolled_back": 0, "to_rollback": to_rollback, "preview": True} + + # Handle FK + has_features = _has_news_features_table(conn) + fk_name = None + if has_features: + fk_name = _get_fk_constraint_name(conn) + if fk_name: + cursor.execute(f"ALTER TABLE news_features DROP CONSTRAINT {fk_name}") + + # Update news_features + if has_features: + cursor.execute( + "UPDATE news_features nf SET unique_id = n.legacy_unique_id " + "FROM news n WHERE nf.unique_id = n.unique_id " + "AND n.unique_id != n.legacy_unique_id" + ) + + # Update news + cursor.execute( + "UPDATE news SET unique_id = legacy_unique_id " + "WHERE unique_id != legacy_unique_id" + ) + rolled_back = cursor.rowcount + + # Re-add FK + if has_features and fk_name: + cursor.execute( + f"ALTER TABLE news_features ADD CONSTRAINT {fk_name} " + f"FOREIGN KEY (unique_id) REFERENCES news(unique_id) ON DELETE CASCADE" + ) + + # Verify + cursor.execute("SELECT COUNT(*) FROM news WHERE unique_id != legacy_unique_id") + remaining = cursor.fetchone()[0] + cursor.close() + + return {"rows_rolled_back": rolled_back, "remaining_mismatched": remaining} diff --git a/tests/unit/test_migration_006.py b/tests/unit/test_migration_006.py new file mode 100644 index 0000000..c7084be --- /dev/null +++ b/tests/unit/test_migration_006.py @@ -0,0 +1,189 @@ +"""Unit tests for scripts/migrations/006_migrate_unique_ids.py — Python migration interface.""" + +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Add scripts/migrations to path for importing +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "scripts" / "migrations")) + + +def _import_migration_006(): + """Import the migration module dynamically (numeric prefix not importable directly).""" + import importlib.util + + module_path = ( + Path(__file__).parent.parent.parent / "scripts" / "migrations" / "006_migrate_unique_ids.py" + ) + spec = importlib.util.spec_from_file_location("migration_006", module_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +# --------------------------------------------------------------------------- +# Interface compliance +# --------------------------------------------------------------------------- +class TestMigration006Interface: + def test_describe_returns_nonempty_string(self): + mod = _import_migration_006() + result = mod.describe() + assert isinstance(result, str) + assert len(result) > 0 + + def test_has_migrate_function(self): + mod = _import_migration_006() + assert callable(getattr(mod, "migrate", None)) + + def test_has_rollback_function(self): + mod = _import_migration_006() + assert callable(getattr(mod, "rollback", None)) + + +# --------------------------------------------------------------------------- +# ID generation functions (preserved from original) +# --------------------------------------------------------------------------- +class TestSlugify006: + def test_basic_ascii(self): + mod = _import_migration_006() + assert mod.slugify("Hello World") == "hello-world" + + def test_portuguese_accents(self): + mod = _import_migration_006() + assert mod.slugify("Governo anuncia programa de habitacao popular") == ( + "governo-anuncia-programa-de-habitacao-popular" + ) + + def test_special_characters(self): + mod = _import_migration_006() + assert mod.slugify("R$ 100,00 — credito & mais!") == "r-100-00-credito-mais" + + def test_max_length_truncates(self): + mod = _import_migration_006() + result = mod.slugify("a" * 50 + "-" + "b" * 50 + "-ccc", max_length=100) + assert len(result) <= 100 + + def test_empty_string(self): + mod = _import_migration_006() + assert mod.slugify("") == "" + + +class TestGenerateSuffix006: + def test_deterministic(self): + mod = _import_migration_006() + a = mod.generate_suffix("mec", "2024-01-15", "Test Title") + b = mod.generate_suffix("mec", "2024-01-15", "Test Title") + assert a == b + + def test_length_6_hex(self): + mod = _import_migration_006() + result = mod.generate_suffix("mec", "2024-01-15", "Test Title") + assert len(result) == 6 + assert all(c in "0123456789abcdef" for c in result) + + +class TestGenerateReadableUniqueId006: + def test_format(self): + mod = _import_migration_006() + result = mod.generate_readable_unique_id("mec", "2024-01-15", "Test Title") + parts = result.rsplit("_", 1) + assert len(parts) == 2 + assert parts[0] == "test-title" + assert len(parts[1]) == 6 + + def test_empty_title(self): + mod = _import_migration_006() + result = mod.generate_readable_unique_id("mec", "2024-01-15", "") + assert result.startswith("sem-titulo_") + + +# --------------------------------------------------------------------------- +# migrate() and rollback() +# --------------------------------------------------------------------------- +class TestMigrate006: + @patch("psycopg2.extras.execute_batch") + def test_migrate_returns_dict_with_rows_affected(self, mock_execute_batch): + mod = _import_migration_006() + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value = mock_cursor + # fetch_all_news returns rows + mock_cursor.fetchall.return_value = [ + ("abc123hash00000000000000000000ff", "mec", "2024-01-15", "Test Title", None), + ] + mock_cursor.fetchone.side_effect = [ + (True,), # has_news_features_table + ("news_features_unique_id_fkey",), # FK name + ] + + result = mod.migrate(mock_conn, dry_run=False) + assert isinstance(result, dict) + assert "rows_migrated" in result + assert result["rows_migrated"] == 1 + + def test_dry_run_does_not_commit(self): + mod = _import_migration_006() + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_cursor.fetchall.return_value = [ + ("abc123hash00000000000000000000ff", "mec", "2024-01-15", "Test Title", None), + ] + + result = mod.migrate(mock_conn, dry_run=True) + assert isinstance(result, dict) + mock_conn.commit.assert_not_called() + + def test_rollback_returns_dict(self): + mod = _import_migration_006() + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_cursor.fetchone.side_effect = [ + (0,), # null legacy count + (5,), # rows to rollback + (True,), # has_news_features_table + ("news_features_unique_id_fkey",), # FK name + (0,), # verification + ] + + result = mod.rollback(mock_conn, dry_run=False) + assert isinstance(result, dict) + assert "rows_rolled_back" in result + + def test_rollback_dry_run_does_not_commit(self): + mod = _import_migration_006() + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value = mock_cursor + mock_cursor.fetchone.side_effect = [ + (0,), # null legacy count + (5,), # rows to rollback + ] + + result = mod.rollback(mock_conn, dry_run=True) + assert isinstance(result, dict) + mock_conn.commit.assert_not_called() + + +# --------------------------------------------------------------------------- +# Parity with original script +# --------------------------------------------------------------------------- +class TestParity: + def test_matches_original_slugify(self): + """006 slugify must match the original migrate_unique_ids.py output.""" + mod = _import_migration_006() + # Same test cases as test_migrate_unique_ids.py + assert mod.slugify("Hello World") == "hello-world" + assert mod.slugify("R$ 100,00 — credito & mais!") == "r-100-00-credito-mais" + assert mod.slugify("") == "" + + def test_matches_original_generate_readable_unique_id(self): + mod = _import_migration_006() + # These must produce identical output to the original script + result = mod.generate_readable_unique_id("mec", "2024-01-15", "Governo anuncia novo programa") + assert "_" in result + parts = result.rsplit("_", 1) + assert len(parts[1]) == 6 From c636f0b4d8e04adddbd7eb77e1e633172f646cf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:19:12 -0300 Subject: [PATCH 3/6] feat: adicionar rollback SQL e tabela migration_history ao schema Rollback files: - 004_create_news_features_rollback.sql: remove tabela, trigger e indices - 005_alter_unique_id_varchar_rollback.sql: reverte VARCHAR(120) para VARCHAR(32), remove legacy_unique_id (requer rollback 006 antes) Schema (init.sql e create_schema.sql): - Adiciona tabela migration_history com auditoria completa - Adiciona view migration_status (estado atual por versao) - Indices em version e started_at para queries de status Ref: #109 Co-Authored-By: Claude Opus 4.6 --- docker/postgres/init.sql | 28 +++++++++++++ scripts/create_schema.sql | 33 +++++++++++++++ .../004_create_news_features_rollback.sql | 8 ++++ .../005_alter_unique_id_varchar_rollback.sql | 42 +++++++++++++++++++ 4 files changed, 111 insertions(+) create mode 100644 scripts/migrations/004_create_news_features_rollback.sql create mode 100644 scripts/migrations/005_alter_unique_id_varchar_rollback.sql diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql index 2217813..1a8a9cd 100644 --- a/docker/postgres/init.sql +++ b/docker/postgres/init.sql @@ -127,3 +127,31 @@ WHERE content_embedding IS NULL; CREATE INDEX IF NOT EXISTS idx_news_published_at_2025 ON news (published_at) WHERE published_at >= '2025-01-01' AND published_at < '2026-01-01'; + +-- Migration history (audit trail for database migrations) +CREATE TABLE IF NOT EXISTS migration_history ( + id SERIAL PRIMARY KEY, + version VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL, + migration_type VARCHAR(10) NOT NULL CHECK (migration_type IN ('sql', 'python')), + operation VARCHAR(10) NOT NULL CHECK (operation IN ('migrate', 'rollback', 'dry_run')), + status VARCHAR(20) NOT NULL CHECK (status IN ('success', 'failed', 'unavailable')), + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ, + duration_ms INTEGER, + applied_by TEXT NOT NULL, + run_id TEXT, + description TEXT, + execution_details JSONB, + error_message TEXT +); + +CREATE INDEX IF NOT EXISTS idx_mh_version ON migration_history(version); +CREATE INDEX IF NOT EXISTS idx_mh_started_at ON migration_history(started_at DESC); + +CREATE OR REPLACE VIEW migration_status AS +SELECT DISTINCT ON (version) + version, name, migration_type, operation, status, applied_by, started_at, duration_ms +FROM migration_history +WHERE status = 'success' +ORDER BY version, started_at DESC; diff --git a/scripts/create_schema.sql b/scripts/create_schema.sql index e7e259d..a2c80ae 100644 --- a/scripts/create_schema.sql +++ b/scripts/create_schema.sql @@ -327,6 +327,39 @@ CREATE TABLE IF NOT EXISTS schema_version ( INSERT INTO schema_version (version, description) VALUES ('1.3', 'Widen unique_id to VARCHAR(120) for readable slugs, add legacy_unique_id'); +-- ============================================================================= +-- MIGRATION HISTORY (audit trail for database migrations) +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS migration_history ( + id SERIAL PRIMARY KEY, + version VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL, + migration_type VARCHAR(10) NOT NULL CHECK (migration_type IN ('sql', 'python')), + operation VARCHAR(10) NOT NULL CHECK (operation IN ('migrate', 'rollback', 'dry_run')), + status VARCHAR(20) NOT NULL CHECK (status IN ('success', 'failed', 'unavailable')), + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ, + duration_ms INTEGER, + applied_by TEXT NOT NULL, + run_id TEXT, + description TEXT, + execution_details JSONB, + error_message TEXT +); + +CREATE INDEX IF NOT EXISTS idx_mh_version ON migration_history(version); +CREATE INDEX IF NOT EXISTS idx_mh_started_at ON migration_history(started_at DESC); + +COMMENT ON TABLE migration_history IS 'Audit trail for all database migrations (SQL and Python)'; + +CREATE OR REPLACE VIEW migration_status AS +SELECT DISTINCT ON (version) + version, name, migration_type, operation, status, applied_by, started_at, duration_ms +FROM migration_history +WHERE status = 'success' +ORDER BY version, started_at DESC; + -- ============================================================================= -- COMPLETION MESSAGE -- ============================================================================= diff --git a/scripts/migrations/004_create_news_features_rollback.sql b/scripts/migrations/004_create_news_features_rollback.sql new file mode 100644 index 0000000..0d991ea --- /dev/null +++ b/scripts/migrations/004_create_news_features_rollback.sql @@ -0,0 +1,8 @@ +-- 004_create_news_features_rollback.sql +-- Rollback: remove news_features table and related objects + +DROP TRIGGER IF EXISTS trg_news_features_updated_at ON news_features; +DROP FUNCTION IF EXISTS update_news_features_updated_at(); +DROP INDEX IF EXISTS idx_news_features_gin; +DROP INDEX IF EXISTS idx_news_features_updated_at; +DROP TABLE IF EXISTS news_features; diff --git a/scripts/migrations/005_alter_unique_id_varchar_rollback.sql b/scripts/migrations/005_alter_unique_id_varchar_rollback.sql new file mode 100644 index 0000000..efc346e --- /dev/null +++ b/scripts/migrations/005_alter_unique_id_varchar_rollback.sql @@ -0,0 +1,42 @@ +-- 005_alter_unique_id_varchar_rollback.sql +-- Rollback: revert unique_id to VARCHAR(32) and remove legacy_unique_id +-- +-- WARNING: This rollback will FAIL if any unique_id exceeds 32 chars. +-- Run 006 rollback first to restore MD5 IDs before running this. + +BEGIN; + +-- Step 1: Drop view that depends on news.unique_id +DROP VIEW IF EXISTS news_with_themes; + +-- Step 2: Narrow unique_id back to VARCHAR(32) +ALTER TABLE news_features ALTER COLUMN unique_id TYPE VARCHAR(32); +ALTER TABLE news ALTER COLUMN unique_id TYPE VARCHAR(32); + +-- Step 3: Recreate the view +CREATE VIEW news_with_themes AS +SELECT + n.id, + n.unique_id, + n.title, + n.url, + n.agency_name, + n.published_at, + n.summary, + t1.label as theme_l1, + t2.label as theme_l2, + t3.label as theme_l3, + COALESCE(t3.label, t2.label, t1.label) as most_specific_theme +FROM news n +LEFT JOIN themes t1 ON n.theme_l1_id = t1.id +LEFT JOIN themes t2 ON n.theme_l2_id = t2.id +LEFT JOIN themes t3 ON n.theme_l3_id = t3.id; + +-- Step 4: Remove legacy column and index +DROP INDEX IF EXISTS idx_news_legacy_unique_id; +ALTER TABLE news DROP COLUMN IF EXISTS legacy_unique_id; + +-- Step 5: Revert schema_version +DELETE FROM schema_version WHERE version = '1.3'; + +COMMIT; From c82b2a82b303118ecdb4c19c8bc5cd34ee1c06f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:19:29 -0300 Subject: [PATCH 4/6] feat: generalizar workflow db-migrate.yaml para qualquer migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Substitui inputs hardcoded (migration file, data_migration, batch_size) por interface generica baseada no runner: Inputs: - command: status, migrate, rollback, history, validate - dry_run: preview sem alterar banco (default: true) - target_version: versao alvo para migrate --target ou rollback - confirm: obrigatorio para operacoes destrutivas com dry_run=false Jobs: - validate-inputs: verifica confirmacao para operacoes destrutivas - backup: cria backup automatico antes de migrate/rollback - run-migration: executa scripts/migrate.py com o comando escolhido - show-status: exibe status e historico apos execucao Melhorias: - Cloud SQL Proxy version extraida para env (facilita atualizacao) - Usa vars.GCP_WORKLOAD_IDENTITY_PROVIDER (variavel de org) - environment: production (proteção via required reviewer) Ref: #109 Co-Authored-By: Claude Opus 4.6 --- .github/workflows/db-migrate.yaml | 274 ++++++++++++------------------ 1 file changed, 113 insertions(+), 161 deletions(-) diff --git a/.github/workflows/db-migrate.yaml b/.github/workflows/db-migrate.yaml index d0c09ea..f1028fa 100644 --- a/.github/workflows/db-migrate.yaml +++ b/.github/workflows/db-migrate.yaml @@ -3,29 +3,30 @@ name: Database Migration on: workflow_dispatch: inputs: - migration: - description: 'SQL migration file to execute (relative to scripts/migrations/)' + command: + description: 'Migration command to execute' required: true - type: string - data_migration: - description: 'Run Python data migration script after SQL? (scripts/migrate_unique_ids.py)' - required: false - type: boolean - default: false - data_migration_mode: - description: 'Data migration mode (only used if data_migration is true)' - required: false type: choice options: - - dry-run + - status - migrate - rollback - default: dry-run - batch_size: - description: 'Batch size for data migration (default: 1000)' + - history + - validate + dry_run: + description: 'Preview changes without applying (default: true)' + required: false + type: boolean + default: true + target_version: + description: 'Target version for migrate (--target) or rollback (VERSION)' + required: false + type: string + confirm: + description: 'Required for migrate/rollback with dry_run=false' required: false - type: number - default: 1000 + type: boolean + default: false permissions: contents: read @@ -34,53 +35,53 @@ permissions: env: PROJECT_ID: inspire-7-finep CLOUD_SQL_INSTANCE: inspire-7-finep:southamerica-east1:destaquesgovbr-postgres - DATABASE_NAME: govbrnews + CLOUD_SQL_PROXY_VERSION: v2.14.3 jobs: - validate: - name: Validate Migration + validate-inputs: + name: Validate Inputs runs-on: ubuntu-latest - outputs: - migration_file: ${{ steps.check.outputs.migration_file }} steps: - - uses: actions/checkout@v4 - - - name: Validate migration file exists - id: check + - name: Check confirmation for destructive operations run: | - MIGRATION_FILE="scripts/migrations/${{ inputs.migration }}" - if [ ! -f "$MIGRATION_FILE" ]; then - echo "::error::Migration file not found: $MIGRATION_FILE" - echo "Available migrations:" - ls -1 scripts/migrations/*.sql 2>/dev/null || echo " (none)" - exit 1 + COMMAND="${{ inputs.command }}" + DRY_RUN="${{ inputs.dry_run }}" + CONFIRM="${{ inputs.confirm }}" + + echo "## Migration Request" >> $GITHUB_STEP_SUMMARY + echo "- **Command:** $COMMAND" >> $GITHUB_STEP_SUMMARY + echo "- **Dry run:** $DRY_RUN" >> $GITHUB_STEP_SUMMARY + echo "- **Target version:** ${{ inputs.target_version || 'N/A' }}" >> $GITHUB_STEP_SUMMARY + echo "- **Confirm:** $CONFIRM" >> $GITHUB_STEP_SUMMARY + echo "- **Actor:** ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY + + if [[ "$COMMAND" == "migrate" || "$COMMAND" == "rollback" ]]; then + if [[ "$DRY_RUN" == "false" && "$CONFIRM" != "true" ]]; then + echo "::error::Destructive operation requires confirm=true when dry_run=false" + exit 1 + fi fi - echo "migration_file=$MIGRATION_FILE" >> $GITHUB_OUTPUT - echo "## Migration to execute" >> $GITHUB_STEP_SUMMARY - echo "- **File:** \`$MIGRATION_FILE\`" >> $GITHUB_STEP_SUMMARY - echo "- **Data migration:** ${{ inputs.data_migration }}" >> $GITHUB_STEP_SUMMARY - if [ "${{ inputs.data_migration }}" = "true" ]; then - echo "- **Mode:** ${{ inputs.data_migration_mode }}" >> $GITHUB_STEP_SUMMARY - echo "- **Batch size:** ${{ inputs.batch_size }}" >> $GITHUB_STEP_SUMMARY + + if [[ "$COMMAND" == "rollback" && -z "${{ inputs.target_version }}" ]]; then + echo "::error::rollback requires target_version" + exit 1 fi - echo "" >> $GITHUB_STEP_SUMMARY - echo "### SQL Content" >> $GITHUB_STEP_SUMMARY - echo '```sql' >> $GITHUB_STEP_SUMMARY - cat "$MIGRATION_FILE" >> $GITHUB_STEP_SUMMARY - echo '```' >> $GITHUB_STEP_SUMMARY backup: name: Create Backup - needs: validate + needs: validate-inputs + if: >- + inputs.command == 'migrate' || inputs.command == 'rollback' runs-on: ubuntu-latest + environment: production permissions: contents: read id-token: write steps: - uses: google-github-actions/auth@v2 with: - workload_identity_provider: projects/990583792367/locations/global/workloadIdentityPools/github-pool/providers/github-provider - service_account: github-actions@inspire-7-finep.iam.gserviceaccount.com + workload_identity_provider: ${{ vars.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ vars.GCP_SERVICE_ACCOUNT }} - uses: google-github-actions/setup-gcloud@v2 with: @@ -88,8 +89,7 @@ jobs: - name: Create on-demand backup run: | - DESCRIPTION="pre-migration-${{ inputs.migration }}-$(date +%Y%m%d-%H%M%S)" - # Truncate description to 100 chars (Cloud SQL limit) + DESCRIPTION="pre-${{ inputs.command }}-$(date +%Y%m%d-%H%M%S)" DESCRIPTION="${DESCRIPTION:0:100}" echo "Creating backup: $DESCRIPTION" gcloud sql backups create \ @@ -110,10 +110,12 @@ jobs: echo "$BACKUP" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY - run-sql-migration: - name: Run SQL Migration - needs: [validate, backup] + run-migration: + name: Run Migration + needs: [validate-inputs, backup] + if: always() && needs.validate-inputs.result == 'success' && (needs.backup.result == 'success' || needs.backup.result == 'skipped') runs-on: ubuntu-latest + environment: production permissions: contents: read id-token: write @@ -122,8 +124,8 @@ jobs: - uses: google-github-actions/auth@v2 with: - workload_identity_provider: projects/990583792367/locations/global/workloadIdentityPools/github-pool/providers/github-provider - service_account: github-actions@inspire-7-finep.iam.gserviceaccount.com + workload_identity_provider: ${{ vars.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ vars.GCP_SERVICE_ACCOUNT }} - uses: google-github-actions/setup-gcloud@v2 with: @@ -131,10 +133,23 @@ jobs: - name: Install Cloud SQL Proxy run: | - curl -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/v2.14.3/cloud-sql-proxy.linux.amd64 + curl -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/${{ env.CLOUD_SQL_PROXY_VERSION }}/cloud-sql-proxy.linux.amd64 chmod +x cloud-sql-proxy sudo mv cloud-sql-proxy /usr/local/bin/ + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install Poetry + run: | + curl -sSL https://install.python-poetry.org | python3 - + echo "$HOME/.local/bin" >> $GITHUB_PATH + + - name: Install dependencies + run: poetry install --no-interaction --only main + - name: Fetch database credentials id: db run: | @@ -146,55 +161,55 @@ jobs: run: | cloud-sql-proxy ${{ env.CLOUD_SQL_INSTANCE }} --port=5432 & sleep 3 - # Verify proxy is running if ! nc -z 127.0.0.1 5432; then echo "::error::Cloud SQL Proxy failed to start" exit 1 fi echo "Cloud SQL Proxy started successfully" - - name: Run SQL migration + - name: Execute migration command env: DATABASE_URL: ${{ steps.db.outputs.url }} run: | - # Replace host in connection string with localhost (proxy) LOCAL_URL=$(echo "$DATABASE_URL" | sed -E 's|@[^:]+:|@127.0.0.1:|') echo "::add-mask::$LOCAL_URL" - echo "Executing: ${{ needs.validate.outputs.migration_file }}" - psql "$LOCAL_URL" -f "${{ needs.validate.outputs.migration_file }}" -v ON_ERROR_STOP=1 - echo "SQL migration completed successfully" + COMMAND="${{ inputs.command }}" + DRY_RUN="${{ inputs.dry_run }}" + TARGET="${{ inputs.target_version }}" - - name: Validate schema after migration - env: - DATABASE_URL: ${{ steps.db.outputs.url }} - run: | - LOCAL_URL=$(echo "$DATABASE_URL" | sed -E 's|@[^:]+:|@127.0.0.1:|') - echo "::add-mask::$LOCAL_URL" + CMD="poetry run python scripts/migrate.py $COMMAND" - echo "## Schema Validation" >> $GITHUB_STEP_SUMMARY + case "$COMMAND" in + status|history|validate) + CMD="$CMD --db-url \"$LOCAL_URL\"" + ;; + migrate) + CMD="$CMD --db-url \"$LOCAL_URL\" --yes" + if [ "$DRY_RUN" = "true" ]; then + CMD="$CMD --dry-run" + fi + if [ -n "$TARGET" ]; then + CMD="$CMD --target $TARGET" + fi + ;; + rollback) + CMD="poetry run python scripts/migrate.py rollback $TARGET --db-url \"$LOCAL_URL\" --yes" + if [ "$DRY_RUN" = "true" ]; then + CMD="$CMD --dry-run" + fi + ;; + esac - # Check schema version - VERSION=$(psql "$LOCAL_URL" -t -c "SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1;" 2>/dev/null | xargs || echo "N/A") - echo "- **Schema version:** $VERSION" >> $GITHUB_STEP_SUMMARY + echo "Executing: $COMMAND (dry_run=$DRY_RUN, target=$TARGET)" + eval $CMD - # Check unique_id column width on news table - COLS=$(psql "$LOCAL_URL" -t -c " - SELECT column_name || ': VARCHAR(' || character_maximum_length || ')' - FROM information_schema.columns - WHERE table_name = 'news' - AND column_name IN ('unique_id', 'legacy_unique_id') - ORDER BY column_name;" 2>/dev/null || echo "Could not query columns") - echo "- **Column sizes:**" >> $GITHUB_STEP_SUMMARY - echo '```' >> $GITHUB_STEP_SUMMARY - echo "$COLS" >> $GITHUB_STEP_SUMMARY - echo '```' >> $GITHUB_STEP_SUMMARY - - run-data-migration: - name: Run Data Migration - needs: [validate, run-sql-migration] - if: inputs.data_migration + show-status: + name: Show Status + needs: [validate-inputs, run-migration] + if: always() && needs.run-migration.result != 'skipped' runs-on: ubuntu-latest + environment: production permissions: contents: read id-token: write @@ -203,8 +218,8 @@ jobs: - uses: google-github-actions/auth@v2 with: - workload_identity_provider: projects/990583792367/locations/global/workloadIdentityPools/github-pool/providers/github-provider - service_account: github-actions@inspire-7-finep.iam.gserviceaccount.com + workload_identity_provider: ${{ vars.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ vars.GCP_SERVICE_ACCOUNT }} - uses: google-github-actions/setup-gcloud@v2 with: @@ -212,7 +227,7 @@ jobs: - name: Install Cloud SQL Proxy run: | - curl -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/v2.14.3/cloud-sql-proxy.linux.amd64 + curl -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/${{ env.CLOUD_SQL_PROXY_VERSION }}/cloud-sql-proxy.linux.amd64 chmod +x cloud-sql-proxy sudo mv cloud-sql-proxy /usr/local/bin/ @@ -240,84 +255,21 @@ jobs: run: | cloud-sql-proxy ${{ env.CLOUD_SQL_INSTANCE }} --port=5432 & sleep 3 - if ! nc -z 127.0.0.1 5432; then - echo "::error::Cloud SQL Proxy failed to start" - exit 1 - fi - - - name: Run data migration - env: - DATABASE_URL: ${{ steps.db.outputs.url }} - run: | - LOCAL_URL=$(echo "$DATABASE_URL" | sed -E 's|@[^:]+:|@127.0.0.1:|') - echo "::add-mask::$LOCAL_URL" - MODE="${{ inputs.data_migration_mode }}" - BATCH="${{ inputs.batch_size }}" - - echo "Running data migration: mode=$MODE batch_size=$BATCH" - - case "$MODE" in - dry-run) - poetry run python scripts/migrate_unique_ids.py \ - --db-url "$LOCAL_URL" \ - --dry-run \ - --output migration_dry_run.csv - echo "## Dry Run Results" >> $GITHUB_STEP_SUMMARY - echo "Records to migrate: $(wc -l < migration_dry_run.csv)" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - echo "### Sample (first 20 rows)" >> $GITHUB_STEP_SUMMARY - echo '```' >> $GITHUB_STEP_SUMMARY - head -21 migration_dry_run.csv >> $GITHUB_STEP_SUMMARY - echo '```' >> $GITHUB_STEP_SUMMARY - ;; - migrate) - poetry run python scripts/migrate_unique_ids.py \ - --db-url "$LOCAL_URL" \ - --batch-size "$BATCH" - ;; - rollback) - poetry run python scripts/migrate_unique_ids.py \ - --db-url "$LOCAL_URL" \ - --rollback \ - --batch-size "$BATCH" - ;; - esac - - echo "Data migration ($MODE) completed successfully" - - - name: Upload dry-run CSV - if: inputs.data_migration_mode == 'dry-run' - uses: actions/upload-artifact@v4 - with: - name: migration-dry-run - path: migration_dry_run.csv - retention-days: 7 - - - name: Validate data after migration - if: inputs.data_migration_mode == 'migrate' + - name: Show migration status env: DATABASE_URL: ${{ steps.db.outputs.url }} run: | LOCAL_URL=$(echo "$DATABASE_URL" | sed -E 's|@[^:]+:|@127.0.0.1:|') echo "::add-mask::$LOCAL_URL" - echo "## Data Migration Results" >> $GITHUB_STEP_SUMMARY - - STATS=$(psql "$LOCAL_URL" -t -c " - SELECT - COUNT(*) AS total, - COUNT(legacy_unique_id) AS with_legacy, - COUNT(*) FILTER (WHERE unique_id ~ '_[a-f0-9]{6}$') AS new_format - FROM news;") + echo "## Migration Status" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY - echo "total | with_legacy | new_format" >> $GITHUB_STEP_SUMMARY - echo "$STATS" >> $GITHUB_STEP_SUMMARY + poetry run python scripts/migrate.py status --db-url "$LOCAL_URL" >> $GITHUB_STEP_SUMMARY 2>&1 || true echo '```' >> $GITHUB_STEP_SUMMARY - # Check for orphaned FKs - ORPHANS=$(psql "$LOCAL_URL" -t -c " - SELECT COUNT(*) FROM news_features nf - LEFT JOIN news n ON nf.unique_id = n.unique_id - WHERE n.unique_id IS NULL;") - echo "- **Orphaned FK in news_features:** $ORPHANS" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "## Recent History" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + poetry run python scripts/migrate.py history --db-url "$LOCAL_URL" --limit 10 >> $GITHUB_STEP_SUMMARY 2>&1 || true + echo '```' >> $GITHUB_STEP_SUMMARY From 844432c86ff950dd51978dc722656e4a2628ad6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:19:47 -0300 Subject: [PATCH 5/6] docs: reescrever documentacao de migrations e criar runbook de rollback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit docs/database/migrations.md: reescrito com documentacao completa do novo runner — CLI, discovery, Python migration interface, tabela migration_history, CI/CD workflow. docs/runbooks/migration-rollback.md: novo runbook operacional com 4 cenarios de rollback: - A: Python migration (mais simples) - B: SQL migration com rollback file - C: SQL migration sem rollback file (manual) - D: Rollback multiplo em ordem reversa - Emergencia: restauracao de backup scripts/migrations/README.md: atualizado com referencia ao runner. Ref: #109 Co-Authored-By: Claude Opus 4.6 --- docs/database/migrations.md | 378 +++++++--------------------- docs/runbooks/migration-rollback.md | 129 ++++++++++ scripts/migrations/README.md | 80 +++--- 3 files changed, 258 insertions(+), 329 deletions(-) create mode 100644 docs/runbooks/migration-rollback.md diff --git a/docs/database/migrations.md b/docs/database/migrations.md index 2027a8a..782eae9 100644 --- a/docs/database/migrations.md +++ b/docs/database/migrations.md @@ -1,347 +1,163 @@ -# Database Setup and Migrations +# Database Migrations -Quick guide for setting up and managing the PostgreSQL database. +Generic migration system for the destaquesgovbr data-platform. Supports SQL and Python migrations with audit history, dry-run, and rollback. --- -## Initial Setup - -### Prerequisites +## Quick Reference ```bash -# Install Cloud SQL Proxy -brew install cloud-sql-proxy # macOS -# or download from: https://cloud.google.com/sql/docs/postgres/connect-instance-auth-proxy - -# Install PostgreSQL client -brew install postgresql@15 # macOS -``` +# Show migration status +python scripts/migrate.py status -### 1. Create Database Schema +# Preview pending migrations (no changes) +python scripts/migrate.py migrate --dry-run -Run the automated setup script: +# Apply pending migrations +python scripts/migrate.py migrate --yes -```bash -cd /path/to/data-platform -./scripts/setup_database.sh -``` - -This script will: -1. Check prerequisites (cloud-sql-proxy, psql) -2. Fetch credentials from Secret Manager -3. Start Cloud SQL Proxy -4. Connect to database -5. Create schema (tables, indexes, triggers, views) -6. Validate structure - -**Expected output**: -``` -✅ Cloud SQL Proxy running on port 5432 -✅ Connected to PostgreSQL 15.15 -✅ Creating schema... -NOTICE: Schema creation completed successfully: - Tables: 5 - Indexes: 22 - Triggers: 4 -✅ Schema created successfully -``` +# Apply up to a specific version +python scripts/migrate.py migrate --target 005 --yes -### 2. Verify Schema +# Rollback a specific migration (preview) +python scripts/migrate.py rollback 006 --dry-run -Connect to the database: +# Rollback a specific migration +python scripts/migrate.py rollback 006 --yes -```bash -# Get password -PASSWORD=$(gcloud secrets versions access latest --secret="govbrnews-postgres-password") +# Show migration history +python scripts/migrate.py history -# Connect via Cloud SQL Proxy -cloud-sql-proxy inspire-7-finep:southamerica-east1:destaquesgovbr-postgres & -psql "host=127.0.0.1 dbname=govbrnews user=govbrnews_app password=$PASSWORD" +# Validate migration files +python scripts/migrate.py validate ``` -Check tables: - -```sql --- List all tables -\dt - --- Check table structure -\d news -\d agencies -\d themes - --- Count records -SELECT - (SELECT COUNT(*) FROM agencies) as agencies, - (SELECT COUNT(*) FROM themes) as themes, - (SELECT COUNT(*) FROM news) as news, - (SELECT COUNT(*) FROM sync_log) as sync_log; -``` +All commands require `DATABASE_URL` environment variable or `--db-url` flag. --- -## Populate Master Data +## How It Works -### 1. Agencies +### Runner: `scripts/migrate.py` -Populate the `agencies` table from the agencies YAML file: +Single entry point for all migration operations. On first run, it bootstraps the `migration_history` table and imports existing entries from `schema_version`. -```bash -python scripts/populate_agencies.py -``` +### Discovery -**Expected**: ~158 agency records +The runner discovers migration files in `scripts/migrations/` using naming conventions: -### 2. Themes +| Type | Pattern | Example | +|------|---------|---------| +| SQL migration | `NNN_description.sql` | `005_alter_unique_id_varchar.sql` | +| SQL rollback | `NNN_description_rollback.sql` | `005_alter_unique_id_varchar_rollback.sql` | +| Python migration | `NNN_description.py` | `006_migrate_unique_ids.py` | -Populate the `themes` table from the themes taxonomy file: +Rollback files are automatically associated with their migration by version number. -```bash -python scripts/populate_themes.py -``` +### Execution -**Expected**: ~150-200 theme records (hierarchical: L1 → L2 → L3) +1. Discovers pending migrations (not yet in `migration_status` view) +2. Executes each in version order +3. Records result in `migration_history` within the same transaction (atomic commit) +4. On failure: rolls back transaction, records `status=failed` separately, stops immediately -### 3. Verify +### Python Migration Interface -```sql --- Check agencies -SELECT COUNT(*), COUNT(DISTINCT key) FROM agencies; +Each `.py` migration file must expose: --- Check themes by level -SELECT level, COUNT(*) FROM themes GROUP BY level ORDER BY level; +```python +def describe() -> str: + """Human description for logs and audit.""" --- Sample data -SELECT * FROM agencies LIMIT 5; -SELECT code, label, level FROM themes WHERE level = 1 ORDER BY code; -``` - ---- - -## Schema Migrations - -### Current Schema Version +def migrate(conn, dry_run: bool = False) -> dict: + """Execute the migration. Returns metrics dict.""" -```sql -SELECT * FROM schema_version; -``` - -Expected output: -``` - version | applied_at | description ----------+----------------------------+------------------------------------------- - 1.0 | 2024-12-24 14:XX:XX+00 | Initial schema for GovBRNews data platform +def rollback(conn, dry_run: bool = False) -> dict: + """Revert the migration. Raises NotImplementedError if not possible.""" ``` -### Future Migrations - -When creating schema changes: - -1. **Create migration SQL file**: - ```bash - scripts/migrations/v1.1_add_column.sql - ``` - -2. **Apply migration**: - ```bash - psql -h 127.0.0.1 -U govbrnews_app -d govbrnews -f scripts/migrations/v1.1_add_column.sql - ``` - -3. **Update schema_version**: - ```sql - INSERT INTO schema_version (version, description) - VALUES ('1.1', 'Add new column to news table'); - ``` - -4. **Document in PROGRESS.md** +The runner manages the connection and transaction. The migration must not call `conn.commit()` or `conn.rollback()`. --- -## Common Operations - -### Backup Database +## Migration History -```bash -# Export to Cloud Storage -gcloud sql export sql destaquesgovbr-postgres \ - gs://destaquesgovbr-backups/manual-backup-$(date +%Y%m%d).sql \ - --database=govbrnews -``` +### Table: `migration_history` -### Import Data +| Column | Type | Description | +|--------|------|-------------| +| version | VARCHAR(10) | Migration version (e.g. "005") | +| name | VARCHAR(255) | Migration name | +| migration_type | VARCHAR(10) | `sql` or `python` | +| operation | VARCHAR(10) | `migrate`, `rollback`, or `dry_run` | +| status | VARCHAR(20) | `success`, `failed`, or `unavailable` | +| applied_by | TEXT | `$GITHUB_ACTOR` or `$USER` | +| run_id | TEXT | `$GITHUB_RUN_ID` (CI/CD only) | +| duration_ms | INTEGER | Execution time in milliseconds | +| execution_details | JSONB | Metrics returned by Python migrations | +| error_message | TEXT | Error details on failure | -```bash -# Import from Cloud Storage -gcloud sql import sql destaquesgovbr-postgres \ - gs://destaquesgovbr-backups/backup.sql \ - --database=govbrnews -``` +### View: `migration_status` -### Truncate Tables (Development Only) +Shows the current state of each migration (latest successful operation per version). ```sql --- ⚠️ DANGER: Delete all data (use only in development) -TRUNCATE TABLE news RESTART IDENTITY CASCADE; -TRUNCATE TABLE sync_log RESTART IDENTITY CASCADE; - --- Keep master data, only clear news -TRUNCATE TABLE news RESTART IDENTITY; -``` - -### Reset Database (Development Only) - -```bash -# Drop and recreate schema -psql -h 127.0.0.1 -U govbrnews_app -d govbrnews < 1000 -- > 1 second -ORDER BY mean_exec_time DESC -LIMIT 10; -``` - -### Index Usage - -```sql --- Check index usage -SELECT - schemaname, - tablename, - indexname, - idx_scan as index_scans, - idx_tup_read as tuples_read, - idx_tup_fetch as tuples_fetched -FROM pg_stat_user_indexes -WHERE schemaname = 'public' -ORDER BY idx_scan DESC; - --- Find unused indexes -SELECT - schemaname, - tablename, - indexname -FROM pg_stat_user_indexes -WHERE idx_scan = 0 - AND indexname NOT LIKE '%_pkey' -ORDER BY tablename, indexname; -``` - -### Table Sizes - -```sql -SELECT - tablename, - pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size -FROM pg_tables -WHERE schemaname = 'public' -ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC; -``` +| Version | File | Type | Description | +|---------|------|------|-------------| +| 001 | `001_add_pgvector_extension.sql` | SQL | Enable pgvector for vector search | +| 002 | `002_add_embedding_column.sql` | SQL | Add 768-dim embedding columns | +| 003 | `003_create_embedding_index.sql` | SQL | HNSW indexes for fast similarity | +| 004 | `004_create_news_features.sql` | SQL | JSONB feature store table | +| 005 | `005_alter_unique_id_varchar.sql` | SQL | Widen unique_id to VARCHAR(120) | +| 006 | `006_migrate_unique_ids.py` | Python | Migrate ~300k unique_ids to readable slugs | --- See also: - [Database Schema](./schema.md) -- [Cloud SQL Documentation](../../infra/docs/cloud-sql.md) -- [Migration Plan](_plan/README.md) +- [Migration Rollback Runbook](../runbooks/migration-rollback.md) diff --git a/docs/runbooks/migration-rollback.md b/docs/runbooks/migration-rollback.md new file mode 100644 index 0000000..5313076 --- /dev/null +++ b/docs/runbooks/migration-rollback.md @@ -0,0 +1,129 @@ +# Runbook: Migration Rollback + +Procedures for rolling back database migrations in the destaquesgovbr data-platform. + +--- + +## Prerequisites + +- Access to `DATABASE_URL` (via Secret Manager or `.env.local`) +- Cloud SQL Proxy running (for production) or Docker PostgreSQL (for local) +- Python environment with dependencies installed (`poetry install`) + +--- + +## Scenario A: Rollback a Python Migration (e.g., 006) + +Python migrations that modify data (not schema) are the simplest to rollback. + +```bash +# 1. Preview the rollback +python scripts/migrate.py rollback 006 --dry-run + +# 2. Execute the rollback +python scripts/migrate.py rollback 006 --yes + +# 3. Verify +python scripts/migrate.py status +python scripts/migrate.py history +``` + +**Via CI/CD:** +1. Go to Actions > Database Migration +2. Set: command=`rollback`, target_version=`006`, dry_run=`true` +3. Review the output +4. Re-run with dry_run=`false`, confirm=`true` + +--- + +## Scenario B: Rollback a SQL Migration with Rollback File + +SQL migrations with a corresponding `_rollback.sql` file. + +```bash +# 1. Preview +python scripts/migrate.py rollback 005 --dry-run + +# 2. Execute +python scripts/migrate.py rollback 005 --yes + +# 3. Verify +python scripts/migrate.py status +``` + +**Important:** Rollback 005 (unique_id VARCHAR) requires that 006 (data migration) is rolled back first. Always rollback in reverse order. + +--- + +## Scenario C: Rollback a SQL Migration WITHOUT Rollback File + +If no `_rollback.sql` exists, the runner will error. You must: + +1. Write the rollback SQL manually +2. Save as `NNN_description_rollback.sql` in `scripts/migrations/` +3. Run `python scripts/migrate.py rollback NNN --yes` + +Or execute the rollback SQL directly: + +```bash +psql "$DATABASE_URL" -f path/to/manual_rollback.sql +``` + +Then record it in history: + +```sql +INSERT INTO migration_history (version, name, migration_type, operation, status, applied_by, description) +VALUES ('NNN', 'description', 'sql', 'rollback', 'success', 'manual', 'Manual rollback'); +``` + +--- + +## Scenario D: Full Rollback to a Previous State + +To rollback multiple migrations in reverse order: + +```bash +# Rollback 006, then 005, then 004 (reverse order) +python scripts/migrate.py rollback 006 --yes +python scripts/migrate.py rollback 005 --yes +python scripts/migrate.py rollback 004 --yes + +# Verify final state +python scripts/migrate.py status +``` + +--- + +## Emergency: Restore from Backup + +If rollback is not possible or data corruption occurred: + +```bash +# 1. List available backups +gcloud sql backups list --instance=destaquesgovbr-postgres --limit=5 + +# 2. Restore from a specific backup +gcloud sql backups restore BACKUP_ID --restore-instance=destaquesgovbr-postgres + +# 3. Verify the restoration +python scripts/migrate.py status +``` + +**Warning:** Backup restore replaces ALL data in the instance. All databases (govbrnews, umami, keycloak, federation) will be affected. + +--- + +## Post-Rollback Checklist + +- [ ] Verify `python scripts/migrate.py status` shows expected state +- [ ] Check `migration_history` for rollback record +- [ ] Test application connectivity (DAGs, portal, scraper) +- [ ] If unique_ids changed: trigger Typesense full-sync +- [ ] If unique_ids changed: verify HuggingFace sync +- [ ] Notify team in appropriate channel + +--- + +See also: +- [Migration System Documentation](../database/migrations.md) +- [Composer Recovery Runbook](./composer-recovery.md) diff --git a/scripts/migrations/README.md b/scripts/migrations/README.md index 2ad1bfc..3730c3f 100644 --- a/scripts/migrations/README.md +++ b/scripts/migrations/README.md @@ -1,64 +1,48 @@ -# Database Migrations - Phase 4.7 +# Database Migrations -Migrations para adicionar suporte a embeddings semânticos usando pgvector. +Migrations for the destaquesgovbr data-platform PostgreSQL database. -## Ordem de Execução +## Usage -Execute as migrations nesta ordem: +Use the generic migration runner: ```bash -# 1. Habilitar pgvector extension -psql $DATABASE_URL -f 001_add_pgvector_extension.sql +# Show status of all migrations +python scripts/migrate.py status -# 2. Adicionar colunas de embedding -psql $DATABASE_URL -f 002_add_embedding_column.sql +# Apply pending migrations (dry-run) +python scripts/migrate.py migrate --dry-run -# 3. Criar índices HNSW -psql $DATABASE_URL -f 003_create_embedding_index.sql -``` - -## Validação - -Após executar as migrations, valide: +# Apply pending migrations +python scripts/migrate.py migrate --yes -```sql --- Verificar pgvector habilitado -SELECT * FROM pg_extension WHERE extname = 'vector'; +# Rollback a specific migration +python scripts/migrate.py rollback 006 --yes --- Verificar colunas criadas -SELECT column_name, data_type -FROM information_schema.columns -WHERE table_name = 'news' - AND column_name LIKE '%embedding%'; +# Show history +python scripts/migrate.py history --- Verificar índices criados -SELECT indexname -FROM pg_indexes -WHERE tablename = 'news' - AND indexname LIKE '%embedding%'; +# Validate consistency +python scripts/migrate.py validate ``` -## Rollback +## Naming Convention -Para reverter as migrations (em ordem inversa): +| Type | Pattern | Example | +|------|---------|---------| +| SQL migration | `NNN_description.sql` | `005_alter_unique_id_varchar.sql` | +| SQL rollback | `NNN_description_rollback.sql` | `005_alter_unique_id_varchar_rollback.sql` | +| Python migration | `NNN_description.py` | `006_migrate_unique_ids.py` | -```sql --- 3. Remover índices -DROP INDEX IF EXISTS idx_news_content_embedding_hnsw; -DROP INDEX IF EXISTS idx_news_embedding_status; -DROP INDEX IF EXISTS idx_news_embedding_updated; -DROP INDEX IF EXISTS idx_news_published_at_2025; - --- 2. Remover colunas -ALTER TABLE news DROP COLUMN IF EXISTS content_embedding; -ALTER TABLE news DROP COLUMN IF EXISTS embedding_generated_at; - --- 1. Desabilitar pgvector (CUIDADO: pode afetar outras funcionalidades) --- DROP EXTENSION IF EXISTS vector CASCADE; -``` +## Current Migrations -## Estimativa de Storage +| # | File | Type | Description | +|---|------|------|-------------| +| 001 | `001_add_pgvector_extension.sql` | SQL | Enable pgvector | +| 002 | `002_add_embedding_column.sql` | SQL | Add embedding columns | +| 003 | `003_create_embedding_index.sql` | SQL | HNSW indexes | +| 004 | `004_create_news_features.sql` | SQL | Feature store table | +| 005 | `005_alter_unique_id_varchar.sql` | SQL | Widen unique_id to VARCHAR(120) | +| 006 | `006_migrate_unique_ids.py` | Python | Migrate unique_ids to readable slugs | -- **Embeddings** (~30k records de 2025): ~90 MB -- **HNSW index**: ~200 MB -- **Total adicional**: ~300 MB +See [docs/database/migrations.md](../../docs/database/migrations.md) for full documentation. From c5b28708326978bda1c468d96d9dba5e14eb4da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20Rodrigues=20Mendon=C3=A7a?= Date: Fri, 20 Mar 2026 15:50:39 -0300 Subject: [PATCH 6/6] fix: aplicar correcoes da revisao de codigo (#116) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Correcoes baseadas no code review: - #4 (ALTO): Remover BEGIN/COMMIT explicito do rollback SQL 005 que quebrava o commit atomico do runner (COMMIT prematuro antes do registro em migration_history) - #1 (MEDIO): Remover server-side cursor desnecessario em 006_migrate_unique_ids.py — fetchall() carrega tudo em memoria de qualquer forma, o name= so adiciona confusao semantica - #9 (MEDIO): Sanitizar target_version com regex ^[0-9]{3}$ no workflow e substituir eval $CMD por execucao direta com bash array para prevenir command injection - #5 (BAIXO): Extrair _execute_with_history() em migrate.py para eliminar ~60% de duplicacao entre execute_migration() e execute_rollback(), reduzindo risco de divergencia futura Todos os 44 testes continuam passando. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/db-migrate.yaml | 26 ++-- scripts/migrate.py | 133 +++++++++--------- .../005_alter_unique_id_varchar_rollback.sql | 7 +- scripts/migrations/006_migrate_unique_ids.py | 5 +- 4 files changed, 87 insertions(+), 84 deletions(-) diff --git a/.github/workflows/db-migrate.yaml b/.github/workflows/db-migrate.yaml index f1028fa..3581f08 100644 --- a/.github/workflows/db-migrate.yaml +++ b/.github/workflows/db-migrate.yaml @@ -42,6 +42,14 @@ jobs: name: Validate Inputs runs-on: ubuntu-latest steps: + - name: Validate target_version format + if: inputs.target_version != '' + run: | + if ! echo "${{ inputs.target_version }}" | grep -qE '^[0-9]{3}$'; then + echo "::error::target_version must be a 3-digit number (e.g. 006)" + exit 1 + fi + - name: Check confirmation for destructive operations run: | COMMAND="${{ inputs.command }}" @@ -178,31 +186,31 @@ jobs: DRY_RUN="${{ inputs.dry_run }}" TARGET="${{ inputs.target_version }}" - CMD="poetry run python scripts/migrate.py $COMMAND" + ARGS=() case "$COMMAND" in status|history|validate) - CMD="$CMD --db-url \"$LOCAL_URL\"" + ARGS+=("$COMMAND" --db-url "$LOCAL_URL") ;; migrate) - CMD="$CMD --db-url \"$LOCAL_URL\" --yes" + ARGS+=("migrate" --db-url "$LOCAL_URL" --yes) if [ "$DRY_RUN" = "true" ]; then - CMD="$CMD --dry-run" + ARGS+=(--dry-run) fi if [ -n "$TARGET" ]; then - CMD="$CMD --target $TARGET" + ARGS+=(--target "$TARGET") fi ;; rollback) - CMD="poetry run python scripts/migrate.py rollback $TARGET --db-url \"$LOCAL_URL\" --yes" + ARGS+=("rollback" "$TARGET" --db-url "$LOCAL_URL" --yes) if [ "$DRY_RUN" = "true" ]; then - CMD="$CMD --dry-run" + ARGS+=(--dry-run) fi ;; esac - echo "Executing: $COMMAND (dry_run=$DRY_RUN, target=$TARGET)" - eval $CMD + echo "Executing: ${ARGS[*]}" + poetry run python scripts/migrate.py "${ARGS[@]}" show-status: name: Show Status diff --git a/scripts/migrate.py b/scripts/migrate.py index e814d37..8e07b3c 100644 --- a/scripts/migrate.py +++ b/scripts/migrate.py @@ -270,69 +270,91 @@ def _load_python_module(path: Path): return module -def execute_migration( +def _execute_with_history( conn, migration: MigrationInfo, + operation: str, dry_run: bool, applied_by: str, run_id: str | None, + action_fn, ) -> None: - """Execute a single migration (SQL or Python) with atomic commit.""" - operation = "dry_run" if dry_run else "migrate" + """Execute an action (migrate or rollback) with atomic history recording. + + action_fn(conn) -> (description, execution_details) + """ + effective_op = "dry_run" if (dry_run and operation == "migrate") else operation started_at = time.time() - description = None - execution_details = None + verb = "Rolling back" if operation == "rollback" else "Executing" - logger.info(f"{'[DRY RUN] ' if dry_run else ''}Executing {migration.version}_{migration.name} ({migration.migration_type})") + logger.info( + f"{'[DRY RUN] ' if dry_run else ''}{verb} " + f"{migration.version}_{migration.name} ({migration.migration_type})" + ) try: - if migration.migration_type == "sql": - sql_content = migration.path.read_text() - cursor = conn.cursor() - try: - cursor.execute(sql_content) - finally: - cursor.close() - else: - # Python migration - module = _load_python_module(migration.path) - if not hasattr(module, "describe"): - raise AttributeError( - f"Python migration {migration.path.name} must define describe()" - ) - description = module.describe() - result = module.migrate(conn, dry_run=dry_run) - execution_details = result if isinstance(result, dict) else None + description, execution_details = action_fn(conn) if dry_run: _record_history( - conn, migration, operation, "success", started_at, + conn, migration, effective_op, "success", started_at, applied_by, run_id, description, execution_details, ) conn.rollback() logger.info(f"[DRY RUN] {migration.version} previewed (rolled back)") else: _record_history( - conn, migration, operation, "success", started_at, + conn, migration, effective_op, "success", started_at, applied_by, run_id, description, execution_details, ) conn.commit() - logger.info(f"{migration.version}_{migration.name} applied successfully") + logger.info(f"{migration.version}_{migration.name} {operation}d successfully") + except (FileNotFoundError, ValueError): + raise except Exception as e: conn.rollback() - # Record failure in a separate transaction try: _record_history( - conn, migration, operation, "failed", started_at, - applied_by, run_id, description, error_message=str(e), + conn, migration, effective_op, "failed", started_at, + applied_by, run_id, error_message=str(e), ) conn.commit() except Exception: - logger.warning("Could not record failure in migration_history") + logger.warning(f"Could not record {operation} failure in migration_history") raise +def execute_migration( + conn, + migration: MigrationInfo, + dry_run: bool, + applied_by: str, + run_id: str | None, +) -> None: + """Execute a single migration (SQL or Python) with atomic commit.""" + + def action(conn): + if migration.migration_type == "sql": + cursor = conn.cursor() + try: + cursor.execute(migration.path.read_text()) + finally: + cursor.close() + return None, None + else: + module = _load_python_module(migration.path) + if not hasattr(module, "describe"): + raise AttributeError( + f"Python migration {migration.path.name} must define describe()" + ) + description = module.describe() + result = module.migrate(conn, dry_run=dry_run) + return description, result if isinstance(result, dict) else None + + _execute_with_history(conn, migration, "migrate", dry_run, applied_by, run_id, action) + + # --------------------------------------------------------------------------- # Execute rollback # --------------------------------------------------------------------------- @@ -345,66 +367,41 @@ def execute_rollback( run_id: str | None, ) -> None: """Execute rollback for a single migration.""" - operation = "rollback" - started_at = time.time() - description = None - execution_details = None - - logger.info(f"{'[DRY RUN] ' if dry_run else ''}Rolling back {migration.version}_{migration.name}") - try: + def action(conn): if migration.migration_type == "sql": if not migration.rollback_path or not migration.rollback_path.exists(): raise FileNotFoundError( f"No rollback file for SQL migration {migration.version}_{migration.name}. " f"Expected: {migration.version}_{migration.name}_rollback.sql" ) - sql_content = migration.rollback_path.read_text() cursor = conn.cursor() try: - cursor.execute(sql_content) + cursor.execute(migration.rollback_path.read_text()) finally: cursor.close() + return None, None else: - # Python migration module = _load_python_module(migration.path) try: result = module.rollback(conn, dry_run=dry_run) - execution_details = result if isinstance(result, dict) else None + return None, result if isinstance(result, dict) else None except NotImplementedError as nie: _record_history( - conn, migration, operation, "unavailable", started_at, - applied_by, run_id, description, - error_message=str(nie), + conn, migration, "rollback", "unavailable", time.time(), + applied_by, run_id, error_message=str(nie), ) conn.commit() logger.warning(f"{migration.version} rollback unavailable: {nie}") - return + raise _RollbackUnavailable() - if dry_run: - conn.rollback() - logger.info(f"[DRY RUN] {migration.version} rollback previewed") - else: - _record_history( - conn, migration, operation, "success", started_at, - applied_by, run_id, description, execution_details, - ) - conn.commit() - logger.info(f"{migration.version}_{migration.name} rolled back successfully") + class _RollbackUnavailable(Exception): + pass - except (FileNotFoundError, ValueError): - raise - except Exception as e: - conn.rollback() - try: - _record_history( - conn, migration, operation, "failed", started_at, - applied_by, run_id, description, error_message=str(e), - ) - conn.commit() - except Exception: - logger.warning("Could not record rollback failure in migration_history") - raise + try: + _execute_with_history(conn, migration, "rollback", dry_run, applied_by, run_id, action) + except _RollbackUnavailable: + return # --------------------------------------------------------------------------- diff --git a/scripts/migrations/005_alter_unique_id_varchar_rollback.sql b/scripts/migrations/005_alter_unique_id_varchar_rollback.sql index efc346e..223e04f 100644 --- a/scripts/migrations/005_alter_unique_id_varchar_rollback.sql +++ b/scripts/migrations/005_alter_unique_id_varchar_rollback.sql @@ -3,8 +3,9 @@ -- -- WARNING: This rollback will FAIL if any unique_id exceeds 32 chars. -- Run 006 rollback first to restore MD5 IDs before running this. - -BEGIN; +-- +-- NOTE: Do NOT use BEGIN/COMMIT here. The migration runner manages +-- the transaction to ensure atomic commit with migration_history. -- Step 1: Drop view that depends on news.unique_id DROP VIEW IF EXISTS news_with_themes; @@ -38,5 +39,3 @@ ALTER TABLE news DROP COLUMN IF EXISTS legacy_unique_id; -- Step 5: Revert schema_version DELETE FROM schema_version WHERE version = '1.3'; - -COMMIT; diff --git a/scripts/migrations/006_migrate_unique_ids.py b/scripts/migrations/006_migrate_unique_ids.py index 2062695..31cf685 100644 --- a/scripts/migrations/006_migrate_unique_ids.py +++ b/scripts/migrations/006_migrate_unique_ids.py @@ -72,9 +72,8 @@ def _generate_id_with_extended_suffix(agency, published_at, title, extra_chars): def _fetch_all_news(conn): - """Fetch all news rows needed for migration using server-side cursor.""" - cursor = conn.cursor(name="fetch_news_for_migration") - cursor.itersize = 5000 + """Fetch all news rows needed for migration.""" + cursor = conn.cursor() cursor.execute( "SELECT unique_id, agency_key, published_at, title, legacy_unique_id " "FROM news ORDER BY unique_id"