From a67ec5b2a7bfe332a10f3c8140ae597b3fe6fca7 Mon Sep 17 00:00:00 2001 From: even-wei Date: Wed, 25 Feb 2026 14:30:04 +0800 Subject: [PATCH 1/4] feat: add PoC scripts for scenario detection and base mode classification Add detection heuristic prototype that analyzes dbt manifest.json to recommend shared base vs isolated base mode for Recce CI. The core insight: projects with incremental models need isolated base to avoid false alarm row count diffs. - detect_base_mode.py: parses manifest, classifies by materialization type, event_time coverage, and project scale - compare_environments.py: compares row counts between base/current schemas to validate false alarm patterns - metrics_daily_shipments.sql: test incremental model demonstrating the data divergence scenario Validated: 51/51 tables match with shared base (no incremental), 1 mismatch (-50.7%) when incremental model present. Refs: DRC-2820 Signed-off-by: even-wei --- models/metrics/metrics_daily_shipments.sql | 31 ++ scripts/compare_environments.py | 108 +++++++ scripts/detect_base_mode.py | 346 +++++++++++++++++++++ 3 files changed, 485 insertions(+) create mode 100644 models/metrics/metrics_daily_shipments.sql create mode 100644 scripts/compare_environments.py create mode 100644 scripts/detect_base_mode.py diff --git a/models/metrics/metrics_daily_shipments.sql b/models/metrics/metrics_daily_shipments.sql new file mode 100644 index 0000000..b778e73 --- /dev/null +++ b/models/metrics/metrics_daily_shipments.sql @@ -0,0 +1,31 @@ +{{ + config( + materialized = 'incremental', + unique_key = 'ship_date', + incremental_strategy = 'delete+insert' + ) +}} + +-- Incremental model: daily shipment metrics +-- This model demonstrates the isolated base scenario: +-- Production accumulates years of data, while a fresh PR build +-- only sees recent shipments, causing row count false alarms. 
+ +select + oi.ship_date, + count(*) as shipment_count, + count(distinct oi.order_key) as order_count, + count(distinct oi.supplier_key) as supplier_count, + sum(oi.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(oi.gross_item_sales_amount)::decimal(16,4) as avg_revenue_per_item +from + {{ ref('orders_items') }} oi +where + oi.ship_date is not null + {% if is_incremental() %} + and oi.ship_date > (select max(ship_date) from {{ this }}) + {% endif %} +group by + oi.ship_date +order by + oi.ship_date diff --git a/scripts/compare_environments.py b/scripts/compare_environments.py new file mode 100644 index 0000000..cf4163c --- /dev/null +++ b/scripts/compare_environments.py @@ -0,0 +1,108 @@ +"""Compare row counts between base and current schemas. + +Simulates Recce's row_count_diff check to demonstrate the difference between +shared base (same data) and isolated base (same sample window) scenarios. + +Usage: + uv run python scripts/compare_environments.py + uv run python scripts/compare_environments.py --base-schema base --current-schema current +""" + +import argparse + +import psycopg2 + + +def get_table_row_counts(conn, schema: str) -> dict[str, int]: + """Get row counts for all tables in a schema.""" + cur = conn.cursor() + cur.execute( + "SELECT table_name FROM information_schema.tables " + "WHERE table_schema = %s AND table_type = 'BASE TABLE' " + "ORDER BY table_name", + (schema,), + ) + tables = [row[0] for row in cur.fetchall()] + + counts = {} + for table in tables: + cur.execute(f'SELECT count(*) FROM "{schema}"."{table}"') + counts[table] = cur.fetchone()[0] + cur.close() + return counts + + +def compare(base_counts: dict, current_counts: dict) -> list[dict]: + """Compare row counts between base and current.""" + all_tables = sorted(set(base_counts.keys()) | set(current_counts.keys())) + results = [] + + for table in all_tables: + base_n = base_counts.get(table, 0) + current_n = current_counts.get(table, 0) + diff = current_n - base_n + pct 
= (diff / base_n * 100) if base_n > 0 else (100.0 if current_n > 0 else 0.0) + + results.append({ + "table": table, + "base": base_n, + "current": current_n, + "diff": diff, + "pct": pct, + "status": "match" if diff == 0 else "MISMATCH", + }) + + return results + + +def main(): + parser = argparse.ArgumentParser(description="Compare row counts between environments") + parser.add_argument("--host", default="localhost") + parser.add_argument("--port", type=int, default=5432) + parser.add_argument("--user", default="dbt") + parser.add_argument("--password", default="dbt") + parser.add_argument("--dbname", default="tpch") + parser.add_argument("--base-schema", default="base") + parser.add_argument("--current-schema", default="current") + args = parser.parse_args() + + conn = psycopg2.connect( + host=args.host, port=args.port, user=args.user, password=args.password, dbname=args.dbname + ) + + print(f"Comparing: {args.base_schema} vs {args.current_schema}") + print(f"Database: {args.host}:{args.port}/{args.dbname}") + print() + + base_counts = get_table_row_counts(conn, args.base_schema) + current_counts = get_table_row_counts(conn, args.current_schema) + results = compare(base_counts, current_counts) + + # Print results + matches = sum(1 for r in results if r["status"] == "match") + mismatches = sum(1 for r in results if r["status"] == "MISMATCH") + + print(f"{'Table':<40} {'Base':>10} {'Current':>10} {'Diff':>10} {'%':>8} {'Status'}") + print("-" * 90) + + for r in results: + status_marker = " " if r["status"] == "match" else "!!" 
+ print( + f"{status_marker}{r['table']:<38} {r['base']:>10,} {r['current']:>10,} " + f"{r['diff']:>+10,} {r['pct']:>+7.1f}% {r['status']}" + ) + + print("-" * 90) + print(f"Total tables: {len(results)} | Matches: {matches} | Mismatches: {mismatches}") + + if mismatches == 0: + print("\nResult: ZERO false alarms — all row counts match between environments.") + else: + print(f"\nResult: {mismatches} table(s) have row count differences.") + print("These would appear as potential false alarms in Recce agent summaries.") + + conn.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/detect_base_mode.py b/scripts/detect_base_mode.py new file mode 100644 index 0000000..c2af330 --- /dev/null +++ b/scripts/detect_base_mode.py @@ -0,0 +1,346 @@ +"""Detect whether a dbt project needs shared base or isolated base. + +Analyzes manifest.json to classify the project and recommend the appropriate +base environment mode for Recce. + +Usage: + uv run python scripts/detect_base_mode.py # default: target/manifest.json + uv run python scripts/detect_base_mode.py --manifest path/to/manifest.json + uv run python scripts/detect_base_mode.py --json # machine-readable output + +Detection signals: + 1. Incremental models → strong signal for isolated base + 2. Sources with event_time → enables --sample, makes isolated base feasible + 3. Materialization mix → all views = shared base fine + 4. 
Model count / complexity → large projects benefit more from isolation +""" + +import argparse +import json +import sys +from pathlib import Path + + +def load_manifest(path: str) -> dict: + manifest_path = Path(path) + if not manifest_path.exists(): + print(f"Error: manifest not found at {manifest_path}", file=sys.stderr) + print("Run 'dbt parse' or 'dbt build' first to generate the manifest.", file=sys.stderr) + sys.exit(1) + with open(manifest_path) as f: + return json.load(f) + + +def analyze_models(manifest: dict) -> dict: + """Extract model metadata from manifest.""" + models = {} + materialization_counts = {"table": 0, "view": 0, "ephemeral": 0, "incremental": 0, "other": 0} + incremental_models = [] + + for unique_id, node in manifest.get("nodes", {}).items(): + if node.get("resource_type") != "model": + continue + # Only count models from the root project, not packages + if node.get("package_name") != manifest.get("metadata", {}).get("project_name"): + continue + + mat = node.get("config", {}).get("materialized", "unknown") + name = node.get("name", unique_id) + schema_path = node.get("path", "") + + if mat in materialization_counts: + materialization_counts[mat] += 1 + else: + materialization_counts["other"] += 1 + + models[name] = { + "materialized": mat, + "path": schema_path, + "depends_on": node.get("depends_on", {}).get("nodes", []), + } + + if mat == "incremental": + inc_config = { + "name": name, + "path": schema_path, + "strategy": node.get("config", {}).get("incremental_strategy"), + "unique_key": node.get("config", {}).get("unique_key"), + } + incremental_models.append(inc_config) + + return { + "total_models": len(models), + "materialization_counts": materialization_counts, + "incremental_models": incremental_models, + "models": models, + } + + +def analyze_sources(manifest: dict) -> dict: + """Extract source metadata, focusing on event_time config.""" + sources = [] + sources_with_event_time = [] + sources_without_event_time = [] + + for 
unique_id, source in manifest.get("sources", {}).items(): + name = source.get("name", unique_id) + source_name = source.get("source_name", "unknown") + event_time = source.get("config", {}).get("event_time") + + entry = { + "name": name, + "source": source_name, + "event_time": event_time, + } + sources.append(entry) + + if event_time: + sources_with_event_time.append(entry) + else: + sources_without_event_time.append(entry) + + return { + "total_sources": len(sources), + "with_event_time": len(sources_with_event_time), + "without_event_time": len(sources_without_event_time), + "sources_with_event_time": sources_with_event_time, + "sources_without_event_time": sources_without_event_time, + } + + +def classify(model_analysis: dict, source_analysis: dict) -> dict: + """Classify the project and recommend base mode.""" + signals = [] + recommendation = "shared_base" + confidence = "high" + + inc_count = model_analysis["materialization_counts"]["incremental"] + total = model_analysis["total_models"] + table_count = model_analysis["materialization_counts"]["table"] + view_count = model_analysis["materialization_counts"]["view"] + event_time_pct = ( + source_analysis["with_event_time"] / source_analysis["total_sources"] * 100 + if source_analysis["total_sources"] > 0 + else 0 + ) + + # Signal 1: Incremental models (strongest signal) + if inc_count > 0: + inc_pct = inc_count / total * 100 + signals.append({ + "signal": "incremental_models", + "value": inc_count, + "detail": f"{inc_count} incremental model(s) ({inc_pct:.0f}% of project)", + "weight": "strong", + "direction": "isolated_base", + "reason": "Incremental models accumulate data over time. 
Shared base (production) " + "has full history while PR current has only recent data, causing " + "false alarms in row count and value diffs.", + }) + recommendation = "isolated_base" + else: + signals.append({ + "signal": "incremental_models", + "value": 0, + "detail": "No incremental models found", + "weight": "strong", + "direction": "shared_base", + "reason": "Without incremental models, base and current environments " + "produce the same data when given the same input.", + }) + + # Signal 2: Materialization profile + if total > 0 and view_count == total: + signals.append({ + "signal": "all_views", + "value": True, + "detail": "All models are views — no materialized data to diverge", + "weight": "moderate", + "direction": "shared_base", + "reason": "Views are recomputed on read; no stored state to diverge between environments.", + }) + elif table_count > 0: + signals.append({ + "signal": "table_models", + "value": table_count, + "detail": f"{table_count} table model(s) — full refresh on each build", + "weight": "weak", + "direction": "shared_base", + "reason": "Table models do full refresh. 
With the same source data, " + "base and current produce identical results.", + }) + + # Signal 3: event_time coverage (enables --sample feasibility) + if source_analysis["total_sources"] > 0: + if event_time_pct == 100: + signals.append({ + "signal": "event_time_coverage", + "value": f"{event_time_pct:.0f}%", + "detail": f"All {source_analysis['total_sources']} sources have event_time configured", + "weight": "moderate", + "direction": "enables_isolation", + "reason": "Full event_time coverage means --sample can filter all sources " + "to a consistent time window, making isolated base builds fast and deterministic.", + }) + elif event_time_pct > 0: + signals.append({ + "signal": "event_time_coverage", + "value": f"{event_time_pct:.0f}%", + "detail": f"{source_analysis['with_event_time']} of {source_analysis['total_sources']} " + f"sources have event_time", + "weight": "weak", + "direction": "partial_isolation", + "reason": "Partial event_time coverage means --sample will only filter some sources. " + "Tables without event_time will still get full data.", + }) + else: + signals.append({ + "signal": "event_time_coverage", + "value": "0%", + "detail": "No sources have event_time configured", + "weight": "moderate", + "direction": "blocks_sample", + "reason": "Without event_time on sources, --sample cannot be used. " + "Isolated base would require full rebuilds or alternative filtering.", + }) + + # Signal 4: Project scale + if total > 50: + signals.append({ + "signal": "project_scale", + "value": total, + "detail": f"{total} models — large project benefits more from isolation", + "weight": "weak", + "direction": "isolated_base", + "reason": "Larger projects have more surface area for false alarms. 
" + "Isolated base reduces noise across all comparisons.", + }) + + # Determine confidence + if inc_count > 0 and event_time_pct == 100: + confidence = "high" + recommendation = "isolated_base" + elif inc_count > 0 and event_time_pct < 100: + confidence = "medium" + recommendation = "isolated_base" + elif inc_count == 0: + confidence = "high" + recommendation = "shared_base" + + return { + "recommendation": recommendation, + "confidence": confidence, + "signals": signals, + } + + +def format_report( + model_analysis: dict, source_analysis: dict, classification: dict +) -> str: + """Format a human-readable report.""" + lines = [] + lines.append("=" * 60) + lines.append(" Recce Base Mode Detection Report") + lines.append("=" * 60) + lines.append("") + + # Recommendation + rec = classification["recommendation"] + conf = classification["confidence"] + if rec == "isolated_base": + lines.append(f" RECOMMENDATION: Isolated Base ({conf} confidence)") + lines.append(" Your project has characteristics that cause false alarms") + lines.append(" with a shared production base. 
Use isolated base mode") + lines.append(" with --sample for accurate PR comparisons.") + else: + lines.append(f" RECOMMENDATION: Shared Base ({conf} confidence)") + lines.append(" Your project works well with the default shared base.") + lines.append(" No special CI configuration needed.") + + lines.append("") + lines.append("-" * 60) + + # Model summary + mc = model_analysis["materialization_counts"] + lines.append(f" Models: {model_analysis['total_models']} total") + lines.append(f" table: {mc['table']} view: {mc['view']} " + f"ephemeral: {mc['ephemeral']} incremental: {mc['incremental']}") + lines.append("") + + # Source summary + sa = source_analysis + lines.append(f" Sources: {sa['total_sources']} total") + lines.append(f" with event_time: {sa['with_event_time']} " + f"without: {sa['without_event_time']}") + if sa["without_event_time"] > 0: + missing = [s["name"] for s in sa["sources_without_event_time"]] + lines.append(f" missing event_time: {', '.join(missing)}") + lines.append("") + + # Incremental models + if model_analysis["incremental_models"]: + lines.append(" Incremental models:") + for m in model_analysis["incremental_models"]: + strategy = m.get("strategy") or "default" + lines.append(f" - {m['name']} (strategy: {strategy})") + lines.append("") + + # Signals + lines.append("-" * 60) + lines.append(" Detection signals:") + lines.append("") + for sig in classification["signals"]: + icon = {"strong": "***", "moderate": "**", "weak": "*"}[sig["weight"]] + direction = sig["direction"].replace("_", " ") + lines.append(f" {icon} [{direction}] {sig['detail']}") + lines.append(f" {sig['reason']}") + lines.append("") + + lines.append("=" * 60) + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Detect recommended Recce base mode") + parser.add_argument( + "--manifest", + default="target/manifest.json", + help="Path to dbt manifest.json (default: target/manifest.json)", + ) + parser.add_argument( + "--json", + 
action="store_true", + dest="json_output", + help="Output machine-readable JSON", + ) + args = parser.parse_args() + + manifest = load_manifest(args.manifest) + model_analysis = analyze_models(manifest) + source_analysis = analyze_sources(manifest) + classification = classify(model_analysis, source_analysis) + + if args.json_output: + output = { + "models": { + "total": model_analysis["total_models"], + "materialization_counts": model_analysis["materialization_counts"], + "incremental_models": model_analysis["incremental_models"], + }, + "sources": { + "total": source_analysis["total_sources"], + "with_event_time": source_analysis["with_event_time"], + "without_event_time": source_analysis["without_event_time"], + "sources_without_event_time": [ + s["name"] for s in source_analysis["sources_without_event_time"] + ], + }, + "classification": classification, + } + print(json.dumps(output, indent=2)) + else: + print(format_report(model_analysis, source_analysis, classification)) + + +if __name__ == "__main__": + main() From aa759e149de9e1f4e778031eb27bede71bee266e Mon Sep 17 00:00:00 2001 From: even-wei Date: Wed, 25 Feb 2026 23:49:52 +0800 Subject: [PATCH 2/4] refactor: reframe detection around conditional logic, not data accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The root cause of false alarms is NOT "incremental models accumulate data." It is conditional logic (is_incremental(), current_date(), target.name, {{ this }}) that produces different SQL depending on build context. Two environments built under different conditions run different queries against the same source → different results → false alarm diffs. 
- detect_base_mode.py: reframed signal explanations from "data accumulation" to "conditional logic produces non-deterministic SQL"; added snapshot model detection alongside incremental - metrics_daily_shipments.sql: added target-dependent else branch (pg-base gets 365 days, others get 90 days) to demonstrate the real-world conditional fork pattern seen in fct_cmab_strategy_reward Validated: pg-base (365d) vs pg-current (90d) produces -4.8% row diff on the conditional model, while all 50 deterministic models match. Refs: DRC-2820 Signed-off-by: even-wei --- models/metrics/metrics_daily_shipments.sql | 24 +++- scripts/detect_base_mode.py | 149 ++++++++++++++------- 2 files changed, 121 insertions(+), 52 deletions(-) diff --git a/models/metrics/metrics_daily_shipments.sql b/models/metrics/metrics_daily_shipments.sql index b778e73..5c2f3ac 100644 --- a/models/metrics/metrics_daily_shipments.sql +++ b/models/metrics/metrics_daily_shipments.sql @@ -6,10 +6,22 @@ ) }} --- Incremental model: daily shipment metrics --- This model demonstrates the isolated base scenario: --- Production accumulates years of data, while a fresh PR build --- only sees recent shipments, causing row count false alarms. +-- Demonstrates why incremental models cause false alarms in Recce. +-- +-- The root cause is NOT "data accumulation" — it's the conditional logic +-- below that produces DIFFERENT SQL depending on build context: +-- +-- is_incremental() = true → filters from max(ship_date) in existing table +-- is_incremental() = false → filters last N days from a reference date +-- (N depends on target: prod gets 365 days, dev/PR gets 90 days) +-- +-- Two environments built at different times or with different history +-- will run different SQL → different results → false alarm diffs. +-- +-- This mirrors real-world patterns like the fct_cmab_strategy_reward example +-- where prod gets -8 days and dev gets -2 days from current_date(). 
+ +{% set reference_date = "'1998-08-02'" %} select oi.ship_date, @@ -24,6 +36,10 @@ where oi.ship_date is not null {% if is_incremental() %} and oi.ship_date > (select max(ship_date) from {{ this }}) + and oi.ship_date <= {{ reference_date }}::date + {% else %} + and oi.ship_date >= {{ reference_date }}::date - interval '{{ 365 if target.name == "pg-base" else 90 }} days' + and oi.ship_date <= {{ reference_date }}::date {% endif %} group by oi.ship_date diff --git a/scripts/detect_base_mode.py b/scripts/detect_base_mode.py index c2af330..03ae8ba 100644 --- a/scripts/detect_base_mode.py +++ b/scripts/detect_base_mode.py @@ -8,11 +8,20 @@ uv run python scripts/detect_base_mode.py --manifest path/to/manifest.json uv run python scripts/detect_base_mode.py --json # machine-readable output +Root cause of false alarms: + Incremental models contain conditional logic (is_incremental()) that produces + DIFFERENT SQL depending on build context — existing table state, build time, + target name. Two environments built under different conditions run different + queries against the same source data, producing different results. + + This is NOT about "data accumulation" or "data volume." It's about + non-deterministic SQL generation from conditional Jinja logic. + Detection signals: - 1. Incremental models → strong signal for isolated base - 2. Sources with event_time → enables --sample, makes isolated base feasible - 3. Materialization mix → all views = shared base fine - 4. Model count / complexity → large projects benefit more from isolation + 1. Incremental/snapshot models → contain is_incremental() conditional logic + 2. Sources with event_time → enables --sample for deterministic windows + 3. Materialization mix → all views/tables = deterministic output + 4. 
Model count / complexity → larger projects amplify the false alarm noise """ import argparse @@ -36,18 +45,31 @@ def analyze_models(manifest: dict) -> dict: models = {} materialization_counts = {"table": 0, "view": 0, "ephemeral": 0, "incremental": 0, "other": 0} incremental_models = [] + snapshot_models = [] + + project_name = manifest.get("metadata", {}).get("project_name") for unique_id, node in manifest.get("nodes", {}).items(): - if node.get("resource_type") != "model": + resource_type = node.get("resource_type") + if resource_type not in ("model", "snapshot"): continue # Only count models from the root project, not packages - if node.get("package_name") != manifest.get("metadata", {}).get("project_name"): + if node.get("package_name") != project_name: continue - mat = node.get("config", {}).get("materialized", "unknown") name = node.get("name", unique_id) schema_path = node.get("path", "") + if resource_type == "snapshot": + snapshot_models.append({ + "name": name, + "path": schema_path, + "strategy": node.get("config", {}).get("strategy"), + }) + continue + + mat = node.get("config", {}).get("materialized", "unknown") + if mat in materialization_counts: materialization_counts[mat] += 1 else: @@ -60,18 +82,18 @@ def analyze_models(manifest: dict) -> dict: } if mat == "incremental": - inc_config = { + incremental_models.append({ "name": name, "path": schema_path, "strategy": node.get("config", {}).get("incremental_strategy"), "unique_key": node.get("config", {}).get("unique_key"), - } - incremental_models.append(inc_config) + }) return { "total_models": len(models), "materialization_counts": materialization_counts, "incremental_models": incremental_models, + "snapshot_models": snapshot_models, "models": models, } @@ -109,12 +131,20 @@ def analyze_sources(manifest: dict) -> dict: def classify(model_analysis: dict, source_analysis: dict) -> dict: - """Classify the project and recommend base mode.""" + """Classify the project and recommend base mode. 
+ + The core question: does the project contain models with conditional logic + that makes SQL output dependent on build context (time, existing state, + target)? If yes, two environments built under different conditions will + produce different results — causing false alarm diffs in Recce. + """ signals = [] recommendation = "shared_base" confidence = "high" inc_count = model_analysis["materialization_counts"]["incremental"] + snap_count = len(model_analysis["snapshot_models"]) + conditional_count = inc_count + snap_count total = model_analysis["total_models"] table_count = model_analysis["materialization_counts"]["table"] view_count = model_analysis["materialization_counts"]["view"] @@ -124,29 +154,42 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: else 0 ) - # Signal 1: Incremental models (strongest signal) - if inc_count > 0: - inc_pct = inc_count / total * 100 + # Signal 1: Models with conditional logic (strongest signal) + # Incremental models use is_incremental() which forks SQL based on: + # - Whether the target table already exists (state-dependent) + # - Often combined with current_date()/current_timestamp() (time-dependent) + # - Sometimes with target.name checks (target-dependent) + # Snapshots use similar conditional logic for SCD history tracking. + if conditional_count > 0: + parts = [] + if inc_count > 0: + parts.append(f"{inc_count} incremental") + if snap_count > 0: + parts.append(f"{snap_count} snapshot") + detail = f"{' + '.join(parts)} model(s) with conditional logic" + signals.append({ - "signal": "incremental_models", - "value": inc_count, - "detail": f"{inc_count} incremental model(s) ({inc_pct:.0f}% of project)", + "signal": "conditional_models", + "value": conditional_count, + "detail": detail, "weight": "strong", "direction": "isolated_base", - "reason": "Incremental models accumulate data over time. 
Shared base (production) " - "has full history while PR current has only recent data, causing " - "false alarms in row count and value diffs.", + "reason": "These models contain is_incremental() or snapshot logic that produces " + "different SQL depending on build context (existing table state, build " + "time, target name). Two environments built under different conditions " + "will run different queries → different results → false alarm diffs.", }) recommendation = "isolated_base" else: signals.append({ - "signal": "incremental_models", + "signal": "conditional_models", "value": 0, - "detail": "No incremental models found", + "detail": "No incremental or snapshot models found", "weight": "strong", "direction": "shared_base", - "reason": "Without incremental models, base and current environments " - "produce the same data when given the same input.", + "reason": "Without conditional logic (is_incremental, snapshots), all models " + "produce deterministic SQL. Same source data → same result regardless " + "of when or where the build runs.", }) # Signal 2: Materialization profile @@ -154,20 +197,21 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: signals.append({ "signal": "all_views", "value": True, - "detail": "All models are views — no materialized data to diverge", + "detail": "All models are views — recomputed on read, no stored state", "weight": "moderate", "direction": "shared_base", - "reason": "Views are recomputed on read; no stored state to diverge between environments.", + "reason": "Views generate deterministic SQL with no conditional logic. 
" + "Output depends only on current source data, not build history.", }) - elif table_count > 0: + elif table_count > 0 and conditional_count == 0: signals.append({ "signal": "table_models", "value": table_count, - "detail": f"{table_count} table model(s) — full refresh on each build", + "detail": f"{table_count} table model(s) — deterministic full refresh", "weight": "weak", "direction": "shared_base", - "reason": "Table models do full refresh. With the same source data, " - "base and current produce identical results.", + "reason": "Table models generate the same SQL every build (no conditional " + "logic). Same source data → identical results in any environment.", }) # Signal 3: event_time coverage (enables --sample feasibility) @@ -180,7 +224,8 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: "weight": "moderate", "direction": "enables_isolation", "reason": "Full event_time coverage means --sample can filter all sources " - "to a consistent time window, making isolated base builds fast and deterministic.", + "to a consistent time window, making isolated base builds fast " + "and deterministic.", }) elif event_time_pct > 0: signals.append({ @@ -190,8 +235,8 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: f"sources have event_time", "weight": "weak", "direction": "partial_isolation", - "reason": "Partial event_time coverage means --sample will only filter some sources. " - "Tables without event_time will still get full data.", + "reason": "Partial event_time coverage means --sample will only filter some " + "sources. Sources without event_time will still get full data.", }) else: signals.append({ @@ -201,7 +246,7 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: "weight": "moderate", "direction": "blocks_sample", "reason": "Without event_time on sources, --sample cannot be used. 
" - "Isolated base would require full rebuilds or alternative filtering.", + "Isolated base would require full rebuilds.", }) # Signal 4: Project scale @@ -209,21 +254,22 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: signals.append({ "signal": "project_scale", "value": total, - "detail": f"{total} models — large project benefits more from isolation", + "detail": f"{total} models — more surface area for false alarm noise", "weight": "weak", "direction": "isolated_base", - "reason": "Larger projects have more surface area for false alarms. " - "Isolated base reduces noise across all comparisons.", + "reason": "Larger projects amplify the impact of conditional models: " + "downstream models inherit the divergent data, spreading " + "false alarm diffs across more tables.", }) # Determine confidence - if inc_count > 0 and event_time_pct == 100: + if conditional_count > 0 and event_time_pct == 100: confidence = "high" recommendation = "isolated_base" - elif inc_count > 0 and event_time_pct < 100: + elif conditional_count > 0 and event_time_pct < 100: confidence = "medium" recommendation = "isolated_base" - elif inc_count == 0: + elif conditional_count == 0: confidence = "high" recommendation = "shared_base" @@ -249,13 +295,14 @@ def format_report( conf = classification["confidence"] if rec == "isolated_base": lines.append(f" RECOMMENDATION: Isolated Base ({conf} confidence)") - lines.append(" Your project has characteristics that cause false alarms") - lines.append(" with a shared production base. Use isolated base mode") - lines.append(" with --sample for accurate PR comparisons.") + lines.append(" Your project has models with conditional logic") + lines.append(" (is_incremental/snapshots) that produce different SQL") + lines.append(" depending on build context. 
Use isolated base mode") + lines.append(" so both environments run the same deterministic SQL.") else: lines.append(f" RECOMMENDATION: Shared Base ({conf} confidence)") - lines.append(" Your project works well with the default shared base.") - lines.append(" No special CI configuration needed.") + lines.append(" All models produce deterministic SQL — no conditional") + lines.append(" logic that varies by build context. Shared base is fine.") lines.append("") lines.append("-" * 60) @@ -265,6 +312,8 @@ def format_report( lines.append(f" Models: {model_analysis['total_models']} total") lines.append(f" table: {mc['table']} view: {mc['view']} " f"ephemeral: {mc['ephemeral']} incremental: {mc['incremental']}") + if model_analysis["snapshot_models"]: + lines.append(f" snapshots: {len(model_analysis['snapshot_models'])}") lines.append("") # Source summary @@ -277,12 +326,15 @@ def format_report( lines.append(f" missing event_time: {', '.join(missing)}") lines.append("") - # Incremental models + # Models with conditional logic if model_analysis["incremental_models"]: - lines.append(" Incremental models:") + lines.append(" Models with conditional logic:") for m in model_analysis["incremental_models"]: strategy = m.get("strategy") or "default" - lines.append(f" - {m['name']} (strategy: {strategy})") + lines.append(f" - {m['name']} (incremental, strategy: {strategy})") + for m in model_analysis["snapshot_models"]: + strategy = m.get("strategy") or "default" + lines.append(f" - {m['name']} (snapshot, strategy: {strategy})") lines.append("") # Signals @@ -326,6 +378,7 @@ def main(): "total": model_analysis["total_models"], "materialization_counts": model_analysis["materialization_counts"], "incremental_models": model_analysis["incremental_models"], + "snapshot_models": model_analysis["snapshot_models"], }, "sources": { "total": source_analysis["total_sources"], From 7036cad37a21d250500881a2e1ade1c73d7e3917 Mon Sep 17 00:00:00 2001 From: even-wei Date: Thu, 26 Feb 2026 12:23:43 
+0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20PoC=20v2=20=E2=80=94=20prove=20fals?= =?UTF-8?q?e=20alarms=20from=20conditional=20logic=20in=20any=20materializ?= =?UTF-8?q?ation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add models demonstrating that false alarms are caused by non-deterministic SQL patterns (target.name, current_date), not by materialization type: - metrics_regional_revenue (table): target.name date window → -68.8% mismatch - metrics_shipping_efficiency (table): target.name branching → -68.8% mismatch - metrics_order_summary (view): target.name date window → -78.7% mismatch - metrics_daily_orders (incremental, deterministic else): 0% match — safe Detection script v2: scans raw SQL for non-deterministic patterns instead of checking materialization type. Correctly flags 4/4 problematic models, correctly marks safe incremental as safe. Zero false positives, zero false negatives. Compare script: now includes views in row count comparison. Signed-off-by: even-wei --- models/metrics/metrics_daily_orders.sql | 42 +++ models/metrics/metrics_order_summary.sql | 29 ++ models/metrics/metrics_regional_revenue.sql | 11 + .../metrics/metrics_shipping_efficiency.sql | 17 +- scripts/compare_environments.py | 20 +- scripts/detect_base_mode.py | 292 ++++++++++++------ 6 files changed, 315 insertions(+), 96 deletions(-) create mode 100644 models/metrics/metrics_daily_orders.sql create mode 100644 models/metrics/metrics_order_summary.sql diff --git a/models/metrics/metrics_daily_orders.sql b/models/metrics/metrics_daily_orders.sql new file mode 100644 index 0000000..78896eb --- /dev/null +++ b/models/metrics/metrics_daily_orders.sql @@ -0,0 +1,42 @@ +{{ + config( + materialized = 'incremental', + unique_key = 'order_date', + incremental_strategy = 'delete+insert' + ) +}} + +-- SAFE INCREMENTAL: deterministic else branch — NO false alarm. 
+-- +-- This model IS incremental, but its else branch (used on fresh CI builds) +-- uses a FIXED date range with no target.name or current_date() dependency. +-- Both pg-base and pg-current produce identical SQL on first build. +-- +-- Contrast with metrics_daily_shipments which has target.name in the else +-- branch → different SQL per target → false alarm. +-- +-- Key insight: is_incremental() alone does NOT cause false alarms. +-- The false alarm comes from non-deterministic logic INSIDE the branches. + +{% set reference_date = "'1998-08-02'" %} + +select + o.order_date, + count(distinct o.order_key) as order_count, + count(distinct o.customer_key) as customer_count, + sum(o.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(o.gross_item_sales_amount)::decimal(16,4) as avg_order_value +from + {{ ref('fct_orders') }} o +where + o.order_date is not null + {% if is_incremental() %} + and o.order_date > (select max(order_date) from {{ this }}) + and o.order_date <= {{ reference_date }}::date + {% else %} + and o.order_date <= {{ reference_date }}::date + {% endif %} +group by + o.order_date +order by + o.order_date diff --git a/models/metrics/metrics_order_summary.sql b/models/metrics/metrics_order_summary.sql new file mode 100644 index 0000000..351c293 --- /dev/null +++ b/models/metrics/metrics_order_summary.sql @@ -0,0 +1,29 @@ +{{ + config( + materialized = 'view' + ) +}} + +-- Monthly order summary by priority level +-- +-- FALSE ALARM DEMO: VIEW with target-dependent date window. +-- pg-base gets 5 years of history, pg-current gets 1 year. +-- Even views — which don't store data — produce different results +-- when their SQL definition varies by build context. +-- This is NOT incremental, NOT a table — it's a plain view. 
+ +{% set reference_date = "'1998-08-02'" %} + +select + date_trunc('month', o.order_date) as order_month, + o.order_priority_code as priority, + count(distinct o.order_key) as order_count, + count(distinct o.customer_key) as customer_count, + sum(o.gross_item_sales_amount)::decimal(16,4) as total_revenue, + avg(o.gross_item_sales_amount)::decimal(16,4) as avg_order_value +from + {{ ref('fct_orders') }} o +where + o.order_date >= {{ reference_date }}::date - interval '{{ 1825 if target.name == "pg-base" else 365 }} days' + and o.order_date <= {{ reference_date }}::date +group by 1, 2 diff --git a/models/metrics/metrics_regional_revenue.sql b/models/metrics/metrics_regional_revenue.sql index 727ec63..a8be242 100644 --- a/models/metrics/metrics_regional_revenue.sql +++ b/models/metrics/metrics_regional_revenue.sql @@ -1,4 +1,12 @@ -- Revenue by region and nation over time +-- +-- FALSE ALARM DEMO: TABLE model with target-dependent date window. +-- pg-base gets 7 years of history, pg-current gets 2 years. +-- Same pattern as prod vs dev environments with different data needs. +-- This is NOT incremental — it's a plain table with conditional logic. + +{% set reference_date = "'1998-08-02'" %} + with orders as ( select * from {{ ref('fct_orders') }} @@ -20,4 +28,7 @@ select from orders o join customers c on o.customer_key = c.customer_key +where + o.order_date >= {{ reference_date }}::date - interval '{{ 2555 if target.name == "pg-base" else 730 }} days' + and o.order_date <= {{ reference_date }}::date group by 1, 2, 3 diff --git a/models/metrics/metrics_shipping_efficiency.sql b/models/metrics/metrics_shipping_efficiency.sql index 6a14769..b8f28e4 100644 --- a/models/metrics/metrics_shipping_efficiency.sql +++ b/models/metrics/metrics_shipping_efficiency.sql @@ -1,4 +1,12 @@ -- Average delivery time by shipping mode per month +-- +-- FALSE ALARM DEMO: TABLE model with target.name branching. +-- pg-base analyzes 7 years of ship dates, pg-current only 2 years. 
+-- Mirrors real pattern: prod builds full history, dev builds subset. +-- This is NOT incremental — it's a plain table with conditional logic. + +{% set reference_date = "'1998-08-02'" %} + with items as ( select * from {{ ref('fct_orders_items') }} @@ -15,5 +23,12 @@ select round(sum(case when i.receipt_date > i.commit_date then 1 else 0 end)::decimal / nullif(count(*), 0) * 100, 2) as late_pct from items i -where i.receipt_date is not null +where + i.receipt_date is not null + {% if target.name == 'pg-base' %} + and i.ship_date >= {{ reference_date }}::date - interval '2555 days' + {% else %} + and i.ship_date >= {{ reference_date }}::date - interval '730 days' + {% endif %} + and i.ship_date <= {{ reference_date }}::date group by 1, 2 diff --git a/scripts/compare_environments.py b/scripts/compare_environments.py index cf4163c..523fb43 100644 --- a/scripts/compare_environments.py +++ b/scripts/compare_environments.py @@ -14,20 +14,21 @@ def get_table_row_counts(conn, schema: str) -> dict[str, int]: - """Get row counts for all tables in a schema.""" + """Get row counts for all tables and views in a schema.""" cur = conn.cursor() cur.execute( - "SELECT table_name FROM information_schema.tables " - "WHERE table_schema = %s AND table_type = 'BASE TABLE' " + "SELECT table_name, table_type FROM information_schema.tables " + "WHERE table_schema = %s AND table_type IN ('BASE TABLE', 'VIEW') " "ORDER BY table_name", (schema,), ) - tables = [row[0] for row in cur.fetchall()] + relations = [(row[0], row[1]) for row in cur.fetchall()] counts = {} - for table in tables: - cur.execute(f'SELECT count(*) FROM "{schema}"."{table}"') - counts[table] = cur.fetchone()[0] + for name, rel_type in relations: + cur.execute(f'SELECT count(*) FROM "{schema}"."{name}"') + tag = "(view)" if rel_type == "VIEW" else "" + counts[f"{name} {tag}".strip()] = cur.fetchone()[0] cur.close() return counts @@ -82,14 +83,15 @@ def main(): matches = sum(1 for r in results if r["status"] == "match") 
mismatches = sum(1 for r in results if r["status"] == "MISMATCH") - print(f"{'Table':<40} {'Base':>10} {'Current':>10} {'Diff':>10} {'%':>8} {'Status'}") + print(f"{'Table':<40} {'Base':>10} {'Current':>10} {'Diff':>10} {'Result'}") print("-" * 90) for r in results: status_marker = " " if r["status"] == "match" else "!!" + label = f"{r['pct']:+.1f}% [{r['status']}]" print( f"{status_marker}{r['table']:<38} {r['base']:>10,} {r['current']:>10,} " - f"{r['diff']:>+10,} {r['pct']:>+7.1f}% {r['status']}" + f"{r['diff']:>+10,} {label}" ) print("-" * 90) diff --git a/scripts/detect_base_mode.py b/scripts/detect_base_mode.py index 03ae8ba..d0976d0 100644 --- a/scripts/detect_base_mode.py +++ b/scripts/detect_base_mode.py @@ -1,6 +1,6 @@ """Detect whether a dbt project needs shared base or isolated base. -Analyzes manifest.json to classify the project and recommend the appropriate +Analyzes model SQL to classify the project and recommend the appropriate base environment mode for Recce. Usage: @@ -9,27 +9,62 @@ uv run python scripts/detect_base_mode.py --json # machine-readable output Root cause of false alarms: - Incremental models contain conditional logic (is_incremental()) that produces - DIFFERENT SQL depending on build context — existing table state, build time, - target name. Two environments built under different conditions run different - queries against the same source data, producing different results. - - This is NOT about "data accumulation" or "data volume." It's about - non-deterministic SQL generation from conditional Jinja logic. - -Detection signals: - 1. Incremental/snapshot models → contain is_incremental() conditional logic - 2. Sources with event_time → enables --sample for deterministic windows - 3. Materialization mix → all views/tables = deterministic output - 4. 
Model count / complexity → larger projects amplify the false alarm noise + Conditional or non-deterministic logic in models produces DIFFERENT SQL + depending on build context — target name, build time, existing table state. + Two environments built under different conditions run different queries + against the same source data, producing different results. + + This is NOT limited to incremental models. Any model (table, view, ephemeral) + with conditional Jinja or non-deterministic functions can cause false alarms. + Conversely, an incremental model with a deterministic else branch is SAFE + in CI (both envs get fresh builds → same SQL). + +Detection approach — SQL pattern scanning: + Scans raw_code from manifest for patterns that make SQL non-deterministic: + - target.name/schema → different SQL per target environment + - current_date()/now()→ different results per build time + Note: is_incremental() and {{ this }} are NOT flagged by themselves — + they only matter if combined with non-deterministic patterns above. """ import argparse import json +import re import sys from pathlib import Path +# Jinja/SQL patterns that make SQL non-deterministic across environments. +# +# Key insight: is_incremental() and {{ this }} are NOT flagged here. +# In CI, both envs get fresh builds → is_incremental() returns false → +# {{ this }} is never reached. An incremental model with a deterministic +# else branch produces identical SQL in both environments. +# +# What actually causes false alarms is non-deterministic content INSIDE +# the branches: target.name, current_date(), etc. 
+CONDITIONAL_PATTERNS = [ + { + "name": "target.name", + "regex": re.compile(r"\btarget\s*\.\s*name\b"), + "weight": "strong", + "reason": "SQL varies by target environment — different targets produce different queries", + }, + { + "name": "target.schema", + "regex": re.compile(r"\btarget\s*\.\s*schema\b"), + "weight": "strong", + "reason": "SQL varies by target schema — different targets produce different queries", + }, + { + "name": "current_date/now", + "regex": re.compile(r"\b(current_date\b|current_timestamp\b|now\s*\(\s*\)|getdate\s*\(\s*\))", re.IGNORECASE), + "weight": "moderate", + "reason": "Result depends on build time — builds at different times produce different data", + }, +] + + def load_manifest(path: str) -> dict: manifest_path = Path(path) if not manifest_path.exists(): @@ -53,7 +88,6 @@ def analyze_models(manifest: dict) -> dict: resource_type = node.get("resource_type") if resource_type not in ("model", "snapshot"): continue - # Only count models from the root project, not packages if node.get("package_name") != project_name: continue @@ -79,6 +113,7 @@ def analyze_models(manifest: dict) -> dict: "materialized": mat, "path": schema_path, "depends_on": node.get("depends_on", {}).get("nodes", []), + "raw_code": node.get("raw_code", ""), } if mat == "incremental": @@ -98,6 +133,59 @@ def analyze_models(manifest: dict) -> dict: } +def strip_sql_comments(code: str) -> str: + """Remove SQL line comments (--) and block comments (/* */) from code. + + Preserves Jinja comments ({# #}) since they're already stripped by dbt. 
+ This prevents false positives from keywords mentioned in comments like: + -- no target.name or current_date() dependency + """ + # Remove block comments + code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL) + # Remove line comments (but not inside strings — good enough for PoC) + code = re.sub(r"--[^\n]*", "", code) + return code + + +def scan_sql_patterns(model_analysis: dict) -> list[dict]: + """Scan ALL model SQL for non-deterministic patterns. + + Scans every model (including incremental) for patterns that make SQL + vary by build context. An incremental model with a deterministic else + branch will NOT be flagged — only those with target.name, current_date(), + etc. in their code. + + SQL comments are stripped before scanning to avoid false positives. + """ + findings = [] + + for name, model in model_analysis["models"].items(): + raw_code = model.get("raw_code", "") + if not raw_code: + continue + + code_only = strip_sql_comments(raw_code) + + model_patterns = [] + for pattern in CONDITIONAL_PATTERNS: + if pattern["regex"].search(code_only): + model_patterns.append({ + "pattern": pattern["name"], + "weight": pattern["weight"], + "reason": pattern["reason"], + }) + + if model_patterns: + findings.append({ + "name": name, + "materialized": model["materialized"], + "path": model["path"], + "patterns": model_patterns, + }) + + return findings + + def analyze_sources(manifest: dict) -> dict: """Extract source metadata, focusing on event_time config.""" sources = [] @@ -130,78 +218,88 @@ def analyze_sources(manifest: dict) -> dict: } -def classify(model_analysis: dict, source_analysis: dict) -> dict: +def classify(model_analysis: dict, source_analysis: dict, sql_findings: list[dict]) -> dict: """Classify the project and recommend base mode. - The core question: does the project contain models with conditional logic - that makes SQL output dependent on build context (time, existing state, - target)? 
If yes, two environments built under different conditions will - produce different results — causing false alarm diffs in Recce. + Detection is based on SQL pattern scanning of ALL models (including + incremental). An incremental model with a deterministic else branch + is NOT flagged — only models with target.name, current_date(), etc. """ signals = [] recommendation = "shared_base" confidence = "high" - inc_count = model_analysis["materialization_counts"]["incremental"] - snap_count = len(model_analysis["snapshot_models"]) - conditional_count = inc_count + snap_count + conditional_count = len(sql_findings) total = model_analysis["total_models"] table_count = model_analysis["materialization_counts"]["table"] view_count = model_analysis["materialization_counts"]["view"] + inc_count = model_analysis["materialization_counts"]["incremental"] + snap_count = len(model_analysis["snapshot_models"]) event_time_pct = ( source_analysis["with_event_time"] / source_analysis["total_sources"] * 100 if source_analysis["total_sources"] > 0 else 0 ) - # Signal 1: Models with conditional logic (strongest signal) - # Incremental models use is_incremental() which forks SQL based on: - # - Whether the target table already exists (state-dependent) - # - Often combined with current_date()/current_timestamp() (time-dependent) - # - Sometimes with target.name checks (target-dependent) - # Snapshots use similar conditional logic for SCD history tracking. 
+ # Signal 1: Models with non-deterministic SQL patterns if conditional_count > 0: - parts = [] - if inc_count > 0: - parts.append(f"{inc_count} incremental") - if snap_count > 0: - parts.append(f"{snap_count} snapshot") - detail = f"{' + '.join(parts)} model(s) with conditional logic" + model_details = [] + for f in sql_findings: + patterns = ", ".join(p["pattern"] for p in f["patterns"]) + model_details.append(f"{f['name']} ({f['materialized']}: {patterns})") signals.append({ - "signal": "conditional_models", + "signal": "non_deterministic_sql", "value": conditional_count, - "detail": detail, + "detail": f"{conditional_count} model(s) with non-deterministic SQL", "weight": "strong", "direction": "isolated_base", - "reason": "These models contain is_incremental() or snapshot logic that produces " - "different SQL depending on build context (existing table state, build " - "time, target name). Two environments built under different conditions " - "will run different queries → different results → false alarm diffs.", + "reason": "These models contain target.name, current_date(), or other patterns " + "that produce different SQL per build context. " + "Models: " + "; ".join(model_details), }) recommendation = "isolated_base" else: signals.append({ - "signal": "conditional_models", + "signal": "deterministic_sql", "value": 0, - "detail": "No incremental or snapshot models found", + "detail": "All models produce deterministic SQL (no target.name, current_date, etc.)", "weight": "strong", "direction": "shared_base", - "reason": "Without conditional logic (is_incremental, snapshots), all models " - "produce deterministic SQL. Same source data → same result regardless " - "of when or where the build runs.", + "reason": "SQL scanning found no non-deterministic patterns. 
" + "Same source data → same result regardless of build context.", + }) + + # Signal 2: Incremental models with safe else branches (informational) + safe_incremental = [] + for name, model in model_analysis["models"].items(): + if model["materialized"] == "incremental": + is_flagged = any(f["name"] == name for f in sql_findings) + if not is_flagged: + safe_incremental.append(name) + + if safe_incremental: + signals.append({ + "signal": "safe_incremental", + "value": len(safe_incremental), + "detail": f"{len(safe_incremental)} incremental model(s) with deterministic else branch — safe in CI", + "weight": "moderate", + "direction": "shared_base", + "reason": f"These incremental models have no target.name or current_date() in " + f"their code. In CI (fresh builds), is_incremental() returns false → " + f"both envs run the same deterministic SQL. " + f"Models: {', '.join(safe_incremental)}", }) - # Signal 2: Materialization profile - if total > 0 and view_count == total: + # Signal 3: Materialization profile + if total > 0 and view_count == total and conditional_count == 0: signals.append({ "signal": "all_views", "value": True, "detail": "All models are views — recomputed on read, no stored state", "weight": "moderate", "direction": "shared_base", - "reason": "Views generate deterministic SQL with no conditional logic. " - "Output depends only on current source data, not build history.", + "reason": "Views with no non-deterministic logic generate deterministic SQL.", }) elif table_count > 0 and conditional_count == 0: signals.append({ @@ -210,11 +308,11 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: "detail": f"{table_count} table model(s) — deterministic full refresh", "weight": "weak", "direction": "shared_base", - "reason": "Table models generate the same SQL every build (no conditional " - "logic). 
Same source data → identical results in any environment.", + "reason": "Table models with no non-deterministic logic generate the same SQL " + "every build.", }) - # Signal 3: event_time coverage (enables --sample feasibility) + # Signal 4: event_time coverage (enables --sample feasibility) if source_analysis["total_sources"] > 0: if event_time_pct == 100: signals.append({ @@ -249,7 +347,7 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: "Isolated base would require full rebuilds.", }) - # Signal 4: Project scale + # Signal 5: Project scale if total > 50: signals.append({ "signal": "project_scale", @@ -257,55 +355,66 @@ def classify(model_analysis: dict, source_analysis: dict) -> dict: "detail": f"{total} models — more surface area for false alarm noise", "weight": "weak", "direction": "isolated_base", - "reason": "Larger projects amplify the impact of conditional models: " - "downstream models inherit the divergent data, spreading " - "false alarm diffs across more tables.", + "reason": "Larger projects amplify the impact: downstream models inherit " + "divergent data, spreading false alarm diffs across more tables.", }) # Determine confidence if conditional_count > 0 and event_time_pct == 100: confidence = "high" - recommendation = "isolated_base" - elif conditional_count > 0 and event_time_pct < 100: + elif conditional_count > 0: confidence = "medium" - recommendation = "isolated_base" - elif conditional_count == 0: + else: confidence = "high" - recommendation = "shared_base" + + # Populate by_materialization breakdown + mat_breakdown = {} + for f in sql_findings: + mat = f["materialized"] + mat_breakdown[mat] = mat_breakdown.get(mat, 0) + 1 return { "recommendation": recommendation, "confidence": confidence, "signals": signals, + "conditional_models": { + "total": conditional_count, + "by_materialization": mat_breakdown, + }, } def format_report( - model_analysis: dict, source_analysis: dict, classification: dict + model_analysis: dict, 
source_analysis: dict, sql_findings: list[dict], + classification: dict, ) -> str: """Format a human-readable report.""" lines = [] - lines.append("=" * 60) - lines.append(" Recce Base Mode Detection Report") - lines.append("=" * 60) + lines.append("=" * 70) + lines.append(" Recce Base Mode Detection Report (v2)") + lines.append("=" * 70) lines.append("") # Recommendation rec = classification["recommendation"] conf = classification["confidence"] + cond = classification["conditional_models"] if rec == "isolated_base": lines.append(f" RECOMMENDATION: Isolated Base ({conf} confidence)") - lines.append(" Your project has models with conditional logic") - lines.append(" (is_incremental/snapshots) that produce different SQL") - lines.append(" depending on build context. Use isolated base mode") - lines.append(" so both environments run the same deterministic SQL.") + lines.append(f" Found {cond['total']} model(s) with non-deterministic SQL") + mat_parts = [f"{v} {k}" for k, v in cond["by_materialization"].items()] + if mat_parts: + lines.append(f" breakdown: {', '.join(mat_parts)}") + lines.append("") + lines.append(" These models produce different SQL depending on build context.") + lines.append(" Use isolated base so both environments build deterministically.") else: lines.append(f" RECOMMENDATION: Shared Base ({conf} confidence)") - lines.append(" All models produce deterministic SQL — no conditional") - lines.append(" logic that varies by build context. 
Shared base is fine.") + lines.append(" No non-deterministic patterns found in any model.") + lines.append(" All models produce deterministic SQL — shared base is fine.") lines.append("") - lines.append("-" * 60) + lines.append("-" * 70) # Model summary mc = model_analysis["materialization_counts"] @@ -326,19 +435,28 @@ def format_report( lines.append(f" missing event_time: {', '.join(missing)}") lines.append("") - # Models with conditional logic - if model_analysis["incremental_models"]: - lines.append(" Models with conditional logic:") - for m in model_analysis["incremental_models"]: - strategy = m.get("strategy") or "default" - lines.append(f" - {m['name']} (incremental, strategy: {strategy})") - for m in model_analysis["snapshot_models"]: - strategy = m.get("strategy") or "default" - lines.append(f" - {m['name']} (snapshot, strategy: {strategy})") + # Models with non-deterministic SQL + if sql_findings: + lines.append(" Models with non-deterministic SQL:") + for f in sql_findings: + patterns = ", ".join(p["pattern"] for p in f["patterns"]) + lines.append(f" - {f['name']} ({f['materialized']}) — {patterns}") + lines.append("") + + # Safe incremental models (informational) + safe_inc = [ + name for name, m in model_analysis["models"].items() + if m["materialized"] == "incremental" + and not any(f["name"] == name for f in sql_findings) + ] + if safe_inc: + lines.append(" Safe incremental models (deterministic else branch):") + for name in safe_inc: + lines.append(f" - {name} (incremental, no target.name/current_date)") lines.append("") # Signals - lines.append("-" * 60) + lines.append("-" * 70) lines.append(" Detection signals:") lines.append("") for sig in classification["signals"]: @@ -348,7 +466,7 @@ def format_report( lines.append(f" {sig['reason']}") lines.append("") - lines.append("=" * 60) + lines.append("=" * 70) return "\n".join(lines) @@ -370,7 +488,8 @@ def main(): manifest = load_manifest(args.manifest) model_analysis = analyze_models(manifest) 
source_analysis = analyze_sources(manifest) - classification = classify(model_analysis, source_analysis) + sql_findings = scan_sql_patterns(model_analysis) + classification = classify(model_analysis, source_analysis, sql_findings) if args.json_output: output = { @@ -388,11 +507,12 @@ def main(): s["name"] for s in source_analysis["sources_without_event_time"] ], }, + "sql_findings": sql_findings, "classification": classification, } print(json.dumps(output, indent=2)) else: - print(format_report(model_analysis, source_analysis, classification)) + print(format_report(model_analysis, source_analysis, sql_findings, classification)) if __name__ == "__main__": From abfccbbea0f7d4b84a012155b85be48de83388aa Mon Sep 17 00:00:00 2001 From: even-wei Date: Thu, 26 Feb 2026 17:03:32 +0800 Subject: [PATCH 4/4] feat: add compiled SQL diff detection and approach comparison MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prototype two detection methods for non-deterministic dbt models: 1. compiled_sql_diff.py — Compiles under two targets, normalizes schema names and batch metadata, diffs remaining SQL. Requires --full-refresh to catch incremental else branches. 2. compare_detection_approaches.py — Runs both Jinja scanning and compiled SQL diffing side-by-side, comparing accuracy against ground truth. 
Key findings: - Both approaches achieve 100% accuracy on dbt-tpch (73 models) - Compiled SQL diff needs --full-refresh for incremental models - Schema normalization must be precise (db.schema.table only) - dbt_batch_id/ts must be stripped as compile-time artifacts Relates to DRC-2863 Signed-off-by: even-wei --- scripts/compare_detection_approaches.py | 149 ++++++++++ scripts/compiled_sql_diff.py | 351 ++++++++++++++++++++++++ 2 files changed, 500 insertions(+) create mode 100644 scripts/compare_detection_approaches.py create mode 100644 scripts/compiled_sql_diff.py diff --git a/scripts/compare_detection_approaches.py b/scripts/compare_detection_approaches.py new file mode 100644 index 0000000..7923010 --- /dev/null +++ b/scripts/compare_detection_approaches.py @@ -0,0 +1,149 @@ +"""Compare detection accuracy: Jinja scanning vs compiled SQL diffing. + +Runs both approaches on the same dbt project and compares results. + +Usage: + uv run python scripts/compare_detection_approaches.py +""" + +import json +import subprocess +import sys + + +def run_jinja_scanning() -> set[str]: + """Run Jinja pattern scanning and return flagged model names.""" + result = subprocess.run( + ["uv", "run", "python", "scripts/detect_base_mode.py", "--json"], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"Error running detect_base_mode.py: {result.stderr}", file=sys.stderr) + return set() + + data = json.loads(result.stdout) + return {f["name"] for f in data["sql_findings"]} + + +def run_compiled_diff(base_dir: str, current_dir: str) -> set[str]: + """Run compiled SQL diff and return flagged model names.""" + result = subprocess.run( + ["uv", "run", "python", "scripts/compiled_sql_diff.py", + "--base-dir", base_dir, + "--current-dir", current_dir, + "--json"], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"Error running compiled_sql_diff.py: {result.stderr}", file=sys.stderr) + return set() + + data = json.loads(result.stdout) + 
return {f["model"] for f in data["non_deterministic"]} + + +def main(): + print("=" * 70) + print(" Detection Approach Comparison") + print("=" * 70) + print() + + # Expected ground truth: models with target.name branching + ground_truth = { + "metrics_daily_shipments", # incremental, target.name in else + "metrics_shipping_efficiency", # table, target.name if/else + "metrics_regional_revenue", # table, target.name inline + "metrics_order_summary", # view, target.name inline + } + safe_models = { + "metrics_daily_orders", # incremental, deterministic else + } + + print(" Ground truth (should be flagged):") + for m in sorted(ground_truth): + print(f" - {m}") + print() + print(" Safe models (should NOT be flagged):") + for m in sorted(safe_models): + print(f" - {m}") + print() + + # Approach 1: Jinja pattern scanning + print("-" * 70) + print(" Approach 1: Jinja Pattern Scanning (raw_code regex)") + jinja_flagged = run_jinja_scanning() + print(f" Flagged: {sorted(jinja_flagged)}") + + jinja_tp = ground_truth & jinja_flagged + jinja_fn = ground_truth - jinja_flagged + jinja_fp = jinja_flagged - ground_truth + jinja_safe_correct = safe_models - jinja_flagged + + print(f" True positives: {len(jinja_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(jinja_fn)} {sorted(jinja_fn) if jinja_fn else ''}") + print(f" False positives: {len(jinja_fp)} {sorted(jinja_fp) if jinja_fp else ''}") + print(f" Safe correctly: {len(jinja_safe_correct)}/{len(safe_models)}") + print() + + # Approach 2a: Compiled SQL diff (without --full-refresh) + print("-" * 70) + print(" Approach 2a: Compiled SQL Diff (existing tables → is_incremental=true)") + diff_flagged = run_compiled_diff("target/compiled_pg_base", "target/compiled_pg_current") + print(f" Flagged: {sorted(diff_flagged)}") + + diff_tp = ground_truth & diff_flagged + diff_fn = ground_truth - diff_flagged + diff_fp = diff_flagged - ground_truth + diff_safe_correct = safe_models - diff_flagged + + print(f" True positives: 
{len(diff_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(diff_fn)} {sorted(diff_fn) if diff_fn else ''}") + print(f" False positives: {len(diff_fp)} {sorted(diff_fp) if diff_fp else ''}") + print(f" Safe correctly: {len(diff_safe_correct)}/{len(safe_models)}") + print() + + # Approach 2b: Compiled SQL diff (with --full-refresh) + print("-" * 70) + print(" Approach 2b: Compiled SQL Diff (--full-refresh → is_incremental=false)") + diff_fr_flagged = run_compiled_diff("target/compiled_pg_base_fr", "target/compiled_pg_current_fr") + print(f" Flagged: {sorted(diff_fr_flagged)}") + + diff_fr_tp = ground_truth & diff_fr_flagged + diff_fr_fn = ground_truth - diff_fr_flagged + diff_fr_fp = diff_fr_flagged - ground_truth + diff_fr_safe_correct = safe_models - diff_fr_flagged + + print(f" True positives: {len(diff_fr_tp)}/{len(ground_truth)}") + print(f" False negatives: {len(diff_fr_fn)} {sorted(diff_fr_fn) if diff_fr_fn else ''}") + print(f" False positives: {len(diff_fr_fp)} {sorted(diff_fr_fp) if diff_fr_fp else ''}") + print(f" Safe correctly: {len(diff_fr_safe_correct)}/{len(safe_models)}") + print() + + # Summary + print("=" * 70) + print(" Summary") + print("=" * 70) + print() + print(f" {'Approach':<50} {'TP':>4} {'FN':>4} {'FP':>4} {'Accuracy'}") + print(f" {'-'*50} {'--':>4} {'--':>4} {'--':>4} {'--------'}") + total = len(ground_truth) + len(safe_models) + for label, tp, fn, fp, safe_ok in [ + ("Jinja Pattern Scanning", len(jinja_tp), len(jinja_fn), len(jinja_fp), len(jinja_safe_correct)), + ("Compiled SQL Diff (existing tables)", len(diff_tp), len(diff_fn), len(diff_fp), len(diff_safe_correct)), + ("Compiled SQL Diff (--full-refresh)", len(diff_fr_tp), len(diff_fr_fn), len(diff_fr_fp), len(diff_fr_safe_correct)), + ]: + correct = tp + safe_ok + acc = correct / total * 100 + print(f" {label:<50} {tp:>4} {fn:>4} {fp:>4} {acc:>6.1f}%") + + print() + print(" Key findings:") + print(" 1. 
# One alternation, scanned left to right, so that comment markers are only
# honoured outside quoted text. Groups 1 and 2 capture quoted text that must
# be kept verbatim; the two comment alternatives capture nothing.
_SQL_COMMENT_OR_QUOTED = re.compile(
    r"('(?:[^']|'')*')"  # group 1: string literal ('' is an embedded quote)
    r'|("(?:[^"]|"")*")'  # group 2: quoted identifier ("" is an embedded quote)
    r"|/\*.*?\*/"  # block comment, may span lines
    r"|--[^\n]*",  # line comment, up to (not including) the newline
    re.DOTALL,
)


def strip_sql_comments(sql: str) -> str:
    """Remove SQL comments to avoid false positives from comment text.

    Block comments (``/* ... */``) and line comments (``-- ...``) are deleted.
    Text inside single-quoted string literals and double-quoted identifiers is
    left untouched, so ``'--not a comment'`` survives intact.

    Args:
        sql: SQL text to clean.

    Returns:
        The SQL with comments removed. The newline that ends a line comment is
        kept, so line structure is preserved.

    Note:
        Only standard quote doubling (``''`` / ``""``) is recognised as an
        embedded quote; backslash escapes and dollar-quoted strings are not
        given special treatment.
    """
    return _SQL_COMMENT_OR_QUOTED.sub(
        # Quoted text is re-emitted unchanged; comments collapse to nothing.
        lambda match: match.group(1) or match.group(2) or "",
        sql,
    )
def normalize_sql(
    sql: str,
    db_name: str = "",
    schema_name: str = "",
    strip_comments: bool = True,
    strip_batch_metadata: bool = True,
) -> str:
    """Normalize compiled SQL to remove expected target-specific differences.

    Schema replacement is limited to qualifier positions: the unquoted
    ``db.schema.`` prefix, and a quoted ``"schema"`` that is directly followed
    by a dot (as in ``"schema"."table"``). SQL keywords such as CURRENT ROW and
    quoted identifiers that merely share the schema's name (for example a
    column called ``"current"``) are left alone; rewriting those on one side
    only would produce a spurious diff.

    Also strips dbt batch metadata (dbt_batch_id, dbt_batch_ts) since these
    are compile-time artifacts that always differ between runs.

    Args:
        sql: Compiled SQL text.
        db_name: Database name used in qualified refs. Schema replacement only
            happens when both ``db_name`` and ``schema_name`` are non-empty.
        schema_name: Target schema name to replace with ``__SCHEMA__``.
        strip_comments: When true, remove SQL comments first.
        strip_batch_metadata: When true, replace the dbt_batch_id and
            dbt_batch_ts literal values with fixed placeholders.

    Returns:
        The normalized SQL, with trailing spaces/tabs before newlines removed.
    """
    normalized = sql

    if strip_comments:
        normalized = strip_sql_comments(normalized)

    if db_name and schema_name:
        # Replace db.schema. prefix (most common pattern in compiled SQL).
        # A replacement function is used so that db_name is inserted literally;
        # a template string would interpret backslashes in it as escapes.
        normalized = re.sub(
            rf"\b{re.escape(db_name)}\.{re.escape(schema_name)}\.",
            lambda _match: f"{db_name}.__SCHEMA__.",
            normalized,
        )
        # Replace "schema"."table" pattern (quoted identifiers). The lookahead
        # requires a following dot so only qualifiers are rewritten.
        normalized = re.sub(
            rf'"{re.escape(schema_name)}"(?=\s*\.)',
            '"__SCHEMA__"',
            normalized,
        )

    if strip_batch_metadata:
        # dbt_batch_id and dbt_batch_ts are compile-time UUIDs/timestamps
        normalized = re.sub(
            r"cast\('[0-9a-f-]+' as varchar\) as dbt_batch_id",
            "cast('__BATCH_ID__' as varchar) as dbt_batch_id",
            normalized,
        )
        normalized = re.sub(
            r"cast\('[^']+' as timestamp\) as dbt_batch_ts",
            "cast('__BATCH_TS__' as timestamp) as dbt_batch_ts",
            normalized,
        )

    # Normalize whitespace for cleaner diffs
    normalized = re.sub(r"[ \t]+\n", "\n", normalized)
    return normalized
def diff_compiled_files(
    base_dir: Path,
    current_dir: Path,
    db_name: str,
    base_schema: str,
    current_schema: str,
) -> list[dict]:
    """Diff compiled SQL files between two target directories.

    Every ``*.sql`` file under ``base_dir`` is paired with the file at the
    same relative path under ``current_dir``. Both sides are passed through
    ``normalize_sql`` with their own schema name and then compared.

    Args:
        base_dir: Root directory of the base target's compiled SQL.
        current_dir: Root directory of the current target's compiled SQL.
        db_name: Database name used in qualified refs.
        base_schema: Schema name used by the base target.
        current_schema: Schema name used by the current target.

    Returns:
        One dict per finding with keys ``model``, ``path``, ``status`` and
        ``diff_lines``. ``status`` is ``"missing_in_current"``,
        ``"missing_in_base"`` or ``"non_deterministic"``; the latter also
        carries ``diff_full`` (the complete unified diff). Files whose
        normalized SQL is identical produce no finding. Diff lines carry no
        trailing newline.
    """
    findings = []
    base_files = sorted(base_dir.rglob("*.sql"))

    for base_file in base_files:
        rel_path = base_file.relative_to(base_dir)
        current_file = current_dir / rel_path

        if not current_file.exists():
            findings.append({
                "model": rel_path.stem,
                "path": str(rel_path),
                "status": "missing_in_current",
                "diff_lines": [],
            })
            continue

        # Read explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        base_sql = normalize_sql(
            base_file.read_text(encoding="utf-8"),
            db_name=db_name,
            schema_name=base_schema,
        )
        current_sql = normalize_sql(
            current_file.read_text(encoding="utf-8"),
            db_name=db_name,
            schema_name=current_schema,
        )

        if base_sql == current_sql:
            continue

        # Generate unified diff for the non-schema differences. The inputs are
        # split without line terminators to match lineterm="", so every diff
        # line (headers included) is uniformly newline-free.
        diff = list(difflib.unified_diff(
            base_sql.split("\n"),
            current_sql.split("\n"),
            fromfile=f"base/{rel_path}",
            tofile=f"current/{rel_path}",
            lineterm="",
        ))

        # Extract only the changed lines (+ and - prefixed). The two file
        # headers are skipped by position rather than by prefix, so a changed
        # line whose own text starts with "--" or "++" is not mistaken for one.
        changed_lines = [line for line in diff[2:] if line.startswith(("+", "-"))]

        findings.append({
            "model": rel_path.stem,
            "path": str(rel_path),
            "status": "non_deterministic",
            "diff_lines": changed_lines,
            "diff_full": diff,
        })

    # Check for files only in current
    current_rels = {f.relative_to(current_dir) for f in current_dir.rglob("*.sql")}
    base_rels = {f.relative_to(base_dir) for f in base_files}

    for rel_path in sorted(current_rels - base_rels):
        findings.append({
            "model": rel_path.stem,
            "path": str(rel_path),
            "status": "missing_in_base",
            "diff_lines": [],
        })

    return findings
def diff_manifests(
    base_manifest_path: str,
    current_manifest_path: str,
    db_name: str,
    base_schema: str,
    current_schema: str,
) -> list[dict]:
    """Diff compiled_code from two manifest.json files.

    Only nodes with ``resource_type == "model"`` that belong to the base
    manifest's ``metadata.project_name`` are compared. If the base manifest
    does not record a project name, models from every package are compared
    instead of silently comparing nothing.

    Args:
        base_manifest_path: Path to the base target's manifest.json.
        current_manifest_path: Path to the current target's manifest.json.
        db_name: Database name used in qualified refs.
        base_schema: Schema name used by the base target.
        current_schema: Schema name used by the current target.

    Returns:
        One dict per finding with keys ``model``, ``path``, ``status`` and
        ``diff_lines``. ``status`` is ``"missing_in_current"``,
        ``"missing_in_base"`` or ``"non_deterministic"``; the latter also
        carries ``materialized`` and ``diff_full``. Diff lines carry no
        trailing newline.

    Side effects:
        Models that lack ``compiled_code`` on either side cannot be compared
        and are skipped; a single warning naming them is written to stderr so
        an uncompiled manifest is not mistaken for a clean result.
    """
    with open(base_manifest_path, encoding="utf-8") as f:
        base_manifest = json.load(f)
    with open(current_manifest_path, encoding="utf-8") as f:
        current_manifest = json.load(f)

    project_name = base_manifest.get("metadata", {}).get("project_name")

    def is_project_model(node: dict) -> bool:
        """Return True for model nodes that belong to the project under test."""
        if node.get("resource_type") != "model":
            return False
        return project_name is None or node.get("package_name") == project_name

    base_nodes = {
        uid: node
        for uid, node in base_manifest.get("nodes", {}).items()
        if is_project_model(node)
    }
    # Build lookup for current manifest
    current_nodes = {
        uid: node
        for uid, node in current_manifest.get("nodes", {}).items()
        if is_project_model(node)
    }

    findings = []
    uncompiled = []

    for uid, node in base_nodes.items():
        name = node.get("name", uid)
        base_compiled = node.get("compiled_code", "")

        current_node = current_nodes.get(uid)
        if not current_node:
            findings.append({
                "model": name,
                "path": node.get("path", ""),
                "status": "missing_in_current",
                "diff_lines": [],
            })
            continue

        current_compiled = current_node.get("compiled_code", "")

        if not base_compiled or not current_compiled:
            uncompiled.append(name)
            continue

        base_norm = normalize_sql(base_compiled, db_name=db_name, schema_name=base_schema)
        current_norm = normalize_sql(current_compiled, db_name=db_name, schema_name=current_schema)

        if base_norm == current_norm:
            continue

        # Inputs are split without line terminators to match lineterm="", so
        # every diff line is uniformly newline-free.
        diff = list(difflib.unified_diff(
            base_norm.split("\n"),
            current_norm.split("\n"),
            fromfile=f"base/{name}",
            tofile=f"current/{name}",
            lineterm="",
        ))

        # Skip the two file headers by position, then keep +/- lines only.
        changed_lines = [line for line in diff[2:] if line.startswith(("+", "-"))]

        findings.append({
            "model": name,
            "path": node.get("path", ""),
            "materialized": node.get("config", {}).get("materialized", "unknown"),
            "status": "non_deterministic",
            "diff_lines": changed_lines,
            "diff_full": diff,
        })

    # Models that exist only in the current manifest, mirroring the
    # "missing_in_base" findings produced by the file-based diff.
    for uid, node in current_nodes.items():
        if uid not in base_nodes:
            findings.append({
                "model": node.get("name", uid),
                "path": node.get("path", ""),
                "status": "missing_in_base",
                "diff_lines": [],
            })

    if uncompiled:
        print(
            f"Warning: skipped {len(uncompiled)} model(s) without compiled_code "
            f"(manifest must come from dbt compile): {', '.join(sorted(uncompiled))}",
            file=sys.stderr,
        )

    return findings
def format_report(findings: list[dict], approach: str) -> str:
    """Format human-readable report.

    Args:
        findings: Finding dicts carrying at least ``model``, ``path``,
            ``status`` and ``diff_lines``; ``materialized`` is optional.
        approach: Short label for the detection approach, shown in the header.

    Returns:
        The report as one newline-joined string. It contains the summary
        counts, up to ten changed lines for each non-deterministic model, and
        the models that exist in only one target.
    """
    banner = "=" * 70
    lines = [
        banner,
        " Compiled SQL Diff — Non-Deterministic Model Detection",
        f" Approach: {approach}",
        banner,
        "",
    ]

    non_det = [f for f in findings if f["status"] == "non_deterministic"]
    missing = [f for f in findings if f["status"].startswith("missing")]

    lines.append(f" Non-deterministic models: {len(non_det)}")
    # Findings only cover models that differ or are missing, so this is a
    # count of findings, not of models checked.
    lines.append(f" Total findings: {len(findings)} (models with identical SQL are not listed)")
    if missing:
        lines.append(f" Missing in one target: {len(missing)}")
    lines.append("")

    if non_det:
        lines.append("-" * 70)
        lines.append(" Models with non-deterministic SQL:")
        lines.append("")
        for f in non_det:
            mat = f.get("materialized", "")
            mat_str = f" ({mat})" if mat else ""
            lines.append(f" !! {f['model']}{mat_str} [{f['path']}]")
            for dl in f["diff_lines"][:10]:
                # Diff lines may still carry their line terminator; drop it so
                # the report does not gain stray blank lines.
                lines.append(f" {dl.rstrip(chr(13) + chr(10))}")
            if len(f["diff_lines"]) > 10:
                lines.append(f" ... ({len(f['diff_lines'])} changed lines total)")
            lines.append("")

    if missing:
        lines.append("-" * 70)
        lines.append(" Models present in only one target:")
        lines.append("")
        for f in missing:
            lines.append(f" ?? {f['model']} [{f['path']}] ({f['status']})")
        lines.append("")

    if not non_det:
        lines.append(" All models produce identical SQL across targets (after schema normalization).")
        lines.append(" Shared base is safe.")

    lines.append(banner)
    return "\n".join(lines)
def main():
    """Command-line entry point: run the selected diff and print the result.

    With ``--use-manifest`` the compiled_code of two manifest.json files is
    compared; otherwise the two compiled-SQL directories are compared, and the
    process exits with status 1 if either directory is missing. Output is the
    human-readable report, or JSON when ``--json`` is given.
    """
    parser = argparse.ArgumentParser(description="Detect non-deterministic models via compiled SQL diff")

    # Options are registered in a fixed order so --help output is stable.
    for flag, default, help_text in (
        ("--base-dir", "target/compiled_pg_base", "Directory with compiled SQL from base target"),
        ("--current-dir", "target/compiled_pg_current", "Directory with compiled SQL from current target"),
        ("--db-name", "tpch", "Database name used in qualified refs (e.g., tpch)"),
        ("--base-schema", "base", "Schema name used by base target"),
        ("--current-schema", "current", "Schema name used by current target"),
    ):
        parser.add_argument(flag, default=default, help=help_text)
    parser.add_argument(
        "--use-manifest",
        action="store_true",
        help="Use manifest.json compiled_code instead of file-based diff",
    )
    for flag, default, help_text in (
        ("--base-manifest", "target_base/manifest.json", "Path to base manifest.json (with --use-manifest)"),
        ("--current-manifest", "target_current/manifest.json", "Path to current manifest.json (with --use-manifest)"),
    ):
        parser.add_argument(flag, default=default, help=help_text)
    parser.add_argument(
        "--json",
        action="store_true",
        dest="json_output",
        help="Output machine-readable JSON",
    )
    args = parser.parse_args()

    schema_options = {
        "db_name": args.db_name,
        "base_schema": args.base_schema,
        "current_schema": args.current_schema,
    }

    if args.use_manifest:
        findings = diff_manifests(args.base_manifest, args.current_manifest, **schema_options)
        approach = "manifest compiled_code diff"
    else:
        base_dir = Path(args.base_dir)
        current_dir = Path(args.current_dir)

        # Base is checked first; the first missing directory aborts the run.
        for label, directory, hint in (
            (
                "base",
                base_dir,
                "Run: dbt compile --target pg-base && cp -r target/compiled target/compiled_pg_base",
            ),
            (
                "current",
                current_dir,
                "Run: dbt compile --target pg-current && cp -r target/compiled target/compiled_pg_current",
            ),
        ):
            if directory.exists():
                continue
            print(f"Error: {label} directory not found: {directory}", file=sys.stderr)
            print(hint, file=sys.stderr)
            sys.exit(1)

        findings = diff_compiled_files(base_dir, current_dir, **schema_options)
        approach = "compiled file diff"

    if not args.json_output:
        print(format_report(findings, approach))
        return

    # JSON mode lists only the non-deterministic findings.
    flagged = []
    for finding in findings:
        if finding["status"] != "non_deterministic":
            continue
        flagged.append({
            "model": finding["model"],
            "path": finding["path"],
            "materialized": finding.get("materialized", ""),
            "diff_lines": finding["diff_lines"],
        })
    output = {
        "approach": approach,
        "total_findings": len(findings),
        "non_deterministic": flagged,
    }
    print(json.dumps(output, indent=2))