diff --git a/models/metrics/metrics_daily_orders.sql b/models/metrics/metrics_daily_orders.sql new file mode 100644 index 0000000..78896eb --- /dev/null +++ b/models/metrics/metrics_daily_orders.sql @@ -0,0 +1,42 @@ +{{ + config( + materialized = 'incremental', + unique_key = 'order_date', + incremental_strategy = 'delete+insert' + ) +}} + +-- SAFE INCREMENTAL: deterministic else branch — NO false alarm. +-- +-- This model IS incremental, but its else branch (used on fresh CI builds) +-- uses a FIXED date range with no target.name or current_date() dependency. +-- Both pg-base and pg-current produce identical SQL on first build. +-- +-- Contrast with metrics_daily_shipments which has target.name in the else +-- branch → different SQL per target → false alarm. +-- +-- Key insight: is_incremental() alone does NOT cause false alarms. +-- The false alarm comes from non-deterministic logic INSIDE the branches. + +{% set reference_date = "'1998-08-02'" %} + +select + o.order_date, + count(distinct o.order_key) as order_count, + count(distinct o.customer_key) as customer_count, + sum(o.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(o.gross_item_sales_amount)::decimal(16,4) as avg_order_value +from + {{ ref('fct_orders') }} o +where + o.order_date is not null + {% if is_incremental() %} + and o.order_date > (select max(order_date) from {{ this }}) + and o.order_date <= {{ reference_date }}::date + {% else %} + and o.order_date <= {{ reference_date }}::date + {% endif %} +group by + o.order_date +order by + o.order_date diff --git a/models/metrics/metrics_daily_shipments.sql b/models/metrics/metrics_daily_shipments.sql new file mode 100644 index 0000000..5c2f3ac --- /dev/null +++ b/models/metrics/metrics_daily_shipments.sql @@ -0,0 +1,47 @@ +{{ + config( + materialized = 'incremental', + unique_key = 'ship_date', + incremental_strategy = 'delete+insert' + ) +}} + +-- Demonstrates why incremental models cause false alarms in Recce. 
+-- +-- The root cause is NOT "data accumulation" — it's the conditional logic +-- below that produces DIFFERENT SQL depending on build context: +-- +-- is_incremental() = true → filters from max(ship_date) in existing table +-- is_incremental() = false → filters last N days from a reference date +-- (N depends on target: prod gets 365 days, dev/PR gets 90 days) +-- +-- Two environments built at different times or with different history +-- will run different SQL → different results → false alarm diffs. +-- +-- This mirrors real-world patterns like the fct_cmab_strategy_reward example +-- where prod gets -8 days and dev gets -2 days from current_date(). + +{% set reference_date = "'1998-08-02'" %} + +select + oi.ship_date, + count(*) as shipment_count, + count(distinct oi.order_key) as order_count, + count(distinct oi.supplier_key) as supplier_count, + sum(oi.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(oi.gross_item_sales_amount)::decimal(16,4) as avg_revenue_per_item +from + {{ ref('orders_items') }} oi +where + oi.ship_date is not null + {% if is_incremental() %} + and oi.ship_date > (select max(ship_date) from {{ this }}) + and oi.ship_date <= {{ reference_date }}::date + {% else %} + and oi.ship_date >= {{ reference_date }}::date - interval '{{ 365 if target.name == "pg-base" else 90 }} days' + and oi.ship_date <= {{ reference_date }}::date + {% endif %} +group by + oi.ship_date +order by + oi.ship_date diff --git a/models/metrics/metrics_order_summary.sql b/models/metrics/metrics_order_summary.sql new file mode 100644 index 0000000..351c293 --- /dev/null +++ b/models/metrics/metrics_order_summary.sql @@ -0,0 +1,29 @@ +{{ + config( + materialized = 'view' + ) +}} + +-- Monthly order summary by priority level +-- +-- FALSE ALARM DEMO: VIEW with target-dependent date window. +-- pg-base gets 5 years of history, pg-current gets 1 year. 
+-- Even views — which don't store data — produce different results +-- when their SQL definition varies by build context. +-- This is NOT incremental, NOT a table — it's a plain view. + +{% set reference_date = "'1998-08-02'" %} + +select + date_trunc('month', o.order_date) as order_month, + o.order_priority_code as priority, + count(distinct o.order_key) as order_count, + count(distinct o.customer_key) as customer_count, + sum(o.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(o.gross_item_sales_amount)::decimal(16,4) as avg_order_value +from + {{ ref('fct_orders') }} o +where + o.order_date >= {{ reference_date }}::date - interval '{{ 1825 if target.name == "pg-base" else 365 }} days' + and o.order_date <= {{ reference_date }}::date +group by 1, 2 diff --git a/models/metrics/metrics_regional_revenue.sql b/models/metrics/metrics_regional_revenue.sql index 727ec63..a8be242 100644 --- a/models/metrics/metrics_regional_revenue.sql +++ b/models/metrics/metrics_regional_revenue.sql @@ -1,4 +1,12 @@ -- Revenue by region and nation over time +-- +-- FALSE ALARM DEMO: TABLE model with target-dependent date window. +-- pg-base gets 7 years of history, pg-current gets 2 years. +-- Same pattern as prod vs dev environments with different data needs. +-- This is NOT incremental — it's a plain table with conditional logic. 
+ +{% set reference_date = "'1998-08-02'" %} + with orders as ( select * from {{ ref('fct_orders') }} @@ -20,4 +28,7 @@ select from orders o join customers c on o.customer_key = c.customer_key +where + o.order_date >= {{ reference_date }}::date - interval '{{ 2555 if target.name == "pg-base" else 730 }} days' + and o.order_date <= {{ reference_date }}::date group by 1, 2, 3 diff --git a/models/metrics/metrics_shipping_efficiency.sql b/models/metrics/metrics_shipping_efficiency.sql index 6a14769..b8f28e4 100644 --- a/models/metrics/metrics_shipping_efficiency.sql +++ b/models/metrics/metrics_shipping_efficiency.sql @@ -1,4 +1,12 @@ -- Average delivery time by shipping mode per month +-- +-- FALSE ALARM DEMO: TABLE model with target.name branching. +-- pg-base analyzes 7 years of ship dates, pg-current only 2 years. +-- Mirrors real pattern: prod builds full history, dev builds subset. +-- This is NOT incremental — it's a plain table with conditional logic. + +{% set reference_date = "'1998-08-02'" %} + with items as ( select * from {{ ref('fct_orders_items') }} @@ -15,5 +23,12 @@ select round(sum(case when i.receipt_date > i.commit_date then 1 else 0 end)::decimal / nullif(count(*), 0) * 100, 2) as late_pct from items i -where i.receipt_date is not null +where + i.receipt_date is not null + {% if target.name == 'pg-base' %} + and i.ship_date >= {{ reference_date }}::date - interval '2555 days' + {% else %} + and i.ship_date >= {{ reference_date }}::date - interval '730 days' + {% endif %} + and i.ship_date <= {{ reference_date }}::date group by 1, 2 diff --git a/scripts/compare_detection_approaches.py b/scripts/compare_detection_approaches.py new file mode 100644 index 0000000..7923010 --- /dev/null +++ b/scripts/compare_detection_approaches.py @@ -0,0 +1,149 @@ +"""Compare detection accuracy: Jinja scanning vs compiled SQL diffing. + +Runs both approaches on the same dbt project and compares results. 
+ +Usage: + uv run python scripts/compare_detection_approaches.py +""" + +import json +import subprocess +import sys + + +def run_jinja_scanning() -> set[str]: + """Run Jinja pattern scanning and return flagged model names.""" + result = subprocess.run( + ["uv", "run", "python", "scripts/detect_base_mode.py", "--json"], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"Error running detect_base_mode.py: {result.stderr}", file=sys.stderr) + return set() + + data = json.loads(result.stdout) + return {f["name"] for f in data["sql_findings"]} + + +def run_compiled_diff(base_dir: str, current_dir: str) -> set[str]: + """Run compiled SQL diff and return flagged model names.""" + result = subprocess.run( + ["uv", "run", "python", "scripts/compiled_sql_diff.py", + "--base-dir", base_dir, + "--current-dir", current_dir, + "--json"], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"Error running compiled_sql_diff.py: {result.stderr}", file=sys.stderr) + return set() + + data = json.loads(result.stdout) + return {f["model"] for f in data["non_deterministic"]} + + +def main(): + print("=" * 70) + print(" Detection Approach Comparison") + print("=" * 70) + print() + + # Expected ground truth: models with target.name branching + ground_truth = { + "metrics_daily_shipments", # incremental, target.name in else + "metrics_shipping_efficiency", # table, target.name if/else + "metrics_regional_revenue", # table, target.name inline + "metrics_order_summary", # view, target.name inline + } + safe_models = { + "metrics_daily_orders", # incremental, deterministic else + } + + print(" Ground truth (should be flagged):") + for m in sorted(ground_truth): + print(f" - {m}") + print() + print(" Safe models (should NOT be flagged):") + for m in sorted(safe_models): + print(f" - {m}") + print() + + # Approach 1: Jinja pattern scanning + print("-" * 70) + print(" Approach 1: Jinja Pattern Scanning (raw_code regex)") + jinja_flagged = 
run_jinja_scanning() + print(f" Flagged: {sorted(jinja_flagged)}") + + jinja_tp = ground_truth & jinja_flagged + jinja_fn = ground_truth - jinja_flagged + jinja_fp = jinja_flagged - ground_truth + jinja_safe_correct = safe_models - jinja_flagged + + print(f" True positives: {len(jinja_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(jinja_fn)} {sorted(jinja_fn) if jinja_fn else ''}") + print(f" False positives: {len(jinja_fp)} {sorted(jinja_fp) if jinja_fp else ''}") + print(f" Safe correctly: {len(jinja_safe_correct)}/{len(safe_models)}") + print() + + # Approach 2a: Compiled SQL diff (without --full-refresh) + print("-" * 70) + print(" Approach 2a: Compiled SQL Diff (existing tables → is_incremental=true)") + diff_flagged = run_compiled_diff("target/compiled_pg_base", "target/compiled_pg_current") + print(f" Flagged: {sorted(diff_flagged)}") + + diff_tp = ground_truth & diff_flagged + diff_fn = ground_truth - diff_flagged + diff_fp = diff_flagged - ground_truth + diff_safe_correct = safe_models - diff_flagged + + print(f" True positives: {len(diff_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(diff_fn)} {sorted(diff_fn) if diff_fn else ''}") + print(f" False positives: {len(diff_fp)} {sorted(diff_fp) if diff_fp else ''}") + print(f" Safe correctly: {len(diff_safe_correct)}/{len(safe_models)}") + print() + + # Approach 2b: Compiled SQL diff (with --full-refresh) + print("-" * 70) + print(" Approach 2b: Compiled SQL Diff (--full-refresh → is_incremental=false)") + diff_fr_flagged = run_compiled_diff("target/compiled_pg_base_fr", "target/compiled_pg_current_fr") + print(f" Flagged: {sorted(diff_fr_flagged)}") + + diff_fr_tp = ground_truth & diff_fr_flagged + diff_fr_fn = ground_truth - diff_fr_flagged + diff_fr_fp = diff_fr_flagged - ground_truth + diff_fr_safe_correct = safe_models - diff_fr_flagged + + print(f" True positives: {len(diff_fr_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(diff_fr_fn)} {sorted(diff_fr_fn) if 
diff_fr_fn else ''}") + print(f" False positives: {len(diff_fr_fp)} {sorted(diff_fr_fp) if diff_fr_fp else ''}") + print(f" Safe correctly: {len(diff_fr_safe_correct)}/{len(safe_models)}") + print() + + # Summary + print("=" * 70) + print(" Summary") + print("=" * 70) + print() + print(f" {'Approach':<50} {'TP':>4} {'FN':>4} {'FP':>4} {'Accuracy'}") + print(f" {'-'*50} {'--':>4} {'--':>4} {'--':>4} {'--------'}") + total = len(ground_truth) + len(safe_models) + for label, tp, fn, fp, safe_ok in [ + ("Jinja Pattern Scanning", len(jinja_tp), len(jinja_fn), len(jinja_fp), len(jinja_safe_correct)), + ("Compiled SQL Diff (existing tables)", len(diff_tp), len(diff_fn), len(diff_fp), len(diff_safe_correct)), + ("Compiled SQL Diff (--full-refresh)", len(diff_fr_tp), len(diff_fr_fn), len(diff_fr_fp), len(diff_fr_safe_correct)), + ]: + correct = tp + safe_ok + acc = correct / total * 100 + print(f" {label:<50} {tp:>4} {fn:>4} {fp:>4} {acc:>6.1f}%") + + print() + print(" Key findings:") + print(" 1. Jinja scanning works from manifest alone (no compile needed)") + print(" 2. Compiled SQL diff needs --full-refresh to catch incremental else branches") + print(" 3. Both approaches produce zero false positives on this project") + print(" 4. Compiled SQL diff catches custom macros that Jinja scanning misses") + print() + + +if __name__ == "__main__": + main() diff --git a/scripts/compare_environments.py b/scripts/compare_environments.py new file mode 100644 index 0000000..523fb43 --- /dev/null +++ b/scripts/compare_environments.py @@ -0,0 +1,110 @@ +"""Compare row counts between base and current schemas. + +Simulates Recce's row_count_diff check to demonstrate the difference between +shared base (same data) and isolated base (same sample window) scenarios. 
+ +Usage: + uv run python scripts/compare_environments.py + uv run python scripts/compare_environments.py --base-schema base --current-schema current +""" + +import argparse + +import psycopg2 + + +def get_table_row_counts(conn, schema: str) -> dict[str, int]: + """Get row counts for all tables and views in a schema.""" + cur = conn.cursor() + cur.execute( + "SELECT table_name, table_type FROM information_schema.tables " + "WHERE table_schema = %s AND table_type IN ('BASE TABLE', 'VIEW') " + "ORDER BY table_name", + (schema,), + ) + relations = [(row[0], row[1]) for row in cur.fetchall()] + + counts = {} + for name, rel_type in relations: + cur.execute(f'SELECT count(*) FROM "{schema}"."{name}"') + tag = "(view)" if rel_type == "VIEW" else "" + counts[f"{name} {tag}".strip()] = cur.fetchone()[0] + cur.close() + return counts + + +def compare(base_counts: dict, current_counts: dict) -> list[dict]: + """Compare row counts between base and current.""" + all_tables = sorted(set(base_counts.keys()) | set(current_counts.keys())) + results = [] + + for table in all_tables: + base_n = base_counts.get(table, 0) + current_n = current_counts.get(table, 0) + diff = current_n - base_n + pct = (diff / base_n * 100) if base_n > 0 else (100.0 if current_n > 0 else 0.0) + + results.append({ + "table": table, + "base": base_n, + "current": current_n, + "diff": diff, + "pct": pct, + "status": "match" if diff == 0 else "MISMATCH", + }) + + return results + + +def main(): + parser = argparse.ArgumentParser(description="Compare row counts between environments") + parser.add_argument("--host", default="localhost") + parser.add_argument("--port", type=int, default=5432) + parser.add_argument("--user", default="dbt") + parser.add_argument("--password", default="dbt") + parser.add_argument("--dbname", default="tpch") + parser.add_argument("--base-schema", default="base") + parser.add_argument("--current-schema", default="current") + args = parser.parse_args() + + conn = psycopg2.connect( + 
host=args.host, port=args.port, user=args.user, password=args.password, dbname=args.dbname + ) + + print(f"Comparing: {args.base_schema} vs {args.current_schema}") + print(f"Database: {args.host}:{args.port}/{args.dbname}") + print() + + base_counts = get_table_row_counts(conn, args.base_schema) + current_counts = get_table_row_counts(conn, args.current_schema) + results = compare(base_counts, current_counts) + + # Print results + matches = sum(1 for r in results if r["status"] == "match") + mismatches = sum(1 for r in results if r["status"] == "MISMATCH") + + print(f"{'Table':<40} {'Base':>10} {'Current':>10} {'Diff':>10} {'Result'}") + print("-" * 90) + + for r in results: + status_marker = " " if r["status"] == "match" else "!!" + label = f"{r['pct']:+.1f}% [{r['status']}]" + print( + f"{status_marker}{r['table']:<38} {r['base']:>10,} {r['current']:>10,} " + f"{r['diff']:>+10,} {label}" + ) + + print("-" * 90) + print(f"Total tables: {len(results)} | Matches: {matches} | Mismatches: {mismatches}") + + if mismatches == 0: + print("\nResult: ZERO false alarms — all row counts match between environments.") + else: + print(f"\nResult: {mismatches} table(s) have row count differences.") + print("These would appear as potential false alarms in Recce agent summaries.") + + conn.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/compiled_sql_diff.py b/scripts/compiled_sql_diff.py new file mode 100644 index 0000000..7afad6a --- /dev/null +++ b/scripts/compiled_sql_diff.py @@ -0,0 +1,351 @@ +"""Detect non-deterministic models by diffing compiled SQL across targets. + +Approach 1 from DRC-2863: Compile dbt under two targets, normalize schema +names out of the SQL, then diff. Any model with remaining differences has +non-deterministic SQL that varies by build context. 
+ +Usage: + # First compile under both targets: + # dbt compile --target pg-base && cp -r target/compiled target/compiled_pg_base + # dbt compile --target pg-current && cp -r target/compiled target/compiled_pg_current + # + # Then run: + uv run python scripts/compiled_sql_diff.py + uv run python scripts/compiled_sql_diff.py --base-dir target/compiled_pg_base --current-dir target/compiled_pg_current + uv run python scripts/compiled_sql_diff.py --json + +Alternatively, use manifest.json compiled_code (requires dbt compile, not just dbt parse): + uv run python scripts/compiled_sql_diff.py --use-manifest --base-manifest target_base/manifest.json --current-manifest target_current/manifest.json +""" + +import argparse +import difflib +import json +import re +import sys +from pathlib import Path + + +def strip_sql_comments(sql: str) -> str: + """Remove SQL comments to avoid false positives from comment text.""" + sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL) + sql = re.sub(r"--[^\n]*", "", sql) + return sql + + +def normalize_sql( + sql: str, + db_name: str = "", + schema_name: str = "", + strip_comments: bool = True, + strip_batch_metadata: bool = True, +) -> str: + """Normalize compiled SQL to remove expected target-specific differences. + + Performs precise schema replacement: only in qualified ref positions like + `db.schema.table` or `"schema"."table"`, NOT in SQL keywords like CURRENT ROW. + + Also strips dbt batch metadata (dbt_batch_id, dbt_batch_ts) since these + are compile-time artifacts that always differ between runs. + """ + normalized = sql + + if strip_comments: + normalized = strip_sql_comments(normalized) + + if db_name and schema_name: + # Replace db.schema. 
prefix (most common pattern in compiled SQL) + normalized = re.sub( + rf"\b{re.escape(db_name)}\.{re.escape(schema_name)}\.", + f"{db_name}.__SCHEMA__.", + normalized, + ) + # Replace "schema"."table" pattern (quoted identifiers) + normalized = re.sub( + rf'"{re.escape(schema_name)}"', + '"__SCHEMA__"', + normalized, + ) + + if strip_batch_metadata: + # dbt_batch_id and dbt_batch_ts are compile-time UUIDs/timestamps + normalized = re.sub( + r"cast\('[0-9a-f-]+' as varchar\) as dbt_batch_id", + "cast('__BATCH_ID__' as varchar) as dbt_batch_id", + normalized, + ) + normalized = re.sub( + r"cast\('[^']+' as timestamp\) as dbt_batch_ts", + "cast('__BATCH_TS__' as timestamp) as dbt_batch_ts", + normalized, + ) + + # Normalize whitespace for cleaner diffs + normalized = re.sub(r"[ \t]+\n", "\n", normalized) + return normalized + + +def diff_compiled_files( + base_dir: Path, + current_dir: Path, + db_name: str, + base_schema: str, + current_schema: str, +) -> list[dict]: + """Diff compiled SQL files between two target directories.""" + + findings = [] + base_files = sorted(base_dir.rglob("*.sql")) + + for base_file in base_files: + rel_path = base_file.relative_to(base_dir) + current_file = current_dir / rel_path + + if not current_file.exists(): + findings.append({ + "model": rel_path.stem, + "path": str(rel_path), + "status": "missing_in_current", + "diff_lines": [], + }) + continue + + base_sql = normalize_sql(base_file.read_text(), db_name=db_name, schema_name=base_schema) + current_sql = normalize_sql(current_file.read_text(), db_name=db_name, schema_name=current_schema) + + if base_sql == current_sql: + continue + + # Generate unified diff for the non-schema differences + diff = list(difflib.unified_diff( + base_sql.splitlines(keepends=True), + current_sql.splitlines(keepends=True), + fromfile=f"base/{rel_path}", + tofile=f"current/{rel_path}", + lineterm="", + )) + + # Extract only the changed lines (+ and - prefixed) + changed_lines = [ + line for line in diff + 
if line.startswith("+") or line.startswith("-") + if not line.startswith("+++") and not line.startswith("---") + ] + + findings.append({ + "model": rel_path.stem, + "path": str(rel_path), + "status": "non_deterministic", + "diff_lines": changed_lines, + "diff_full": diff, + }) + + # Check for files only in current + current_files = sorted(current_dir.rglob("*.sql")) + current_rels = {f.relative_to(current_dir) for f in current_files} + base_rels = {f.relative_to(base_dir) for f in base_files} + + for rel_path in sorted(current_rels - base_rels): + findings.append({ + "model": rel_path.stem, + "path": str(rel_path), + "status": "missing_in_base", + "diff_lines": [], + }) + + return findings + + +def diff_manifests( + base_manifest_path: str, + current_manifest_path: str, + db_name: str, + base_schema: str, + current_schema: str, +) -> list[dict]: + """Diff compiled_code from two manifest.json files.""" + + with open(base_manifest_path) as f: + base_manifest = json.load(f) + with open(current_manifest_path) as f: + current_manifest = json.load(f) + + project_name = base_manifest.get("metadata", {}).get("project_name") + + findings = [] + + # Build lookup for current manifest + current_nodes = {} + for uid, node in current_manifest.get("nodes", {}).items(): + if node.get("resource_type") == "model" and node.get("package_name") == project_name: + current_nodes[uid] = node + + for uid, node in base_manifest.get("nodes", {}).items(): + if node.get("resource_type") != "model" or node.get("package_name") != project_name: + continue + + name = node.get("name", uid) + base_compiled = node.get("compiled_code", "") + + current_node = current_nodes.get(uid) + if not current_node: + findings.append({ + "model": name, + "path": node.get("path", ""), + "status": "missing_in_current", + "diff_lines": [], + }) + continue + + current_compiled = current_node.get("compiled_code", "") + + if not base_compiled or not current_compiled: + continue + + base_norm = 
normalize_sql(base_compiled, db_name=db_name, schema_name=base_schema) + current_norm = normalize_sql(current_compiled, db_name=db_name, schema_name=current_schema) + + if base_norm == current_norm: + continue + + diff = list(difflib.unified_diff( + base_norm.splitlines(keepends=True), + current_norm.splitlines(keepends=True), + fromfile=f"base/{name}", + tofile=f"current/{name}", + lineterm="", + )) + + changed_lines = [ + line for line in diff + if line.startswith("+") or line.startswith("-") + if not line.startswith("+++") and not line.startswith("---") + ] + + findings.append({ + "model": name, + "path": node.get("path", ""), + "materialized": node.get("config", {}).get("materialized", "unknown"), + "status": "non_deterministic", + "diff_lines": changed_lines, + "diff_full": diff, + }) + + return findings + + +def format_report(findings: list[dict], approach: str) -> str: + """Format human-readable report.""" + lines = [] + lines.append("=" * 70) + lines.append(" Compiled SQL Diff — Non-Deterministic Model Detection") + lines.append(f" Approach: {approach}") + lines.append("=" * 70) + lines.append("") + + non_det = [f for f in findings if f["status"] == "non_deterministic"] + identical = len(findings) - len(non_det) - len([f for f in findings if f["status"].startswith("missing")]) + missing = [f for f in findings if f["status"].startswith("missing")] + + lines.append(f" Non-deterministic models: {len(non_det)}") + lines.append(f" Identical models (after schema normalization): NOT directly counted — {len(findings)} total checked") + if missing: + lines.append(f" Missing in one target: {len(missing)}") + lines.append("") + + if non_det: + lines.append("-" * 70) + lines.append(" Models with non-deterministic SQL:") + lines.append("") + for f in non_det: + mat = f.get("materialized", "") + mat_str = f" ({mat})" if mat else "" + lines.append(f" !! 
{f['model']}{mat_str} [{f['path']}]") + for dl in f["diff_lines"][:10]: + lines.append(f" {dl}") + if len(f["diff_lines"]) > 10: + lines.append(f" ... ({len(f['diff_lines'])} changed lines total)") + lines.append("") + + if not non_det: + lines.append(" All models produce identical SQL across targets (after schema normalization).") + lines.append(" Shared base is safe.") + + lines.append("=" * 70) + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Detect non-deterministic models via compiled SQL diff") + parser.add_argument("--base-dir", default="target/compiled_pg_base", + help="Directory with compiled SQL from base target") + parser.add_argument("--current-dir", default="target/compiled_pg_current", + help="Directory with compiled SQL from current target") + parser.add_argument("--db-name", default="tpch", + help="Database name used in qualified refs (e.g., tpch)") + parser.add_argument("--base-schema", default="base", + help="Schema name used by base target") + parser.add_argument("--current-schema", default="current", + help="Schema name used by current target") + parser.add_argument("--use-manifest", action="store_true", + help="Use manifest.json compiled_code instead of file-based diff") + parser.add_argument("--base-manifest", default="target_base/manifest.json", + help="Path to base manifest.json (with --use-manifest)") + parser.add_argument("--current-manifest", default="target_current/manifest.json", + help="Path to current manifest.json (with --use-manifest)") + parser.add_argument("--json", action="store_true", dest="json_output", + help="Output machine-readable JSON") + args = parser.parse_args() + + if args.use_manifest: + findings = diff_manifests( + args.base_manifest, + args.current_manifest, + db_name=args.db_name, + base_schema=args.base_schema, + current_schema=args.current_schema, + ) + approach = "manifest compiled_code diff" + else: + base_dir = Path(args.base_dir) + current_dir = Path(args.current_dir) 
+ + if not base_dir.exists(): + print(f"Error: base directory not found: {base_dir}", file=sys.stderr) + print("Run: dbt compile --target pg-base && cp -r target/compiled target/compiled_pg_base", file=sys.stderr) + sys.exit(1) + if not current_dir.exists(): + print(f"Error: current directory not found: {current_dir}", file=sys.stderr) + print("Run: dbt compile --target pg-current && cp -r target/compiled target/compiled_pg_current", file=sys.stderr) + sys.exit(1) + + findings = diff_compiled_files( + base_dir, current_dir, + db_name=args.db_name, + base_schema=args.base_schema, + current_schema=args.current_schema, + ) + approach = "compiled file diff" + + if args.json_output: + output = { + "approach": approach, + "total_findings": len(findings), + "non_deterministic": [ + { + "model": f["model"], + "path": f["path"], + "materialized": f.get("materialized", ""), + "diff_lines": f["diff_lines"], + } + for f in findings + if f["status"] == "non_deterministic" + ], + } + print(json.dumps(output, indent=2)) + else: + print(format_report(findings, approach)) + + +if __name__ == "__main__": + main() diff --git a/scripts/detect_base_mode.py b/scripts/detect_base_mode.py new file mode 100644 index 0000000..d0976d0 --- /dev/null +++ b/scripts/detect_base_mode.py @@ -0,0 +1,519 @@ +"""Detect whether a dbt project needs shared base or isolated base. + +Analyzes model SQL to classify the project and recommend the appropriate +base environment mode for Recce. + +Usage: + uv run python scripts/detect_base_mode.py # default: target/manifest.json + uv run python scripts/detect_base_mode.py --manifest path/to/manifest.json + uv run python scripts/detect_base_mode.py --json # machine-readable output + +Root cause of false alarms: + Conditional or non-deterministic logic in models produces DIFFERENT SQL + depending on build context — target name, build time, existing table state. 
+ Two environments built under different conditions run different queries + against the same source data, producing different results. + + This is NOT limited to incremental models. Any model (table, view, ephemeral) + with conditional Jinja or non-deterministic functions can cause false alarms. + Conversely, an incremental model with a deterministic else branch is SAFE + in CI (both envs get fresh builds → same SQL). + +Detection approach — SQL pattern scanning: + Scans raw_code from manifest for patterns that make SQL non-deterministic: + - target.name/schema → different SQL per target environment + - current_date()/now()→ different results per build time + Note: is_incremental() and {{ this }} are NOT flagged by themselves — + they only matter if combined with non-deterministic patterns above. +""" + +import argparse +import json +import re +import sys +from pathlib import Path + + +# Jinja/SQL patterns that make SQL non-deterministic across environments. +# +# Key insight: is_incremental() and {{ this }} are NOT flagged here. +# In CI, both envs get fresh builds → is_incremental() returns false → +# {{ this }} is never reached. An incremental model with a deterministic +# else branch produces identical SQL in both environments. +# +# What actually causes false alarms is non-deterministic content INSIDE +# the branches: target.name, current_date(), etc. 
# Patterns whose presence in a model's SQL makes the compiled query depend on
# build context (target profile or wall-clock time). Matching any of these
# means two environments can compile DIFFERENT SQL from the same model.
CONDITIONAL_PATTERNS = [
    {
        "name": "target.name",
        "regex": re.compile(r"\btarget\s*\.\s*name\b"),
        "weight": "strong",
        "reason": "SQL varies by target environment — different targets produce different queries",
    },
    {
        "name": "target.schema",
        "regex": re.compile(r"\btarget\s*\.\s*schema\b"),
        "weight": "strong",
        "reason": "SQL varies by target schema — different targets produce different queries",
    },
    {
        "name": "current_date/now",
        "regex": re.compile(
            r"\b(current_date|current_timestamp|now\s*\(\s*\)|getdate\s*\(\s*\))\b",
            re.IGNORECASE,
        ),
        "weight": "moderate",
        "reason": "Result depends on build time — builds at different times produce different data",
    },
]


def load_manifest(path: str) -> dict:
    """Load a dbt manifest.json, exiting with a readable error on failure.

    Args:
        path: Filesystem path to the manifest file.

    Returns:
        The parsed manifest as a dict. Exits the process (status 1) if the
        file is missing or is not valid JSON.
    """
    manifest_path = Path(path)
    if not manifest_path.exists():
        print(f"Error: manifest not found at {manifest_path}", file=sys.stderr)
        print("Run 'dbt parse' or 'dbt build' first to generate the manifest.", file=sys.stderr)
        sys.exit(1)
    try:
        with open(manifest_path) as f:
            return json.load(f)
    except json.JSONDecodeError as exc:
        # A truncated or hand-edited manifest should produce a clear message,
        # not a traceback.
        print(f"Error: manifest at {manifest_path} is not valid JSON: {exc}", file=sys.stderr)
        sys.exit(1)


def analyze_models(manifest: dict) -> dict:
    """Extract model metadata from the manifest.

    Only nodes belonging to the manifest's own project are considered
    (installed packages are skipped). Snapshots are collected separately
    and do not count toward the model total.

    Returns:
        Dict with: total_models, materialization_counts, incremental_models,
        snapshot_models, and a per-model detail map keyed by model name.
    """
    models = {}
    materialization_counts = {"table": 0, "view": 0, "ephemeral": 0, "incremental": 0, "other": 0}
    incremental_models = []
    snapshot_models = []

    project_name = manifest.get("metadata", {}).get("project_name")

    for unique_id, node in manifest.get("nodes", {}).items():
        resource_type = node.get("resource_type")
        if resource_type not in ("model", "snapshot"):
            continue
        if node.get("package_name") != project_name:
            continue

        name = node.get("name", unique_id)
        schema_path = node.get("path", "")

        if resource_type == "snapshot":
            snapshot_models.append({
                "name": name,
                "path": schema_path,
                "strategy": node.get("config", {}).get("strategy"),
            })
            continue

        mat = node.get("config", {}).get("materialized", "unknown")
        # Anything outside the four common materializations is bucketed under
        # "other" so the counts always sum to the model total.
        if mat in materialization_counts:
            materialization_counts[mat] += 1
        else:
            materialization_counts["other"] += 1

        models[name] = {
            "materialized": mat,
            "path": schema_path,
            "depends_on": node.get("depends_on", {}).get("nodes", []),
            "raw_code": node.get("raw_code", ""),
        }

        if mat == "incremental":
            incremental_models.append({
                "name": name,
                "path": schema_path,
                "strategy": node.get("config", {}).get("incremental_strategy"),
                "unique_key": node.get("config", {}).get("unique_key"),
            })

    return {
        "total_models": len(models),
        "materialization_counts": materialization_counts,
        "incremental_models": incremental_models,
        "snapshot_models": snapshot_models,
        "models": models,
    }


def strip_sql_comments(code: str) -> str:
    """Remove SQL line comments (--) and block comments (/* */) from code.

    Preserves Jinja comments ({# #}) since they're already stripped by dbt.
    This prevents false positives from keywords mentioned in comments like:
        -- no target.name or current_date() dependency

    NOTE(review): the line-comment regex does not respect string literals,
    so '--' inside a quoted string is also stripped — good enough for PoC.
    """
    # Remove block comments first so '--' inside /* */ never survives.
    code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
    # Remove line comments (but not inside strings — good enough for PoC).
    code = re.sub(r"--[^\n]*", "", code)
    return code


def scan_sql_patterns(model_analysis: dict) -> list[dict]:
    """Scan ALL model SQL for non-deterministic patterns.

    Scans every model (including incremental) for patterns that make SQL
    vary by build context. An incremental model with a deterministic else
    branch will NOT be flagged — only those with target.name, current_date(),
    etc. in their code.

    SQL comments are stripped before scanning to avoid false positives.

    Returns:
        One finding dict per flagged model: name, materialized, path, and the
        list of matched patterns (each with pattern/weight/reason).
    """
    findings = []

    for name, model in model_analysis["models"].items():
        raw_code = model.get("raw_code", "")
        if not raw_code:
            continue

        code_only = strip_sql_comments(raw_code)

        model_patterns = []
        for pattern in CONDITIONAL_PATTERNS:
            if pattern["regex"].search(code_only):
                model_patterns.append({
                    "pattern": pattern["name"],
                    "weight": pattern["weight"],
                    "reason": pattern["reason"],
                })

        if model_patterns:
            findings.append({
                "name": name,
                "materialized": model["materialized"],
                "path": model["path"],
                "patterns": model_patterns,
            })

    return findings


def analyze_sources(manifest: dict) -> dict:
    """Extract source metadata, focusing on event_time config.

    event_time coverage determines whether dbt's --sample flag can filter
    sources to a consistent time window for isolated base builds.
    """
    sources = []
    sources_with_event_time = []
    sources_without_event_time = []

    for unique_id, source in manifest.get("sources", {}).items():
        name = source.get("name", unique_id)
        source_name = source.get("source_name", "unknown")
        event_time = source.get("config", {}).get("event_time")

        entry = {
            "name": name,
            "source": source_name,
            "event_time": event_time,
        }
        sources.append(entry)

        if event_time:
            sources_with_event_time.append(entry)
        else:
            sources_without_event_time.append(entry)

    return {
        "total_sources": len(sources),
        "with_event_time": len(sources_with_event_time),
        "without_event_time": len(sources_without_event_time),
        "sources_with_event_time": sources_with_event_time,
        "sources_without_event_time": sources_without_event_time,
    }


def _safe_incremental_models(model_analysis: dict, sql_findings: list[dict]) -> list[str]:
    """Names of incremental models NOT flagged by SQL pattern scanning.

    Shared by classify() and format_report() so both report the same set.
    Uses a set of flagged names to avoid a linear scan per model.
    """
    flagged = {f["name"] for f in sql_findings}
    return [
        name
        for name, model in model_analysis["models"].items()
        if model["materialized"] == "incremental" and name not in flagged
    ]


def classify(model_analysis: dict, source_analysis: dict, sql_findings: list[dict]) -> dict:
    """Classify the project and recommend base mode.

    Detection is based on SQL pattern scanning of ALL models (including
    incremental). An incremental model with a deterministic else branch
    is NOT flagged — only models with target.name, current_date(), etc.

    Returns:
        Dict with: recommendation ('shared_base' | 'isolated_base'),
        confidence ('high' | 'medium'), the list of detection signals, and a
        summary of flagged models grouped by materialization.
    """
    signals = []
    recommendation = "shared_base"
    confidence = "high"

    conditional_count = len(sql_findings)
    total = model_analysis["total_models"]
    table_count = model_analysis["materialization_counts"]["table"]
    view_count = model_analysis["materialization_counts"]["view"]
    inc_count = model_analysis["materialization_counts"]["incremental"]
    snap_count = len(model_analysis["snapshot_models"])
    event_time_pct = (
        source_analysis["with_event_time"] / source_analysis["total_sources"] * 100
        if source_analysis["total_sources"] > 0
        else 0
    )

    # Signal 1: Models with non-deterministic SQL patterns
    if conditional_count > 0:
        model_details = []
        for f in sql_findings:
            patterns = ", ".join(p["pattern"] for p in f["patterns"])
            model_details.append(f"{f['name']} ({f['materialized']}: {patterns})")

        signals.append({
            "signal": "non_deterministic_sql",
            "value": conditional_count,
            "detail": f"{conditional_count} model(s) with non-deterministic SQL",
            "weight": "strong",
            "direction": "isolated_base",
            "reason": "These models contain target.name, current_date(), or other patterns "
                      "that produce different SQL per build context. "
                      "Models: " + "; ".join(model_details),
        })
        recommendation = "isolated_base"
    else:
        signals.append({
            "signal": "deterministic_sql",
            "value": 0,
            "detail": "All models produce deterministic SQL (no target.name, current_date, etc.)",
            "weight": "strong",
            "direction": "shared_base",
            "reason": "SQL scanning found no non-deterministic patterns. "
                      "Same source data → same result regardless of build context.",
        })

    # Signal 2: Incremental models with safe else branches (informational)
    safe_incremental = _safe_incremental_models(model_analysis, sql_findings)

    if safe_incremental:
        signals.append({
            "signal": "safe_incremental",
            "value": len(safe_incremental),
            "detail": f"{len(safe_incremental)} incremental model(s) with deterministic else branch — safe in CI",
            "weight": "moderate",
            "direction": "shared_base",
            "reason": f"These incremental models have no target.name or current_date() in "
                      f"their code. In CI (fresh builds), is_incremental() returns false → "
                      f"both envs run the same deterministic SQL. "
                      f"Models: {', '.join(safe_incremental)}",
        })

    # Signal 3: Materialization profile
    if total > 0 and view_count == total and conditional_count == 0:
        signals.append({
            "signal": "all_views",
            "value": True,
            "detail": "All models are views — recomputed on read, no stored state",
            "weight": "moderate",
            "direction": "shared_base",
            "reason": "Views with no non-deterministic logic generate deterministic SQL.",
        })
    elif table_count > 0 and conditional_count == 0:
        signals.append({
            "signal": "table_models",
            "value": table_count,
            "detail": f"{table_count} table model(s) — deterministic full refresh",
            "weight": "weak",
            "direction": "shared_base",
            "reason": "Table models with no non-deterministic logic generate the same SQL "
                      "every build.",
        })

    # Signal 4: event_time coverage (enables --sample feasibility)
    if source_analysis["total_sources"] > 0:
        if event_time_pct == 100:
            signals.append({
                "signal": "event_time_coverage",
                "value": f"{event_time_pct:.0f}%",
                "detail": f"All {source_analysis['total_sources']} sources have event_time configured",
                "weight": "moderate",
                "direction": "enables_isolation",
                "reason": "Full event_time coverage means --sample can filter all sources "
                          "to a consistent time window, making isolated base builds fast "
                          "and deterministic.",
            })
        elif event_time_pct > 0:
            signals.append({
                "signal": "event_time_coverage",
                "value": f"{event_time_pct:.0f}%",
                "detail": f"{source_analysis['with_event_time']} of {source_analysis['total_sources']} "
                          f"sources have event_time",
                "weight": "weak",
                "direction": "partial_isolation",
                "reason": "Partial event_time coverage means --sample will only filter some "
                          "sources. Sources without event_time will still get full data.",
            })
        else:
            signals.append({
                "signal": "event_time_coverage",
                "value": "0%",
                "detail": "No sources have event_time configured",
                "weight": "moderate",
                "direction": "blocks_sample",
                "reason": "Without event_time on sources, --sample cannot be used. "
                          "Isolated base would require full rebuilds.",
            })

    # Signal 5: Project scale
    if total > 50:
        signals.append({
            "signal": "project_scale",
            "value": total,
            "detail": f"{total} models — more surface area for false alarm noise",
            "weight": "weak",
            "direction": "isolated_base",
            "reason": "Larger projects amplify the impact: downstream models inherit "
                      "divergent data, spreading false alarm diffs across more tables.",
        })

    # Determine confidence: non-deterministic SQL with full event_time
    # coverage (or a fully clean project) is a high-confidence call;
    # non-deterministic SQL without full coverage is only medium.
    if conditional_count > 0 and event_time_pct == 100:
        confidence = "high"
    elif conditional_count > 0:
        confidence = "medium"
    else:
        confidence = "high"

    # Populate by_materialization breakdown
    mat_breakdown = {}
    for f in sql_findings:
        mat = f["materialized"]
        mat_breakdown[mat] = mat_breakdown.get(mat, 0) + 1

    return {
        "recommendation": recommendation,
        "confidence": confidence,
        "signals": signals,
        "conditional_models": {
            "total": conditional_count,
            "by_materialization": mat_breakdown,
        },
    }


def format_report(
    model_analysis: dict, source_analysis: dict, sql_findings: list[dict],
    classification: dict,
) -> str:
    """Format a human-readable report from the analysis results."""
    lines = []
    lines.append("=" * 70)
    lines.append(" Recce Base Mode Detection Report (v2)")
    lines.append("=" * 70)
    lines.append("")

    # Recommendation
    rec = classification["recommendation"]
    conf = classification["confidence"]
    cond = classification["conditional_models"]
    if rec == "isolated_base":
        lines.append(f" RECOMMENDATION: Isolated Base ({conf} confidence)")
        lines.append(f"   Found {cond['total']} model(s) with non-deterministic SQL")
        mat_parts = [f"{v} {k}" for k, v in cond["by_materialization"].items()]
        if mat_parts:
            lines.append(f"   breakdown: {', '.join(mat_parts)}")
        lines.append("")
        lines.append("   These models produce different SQL depending on build context.")
        lines.append("   Use isolated base so both environments build deterministically.")
    else:
        lines.append(f" RECOMMENDATION: Shared Base ({conf} confidence)")
        lines.append("   No non-deterministic patterns found in any model.")
        lines.append("   All models produce deterministic SQL — shared base is fine.")

    lines.append("")
    lines.append("-" * 70)

    # Model summary
    mc = model_analysis["materialization_counts"]
    lines.append(f" Models: {model_analysis['total_models']} total")
    lines.append(f"   table: {mc['table']}  view: {mc['view']}  "
                 f"ephemeral: {mc['ephemeral']}  incremental: {mc['incremental']}")
    if model_analysis["snapshot_models"]:
        lines.append(f"   snapshots: {len(model_analysis['snapshot_models'])}")
    lines.append("")

    # Source summary
    sa = source_analysis
    lines.append(f" Sources: {sa['total_sources']} total")
    lines.append(f"   with event_time: {sa['with_event_time']}  "
                 f"without: {sa['without_event_time']}")
    if sa["without_event_time"] > 0:
        missing = [s["name"] for s in sa["sources_without_event_time"]]
        lines.append(f"   missing event_time: {', '.join(missing)}")
    lines.append("")

    # Models with non-deterministic SQL
    if sql_findings:
        lines.append(" Models with non-deterministic SQL:")
        for f in sql_findings:
            patterns = ", ".join(p["pattern"] for p in f["patterns"])
            lines.append(f"   - {f['name']} ({f['materialized']}) — {patterns}")
        lines.append("")

    # Safe incremental models (informational) — same set classify() reports.
    safe_inc = _safe_incremental_models(model_analysis, sql_findings)
    if safe_inc:
        lines.append(" Safe incremental models (deterministic else branch):")
        for name in safe_inc:
            lines.append(f"   - {name} (incremental, no target.name/current_date)")
        lines.append("")

    # Signals
    lines.append("-" * 70)
    lines.append(" Detection signals:")
    lines.append("")
    for sig in classification["signals"]:
        # Unknown weights fall back to the weakest marker instead of raising.
        icon = {"strong": "***", "moderate": "**", "weak": "*"}.get(sig["weight"], "*")
        direction = sig["direction"].replace("_", " ")
        lines.append(f" {icon} [{direction}] {sig['detail']}")
        lines.append(f"     {sig['reason']}")
        lines.append("")

    lines.append("=" * 70)
    return "\n".join(lines)


def main():
    """CLI entry point: load manifest, run all analyses, print report/JSON."""
    parser = argparse.ArgumentParser(description="Detect recommended Recce base mode")
    parser.add_argument(
        "--manifest",
        default="target/manifest.json",
        help="Path to dbt manifest.json (default: target/manifest.json)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        dest="json_output",
        help="Output machine-readable JSON",
    )
    args = parser.parse_args()

    manifest = load_manifest(args.manifest)
    model_analysis = analyze_models(manifest)
    source_analysis = analyze_sources(manifest)
    sql_findings = scan_sql_patterns(model_analysis)
    classification = classify(model_analysis, source_analysis, sql_findings)

    if args.json_output:
        output = {
            "models": {
                "total": model_analysis["total_models"],
                "materialization_counts": model_analysis["materialization_counts"],
                "incremental_models": model_analysis["incremental_models"],
                "snapshot_models": model_analysis["snapshot_models"],
            },
            "sources": {
                "total": source_analysis["total_sources"],
                "with_event_time": source_analysis["with_event_time"],
                "without_event_time": source_analysis["without_event_time"],
                "sources_without_event_time": [
                    s["name"] for s in source_analysis["sources_without_event_time"]
                ],
            },
            "sql_findings": sql_findings,
            "classification": classification,
        }
        print(json.dumps(output, indent=2))
    else:
        print(format_report(model_analysis, source_analysis, sql_findings, classification))


if __name__ == "__main__":
    main()