diff --git a/README.md b/README.md index eab343f..50a0aef 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Anvil is a declarative AWS execution engine for running Python tasks across large account and region fleets. Describe the work in YAML, keep task logic in plain Python modules, and let the engine handle authentication, role assumption, dependency ordering, bounded concurrency, and structured results so repeatable AWS work can run faster without turning orchestration into custom scripts. -For a deeper look at the execution flow, see [docs/README.md](docs/README.md). +For more, see the [documentation](https://opsfoundry.dev/). ## Why Anvil? @@ -224,6 +224,15 @@ Execute all configured organizations and accounts from one or more YAML files. S ```console anvil run --help ``` +Run a single YAML file +```console +anvil run --config-file ./yaml/orgs.yaml +``` + +To run multiple YAML files in one command, pass them after a single `--config-file` flag. They run sequentially in the order provided. Each YAML remains an isolated run with its own summary file, and the overall command exits non-zero if any YAML run fails. +```console +anvil run --config-file ./yaml/orgs.yaml ./yaml/orgs2.yaml ./yaml/orgs3.yaml +``` Anvil writes per-target full results, write a flattened query file, and produce one summary file per YAML in a run-scoped result directory: @@ -237,9 +246,10 @@ results/ .json ``` +> [!NOTE] +> Use `--benchmark` only for performance investigations. It adds engine, target, account, region, and result-write timing details to result JSON, which can dramatically increase output size on large account, region, or task runs. +> Leave it off for normal audit/reporting runs, and enable it when comparing benchmark runs or looking for bottlenecks. -Use `--benchmark` only for performance investigations. It adds engine, target, account, region, and result-write timing details to result JSON, which can dramatically increase output size on large account, region, or task runs. -Leave it off for normal audit/reporting runs, and enable it when comparing benchmark runs or looking for bottlenecks. ### Result Queries @@ -249,67 +259,69 @@ Runs still write the existing full JSON result files. They also write JSONL reco Common queries: ```console -anvil results failures -anvil results failures --organization prod -anvil results accounts --status failed -anvil results tasks --task count_vpcs -anvil results regions --region us-east-1 -anvil results failures --fields account_id,region,task,error --limit 20 -anvil results tasks --status failed --jsonl +# Show every failure under ./results. +anvil results --status failed + +# Show failures for one organization or account-group target. +anvil results --target prod --status failed + +# Show failed account records only. +anvil results --type account --status failed + +# Show task records for one task name. +anvil results --type task --task count_vpcs + +# Show task records for one AWS region. +anvil results --type task --region us-east-1 + +# Show a compact failure view with selected fields and a row limit. +anvil results --status failed --fields account_id,region,task,error --limit 20 + +# Emit failed task records as JSONL. +anvil results --type task --status failed --jsonl ``` Advanced queries: ```console -anvil results failures --results-file ./results/orgs/2026-05-01T183012Z/results.jsonl -anvil results failures --results-file ./results/orgs/run-a/results.jsonl ./results/accounts/run-b/results.jsonl -anvil results tasks --organization prod --task count_vpcs --fields account_id,region,status,error -anvil results failures --fields record_type,target,account_id,region,task,error -anvil results tasks --status failed --fields account_id,region,error --jsonl -anvil results failures --fields target_type,target,account_id,task,error --limit 50 -``` +# Query one explicit run results file. +anvil results --status failed --results-file ./results/orgs/2026-05-01T183012Z/results.jsonl -All result query commands support `--organization`, `--account`, `--region`, -`--task`, `--status`, `--fields`, `--limit`, `--results-file` with one or more -JSONL paths, and `--json` or `--jsonl` for structured filtered output. Without -`--results-file`, Anvil queries every `results.jsonl` file under `./results`. +# Query multiple explicit run results files in one command. +anvil results --status failed --results-file ./results/orgs/run-a/results.jsonl ./results/accounts/run-b/results.jsonl -To run multiple YAML files in one command, pass them after a single `--config-file` flag. They run sequentially in the order provided. Each YAML remains an isolated run with its own summary file, and the overall command exits non-zero if any YAML run fails. -```console -anvil run --config-file ./yaml/orgs.yaml ./yaml/orgs2.yaml ./yaml/orgs3.yaml -``` +# Filter one task in one target and print selected fields. +anvil results --type task --target prod --task count_vpcs --fields account_id,region,status,error -### Region Selection +# Show failure rows with target, account, region, task, and error context. +anvil results --status failed --fields record_type,target,account_id,region,task,error -- `organizations:` configs can use explicit regions, `all`, glob selectors, or mixed glob and explicit selectors. -- `accounts:` configs require explicit region names only. See the YAML examples for complete region selection examples and edge-case behavior. +# Emit failed task rows as JSONL with only the selected fields. +anvil results --type task --status failed --fields account_id,region,error --jsonl -Within a single YAML, you can bound how many configured targets run in parallel. This is separate from each target's `max_workers` and `max_parallel_regions` settings: -```yaml -schema_version: 1 -max_parallel_targets: 4 -organizations: - - name: root - max_workers: 10 - max_parallel_regions: 2 +# Show the first 50 failure rows with target type context. +anvil results --status failed --fields target_type,target,account_id,task,error --limit 50 ``` -`max_parallel_regions` defaults to `1`, which preserves serial region execution within each account. Values from `2` through `4` allow bounded parallel region execution. Approximate account-region task streams per target are `max_workers * max_parallel_regions`, before considering `max_parallel_targets`. - -Use `max_parallel_regions` selectively. It is most useful when each region performs heavier, independent work, such as deep inventory, long paginated scans, slow regional service checks, or multiple regional tasks that hit different AWS services. For broad lightweight inventory across many accounts, account-level parallelism is often enough; increasing region parallelism can multiply AWS API pressure and make each regional call slower, especially when several tasks all call the same service. When tuning, start with `max_parallel_regions: 1`, raise it only for tasks with meaningful per-region runtime, and benchmark the full concurrency shape: `max_parallel_targets * max_workers * max_parallel_regions`. +#### Rerun failures: +> [!NOTE] +> `--rerun` infers the rerun scope from result records. It reloads the original config, reruns only matching failed accounts, narrows to failed regions and tasks when task-level failures are available, and includes required task dependencies automatically. +> Use scope filters such as `--target`, `--account`, `--region`, and `--task` to limit a rerun even further. Report-shaping flags such as `--type`, `--fields`, `--limit`, `--json`, and `--jsonl` are not supported with `--rerun`. -You can run `--include`, `--exclude`, or `--dry-run` to override the YAML file if you want to just test something or run on certain accounts. ```console -# Include only specific accounts: -anvil run --config-file orgs.yaml --include 111111111111 222222222222 +# Rerun failures from one explicit run results file. +anvil results --status failed --results-file ./results/orgs/2026-05-01T183012Z/results.jsonl --rerun -# Exclude specific accounts: -anvil run --config-file orgs.yaml --exclude 333333333333 444444444444 - -# Exclude specific accounts and perform a dry-run: -anvil run --config-file orgs.yaml --exclude 333333333333 444444444444 --dry-run +# Rerun failures from multiple explicit run results files in one command. +anvil results --status failed --results-file ./results/orgs/run-a/results.jsonl ./results/accounts/run-b/results.jsonl --rerun ``` +The result query command supports `--type`, `--target`, `--account`, +`--region`, `--task`, `--status`, `--fields`, `--limit`, `--results-file` with +one or more JSONL paths, and `--json` or `--jsonl` for structured filtered +output. `--status failed` matches any non-success status. Without +`--results-file`, Anvil queries every `results.jsonl` file under `./results`. + ### How task discovery works diff --git a/docs/README.md b/docs/README.md index e156125..0f14b31 100644 --- a/docs/README.md +++ b/docs/README.md @@ -563,9 +563,6 @@ Anvil currently exposes these primary command groups: - `tasks list` - `tasks validate` - `graph` -- `results failures` -- `results accounts` -- `results tasks` -- `results regions` +- `results` Configured targets can also be narrowed at invocation time with `--include`. Organization configs additionally support `--exclude` to remove discovered account IDs from the execution set. diff --git a/src/anvil/cli.py b/src/anvil/cli.py index eadd0a5..dd687b5 100644 --- a/src/anvil/cli.py +++ b/src/anvil/cli.py @@ -8,6 +8,8 @@ import datetime import json import logging +import shlex +from dataclasses import dataclass from pathlib import Path from anvil.graph import render_graph import yaml @@ -15,6 +17,8 @@ from anvil.descriptors import ConfigBranch, LoadedConfig from anvil.result_query import ( ResultFilters, + build_rerun_targets, + config_file_for_failure_records, failure_records, filter_records, format_records_jsonl, @@ -41,6 +45,18 @@ __LOGGER__ = logging.getLogger(__name__) +@dataclass(frozen=True, slots=True) +class WrittenRunResults: + """Paths and summary metadata written for one Anvil run.""" + + run_dir: Path + summary_path: Path + jsonl_path: Path + summary: dict[str, object] + target_file_count: int + jsonl_record_count: int + + def _load_targets_from_config_file(path: Path) -> LoadedConfig: """ Load and validate target descriptors from a YAML config file. @@ -129,7 +145,7 @@ def _target_result_file_path(*, target_results_dir: Path, target_name: str) -> P return result_file -def _write_run_results(*, config_file: Path, engine_result) -> None: +def _write_run_results(*, config_file: Path, engine_result) -> WrittenRunResults: run_dir = _create_results_run_dir(config_file=config_file) target_results_dir = run_dir / _target_results_dir_name(engine_result.config_branch) target_results_dir.mkdir() @@ -165,7 +181,9 @@ def _write_run_results(*, config_file: Path, engine_result) -> None: jsonl_path = jsonl_path_for_run(run_dir=run_dir) jsonl_record_count = write_jsonl_records( - path=jsonl_path, target_results=engine_result.target_results + path=jsonl_path, + target_results=engine_result.target_results, + config_file=config_file, ) summary = engine_result.build_summary() @@ -181,6 +199,48 @@ def _write_run_results(*, config_file: Path, engine_result) -> None: f"jsonl_records={jsonl_record_count}" ) + return WrittenRunResults( + run_dir=run_dir, + summary_path=summary_path, + jsonl_path=jsonl_path, + summary=summary, + target_file_count=len(engine_result.target_results), + jsonl_record_count=jsonl_record_count, + ) + + +def _summary_has_queryable_failures(summary: dict[str, object]) -> bool: + for key in ( + "total_failed_accounts", + "total_interrupted_accounts", + "total_failed_tasks", + ): + value = summary.get(key) + if isinstance(value, int) and value > 0: + return True + + return False + + +def _display_command_path(path: Path) -> str: + try: + display_path = f"./{path.resolve().relative_to(Path.cwd().resolve())}" + except ValueError: + display_path = str(path) + + display_path = display_path.replace("\\", "/") + return shlex.quote(display_path) + + +def _print_failure_followups(*, results_file: Path) -> None: + results_path = _display_command_path(results_file) + print() + print("View failures:") + print(f" anvil results --status failed --results-file {results_path}") + print() + print("Rerun failed accounts:") + print(f" anvil results --status failed --results-file {results_path} --rerun") + def _run_single_config_file(*, config_file: Path, args) -> int: loaded_config: LoadedConfig = _load_targets_from_config_file(config_file) @@ -194,7 +254,11 @@ def _run_single_config_file(*, config_file: Path, args) -> int: cli_exclude=args.exclude, benchmark_enabled=getattr(args, "benchmark", False), ) - _write_run_results(config_file=config_file, engine_result=engine_result) + written_results = _write_run_results( + config_file=config_file, engine_result=engine_result + ) + if _summary_has_queryable_failures(written_results.summary): + _print_failure_followups(results_file=written_results.jsonl_path) return 0 if engine_result.state is EngineState.COMPLETED_SUCCESS else 1 @@ -288,22 +352,17 @@ def _cmd_graph(args) -> int: return 0 -def _load_filtered_result_records( - args, *, record_type: str | None = None -) -> list[dict[str, object]]: +def _load_filtered_result_records(args) -> list[dict[str, object]]: records = load_result_records( results_dir=Path.cwd() / "results", files=args.results_file ) - if record_type is not None: - records = [ - record for record in records if record.get("record_type") == record_type - ] return filter_records( records, filters=ResultFilters( + record_type=args.type, status=args.status, - organization=args.organization, + target=args.target, account=args.account, region=args.region, task=args.task, @@ -341,29 +400,75 @@ def _emit_result_records(args, records: list[dict[str, object]]) -> None: ) -def _cmd_results_failures(args) -> int: - records = _load_filtered_result_records(args) - failures = failure_records(records) - _emit_result_records(args, failures) - return 0 - +def _validate_results_rerun_args( + args, *, parser: argparse.ArgumentParser | None = None +) -> None: + rejected_flags: list[str] = [] + if args.type is not None: + rejected_flags.append("--type") + if args.fields is not None: + rejected_flags.append("--fields") + if args.limit is not None: + rejected_flags.append("--limit") + if args.json: + rejected_flags.append("--json") + if args.jsonl: + rejected_flags.append("--jsonl") + + if rejected_flags: + rejected = ", ".join(rejected_flags) + message = f"{rejected} cannot be used with --rerun" + if parser is not None: + parser.error(message) + raise ValueError(message) + + +def _cmd_results(args) -> int: + if args.rerun: + _validate_results_rerun_args(args) + return _cmd_results_rerun(args) -def _cmd_results_accounts(args) -> int: - records = _load_filtered_result_records(args, record_type="account") + records = _load_filtered_result_records(args) _emit_result_records(args, records) return 0 -def _cmd_results_tasks(args) -> int: - records = _load_filtered_result_records(args, record_type="task") - _emit_result_records(args, records) - return 0 +def _cmd_results_rerun(args) -> int: + records = _load_filtered_result_records(args) + failures = failure_records(records) + if not failures: + print("No matching failures to rerun.") + return 0 + records_by_config = config_file_for_failure_records(failures=failures) -def _cmd_results_regions(args) -> int: - records = _load_filtered_result_records(args, record_type="task") - _emit_result_records(args, records) - return 0 + exit_code = 0 + for config_file, config_failures in records_by_config.items(): + loaded_config = _load_targets_from_config_file(config_file) + rerun_targets = build_rerun_targets( + loaded_config=loaded_config, failures=config_failures + ) + if not rerun_targets: + print(f"No configured targets matched failures for {config_file}.") + continue + + engine_result: EngineResult = run_multiple_targets( + targets=rerun_targets, + max_parallel_targets=loaded_config.max_parallel_targets, + cli_dry_run=args.dry_run, + cli_include=None, + cli_exclude=None, + benchmark_enabled=getattr(args, "benchmark", False), + ) + written_results = _write_run_results( + config_file=config_file, engine_result=engine_result + ) + if _summary_has_queryable_failures(written_results.summary): + _print_failure_followups(results_file=written_results.jsonl_path) + if engine_result.state is not EngineState.COMPLETED_SUCCESS: + exit_code = 1 + + return exit_code def _positive_int(value: str) -> int: @@ -392,7 +497,10 @@ def _add_results_query_args(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--status", help="Filter by status: success, error, interrupted, or failed" ) - parser.add_argument("--organization", help="Filter by organization or target name") + parser.add_argument( + "--type", choices=["account", "task"], help="Filter by result record type" + ) + parser.add_argument("--target", help="Filter by organization or account-group name") parser.add_argument("--account", help="Filter by account ID or account alias") parser.add_argument("--region", help="Filter by AWS region") parser.add_argument("--task", help="Filter by task name") @@ -491,38 +599,35 @@ def main() -> None: "results", help="Query flattened run results" ) _add_log_level_arg(results_parser) - results_subparsers = results_parser.add_subparsers( - dest="results_command", required=True - ) - - results_failures_parser = results_subparsers.add_parser( - "failures", help="Show unsuccessful account and task records" - ) - _add_results_query_args(results_failures_parser) - results_failures_parser.set_defaults(func=_cmd_results_failures) - - results_accounts_parser = results_subparsers.add_parser( - "accounts", help="Show account result records" + _add_results_query_args(results_parser) + results_parser.add_argument( + "--rerun", + action="store_true", + help="Rerun failed targets narrowed to failed accounts, regions, and tasks", ) - _add_results_query_args(results_accounts_parser) - results_accounts_parser.set_defaults(func=_cmd_results_accounts) - - results_tasks_parser = results_subparsers.add_parser( - "tasks", help="Show task result records" + results_parser.add_argument( + "--dry-run", + action="store_true", + default=None, + help="With --rerun, run without making changes", ) - _add_results_query_args(results_tasks_parser) - results_tasks_parser.set_defaults(func=_cmd_results_tasks) - - results_regions_parser = results_subparsers.add_parser( - "regions", help="Show task result records filtered by region" + results_parser.add_argument( + "--benchmark", + action="store_true", + help=( + "With --rerun, " + "Include diagnostic phase timings in result JSON. " + "This can significantly increase output size." + ), ) - _add_results_query_args(results_regions_parser) - results_regions_parser.set_defaults(func=_cmd_results_regions) + results_parser.set_defaults(func=_cmd_results) args = parser.parse_args() if not args.command: parser.error("the following arguments are required: command") + if args.command == "results" and args.rerun: + _validate_results_rerun_args(args, parser=results_parser) logging.basicConfig( level=getattr(logging, args.log_level), diff --git a/src/anvil/result_query.py b/src/anvil/result_query.py index 5dd72ff..da6739e 100644 --- a/src/anvil/result_query.py +++ b/src/anvil/result_query.py @@ -1,10 +1,12 @@ from __future__ import annotations import json +from collections import defaultdict from dataclasses import dataclass +from dataclasses import replace from pathlib import Path -from anvil.descriptors import ConfigBranch +from anvil.descriptors import ConfigBranch, LoadedConfig, TargetDescriptor from anvil.results import AccountResult, TargetResult, TaskResult @@ -24,6 +26,8 @@ "account_alias", "account_group", "account_id", + "config_file", + "config_file_resolved", "dry_run", "duration_seconds", "ended_at", @@ -43,8 +47,9 @@ @dataclass(frozen=True, slots=True) class ResultFilters: + record_type: str | None = None status: str | None = None - organization: str | None = None + target: str | None = None account: str | None = None region: str | None = None task: str | None = None @@ -56,7 +61,7 @@ def jsonl_path_for_run(*, run_dir: Path) -> Path: def build_jsonl_records_for_target( - target_result: TargetResult, + target_result: TargetResult, *, config_file: Path | None = None ) -> list[dict[str, object]]: """Build flattened account and task records for a target result.""" target_type = _target_type(target_result.config_branch) @@ -67,6 +72,7 @@ def build_jsonl_records_for_target( target_result=target_result, target_type=target_type, account_result=account_result, + config_file=config_file, ) records.append( { @@ -93,12 +99,16 @@ def build_jsonl_records_for_target( return records -def write_jsonl_records(*, path: Path, target_results: list[TargetResult]) -> int: +def write_jsonl_records( + *, path: Path, target_results: list[TargetResult], config_file: Path | None = None +) -> int: """Write flattened result records and return the number of records written.""" record_count = 0 with path.open("w", encoding="utf-8") as handle: for target_result in target_results: - for record in build_jsonl_records_for_target(target_result): + for record in build_jsonl_records_for_target( + target_result, config_file=config_file + ): handle.write(json.dumps(record, separators=(",", ":"))) handle.write("\n") record_count += 1 @@ -142,13 +152,14 @@ def filter_records( records: list[dict[str, object]], *, filters: ResultFilters ) -> list[dict[str, object]]: """Return records matching all supplied filters.""" - normalized_status = _normalize_status(filters.status) + status_filter = _normalize_status_filter(filters.status) return [ record for record in records - if _matches(record, "status", normalized_status) - and _matches(record, "target", filters.organization) + if _matches(record, "record_type", filters.record_type) + and _matches_status(record, status_filter) + and _matches(record, "target", filters.target) and _matches_account(record, filters.account) and _matches(record, "region", filters.region) and _matches(record, "task", filters.task) @@ -160,11 +171,52 @@ def failure_records(records: list[dict[str, object]]) -> list[dict[str, object]] return [ record for record in records - if record.get("status") in {"error", "interrupted"} + if _record_is_unsuccessful(record) and record.get("record_type") in {"account", "task"} ] +def config_file_for_failure_records( + *, failures: list[dict[str, object]] +) -> dict[Path, list[dict[str, object]]]: + """Group failure records by their original config file.""" + records_by_config: dict[Path, list[dict[str, object]]] = defaultdict(list) + + for record in failures: + config_file = record.get("config_file_resolved") + if isinstance(config_file, str) and config_file: + records_by_config[Path(config_file)].append(record) + else: + raise ValueError( + "Result records do not include config_file_resolved and cannot be rerun." + ) + + return dict(records_by_config) + + +def build_rerun_targets( + *, loaded_config: LoadedConfig, failures: list[dict[str, object]] +) -> list[TargetDescriptor]: + """Build narrowed targets from failure records and the original config.""" + failures_by_target: dict[str, list[dict[str, object]]] = defaultdict(list) + for record in failures: + target_name = record.get("target") + if isinstance(target_name, str) and target_name: + failures_by_target[target_name].append(record) + + targets: list[TargetDescriptor] = [] + for target in loaded_config.targets: + if target.name not in failures_by_target: + continue + targets.extend( + _narrow_target_for_failure_records( + target=target, records=failures_by_target[target.name] + ) + ) + + return targets + + def parse_fields(fields: str | None) -> list[str] | None: """Parse and validate a comma-separated field projection.""" if fields is None: @@ -242,9 +294,13 @@ def _timed_status_record(result: AccountResult | TaskResult) -> dict[str, object def _base_account_record( - *, target_result: TargetResult, target_type: str, account_result: AccountResult + *, + target_result: TargetResult, + target_type: str, + account_result: AccountResult, + config_file: Path | None, ) -> dict[str, object]: - return { + record: dict[str, object] = { "target_type": target_type, target_type: target_result.target_name, "target": target_result.target_name, @@ -253,15 +309,143 @@ def _base_account_record( "account_id": account_result.account_id, "account_alias": account_result.account_alias, } + if config_file is not None: + record["config_file"] = config_file.as_posix() + record["config_file_resolved"] = config_file.resolve().as_posix() + + return record -def _normalize_status(status: str | None) -> str | None: +def _normalize_status_filter(status: str | None) -> str | set[str] | None: if status is None: return None normalized = status.strip().lower() - aliases = {"failed": "error", "failure": "error", "failures": "error"} - return aliases.get(normalized, normalized) + if normalized in {"failed", "failure", "failures"}: + return "failed" + + return {normalized} + + +def _record_is_unsuccessful(record: dict[str, object]) -> bool: + status = record.get("status") + return isinstance(status, str) and status.lower() != "success" + + +def _matches_status(record: dict[str, object], expected: str | set[str] | None) -> bool: + if expected is None: + return True + + actual = record.get("status") + if not isinstance(actual, str): + return False + + normalized_actual = actual.lower() + if expected == "failed": + return _record_is_unsuccessful(record) + + return normalized_actual in expected + + +def _task_specs_by_name(tasks: list[dict[str, object]]) -> dict[str, dict[str, object]]: + return { + str(task["name"]): task + for task in tasks + if isinstance(task.get("name"), str) and task.get("name") + } + + +def _expand_task_names_with_dependencies( + *, selected_names: set[str], tasks: list[dict[str, object]] +) -> list[dict[str, object]]: + task_specs = _task_specs_by_name(tasks) + expanded_names: set[str] = set() + + def add_with_dependencies(task_name: str) -> None: + if task_name in expanded_names: + return + task = task_specs.get(task_name) + if task is None: + return + for dependency in task.get("depends_on", []): + if isinstance(dependency, str): + add_with_dependencies(dependency) + expanded_names.add(task_name) + + for selected_name in selected_names: + add_with_dependencies(selected_name) + + return [ + task + for task in tasks + if isinstance(task.get("name"), str) and task["name"] in expanded_names + ] + + +def _narrow_target_for_failed_account( + *, target: TargetDescriptor, records: list[dict[str, object]] +) -> TargetDescriptor: + failed_account_ids = { + account_id + for account_id in (record.get("account_id") for record in records) + if isinstance(account_id, str) and account_id + } + task_records = [ + record + for record in records + if record.get("record_type") == "task" and _record_is_unsuccessful(record) + ] + task_failed_account_ids = { + account_id + for account_id in (record.get("account_id") for record in task_records) + if isinstance(account_id, str) and account_id + } + account_level_failure_exists = bool(failed_account_ids - task_failed_account_ids) + + failed_regions = { + region + for region in (record.get("region") for record in task_records) + if isinstance(region, str) and region + } + failed_task_names = { + task + for task in (record.get("task") for record in task_records) + if isinstance(task, str) and task + } + + regions = target.regions + if failed_regions and not account_level_failure_exists: + regions = [region for region in target.regions if region in failed_regions] + if not regions: + regions = sorted(failed_regions) + + tasks = target.tasks + if failed_task_names and not account_level_failure_exists: + tasks = _expand_task_names_with_dependencies( + selected_names=failed_task_names, tasks=target.tasks + ) + if not tasks: + tasks = target.tasks + + failed_account_id = sorted(failed_account_ids)[0] + return replace( + target, include=[failed_account_id], exclude=None, regions=regions, tasks=tasks + ) + + +def _narrow_target_for_failure_records( + *, target: TargetDescriptor, records: list[dict[str, object]] +) -> list[TargetDescriptor]: + records_by_account: dict[str, list[dict[str, object]]] = defaultdict(list) + for record in records: + account_id = record.get("account_id") + if isinstance(account_id, str) and account_id: + records_by_account[account_id].append(record) + + return [ + _narrow_target_for_failed_account(target=target, records=account_records) + for _, account_records in sorted(records_by_account.items()) + ] def _matches(record: dict[str, object], key: str, expected: str | None) -> bool: diff --git a/tests/cli/test_cli_parallel_targets.py b/tests/cli/test_cli_parallel_targets.py index cf0f5b1..52bbaab 100644 --- a/tests/cli/test_cli_parallel_targets.py +++ b/tests/cli/test_cli_parallel_targets.py @@ -46,7 +46,11 @@ def fake_run_multiple_targets(**kwargs): return SimpleNamespace(state=cli.EngineState.COMPLETED_SUCCESS) monkeypatch.setattr(cli, "run_multiple_targets", fake_run_multiple_targets) - monkeypatch.setattr(cli, "_write_run_results", lambda **kwargs: None) + monkeypatch.setattr( + cli, + "_write_run_results", + lambda **kwargs: SimpleNamespace(summary={}, jsonl_path=Path("results.jsonl")), + ) exit_code = cli._run_single_config_file(config_file=Path("orgs.yaml"), args=args) diff --git a/tests/cli/test_cli_smoke.py b/tests/cli/test_cli_smoke.py index 4cddee5..549d654 100644 --- a/tests/cli/test_cli_smoke.py +++ b/tests/cli/test_cli_smoke.py @@ -106,13 +106,19 @@ def test_write_run_results_uses_config_stem_and_run_id_directories(monkeypatch): monkeypatch.setattr(cli, "_build_run_id", lambda: "2026-05-01T120000Z") try: - cli._write_run_results( + written_results = cli._write_run_results( config_file=Path("yaml/orgs.yaml"), engine_result=engine_result ) assert summary_path.exists() assert target_path.exists() assert jsonl_path.exists() + assert written_results.run_dir == run_dir + assert written_results.summary_path == summary_path + assert written_results.jsonl_path == jsonl_path + assert written_results.summary == {"state": "completed_success"} + assert written_results.target_file_count == 1 + assert written_results.jsonl_record_count == 0 finally: monkeypatch.chdir(original_cwd) summary_path.unlink(missing_ok=True) @@ -155,7 +161,115 @@ def test_target_result_file_path_avoids_sanitized_name_collisions(): scratch_dir.rmdir() -def test_cmd_results_accounts_filters_status_and_outputs_json(capsys): +def test_print_failure_followups_uses_results_file_command(capsys, monkeypatch): + from pathlib import Path + + cli = _import_cli_or_skip() + scratch_dir = (Path("tests") / "_tmp" / "cli-followups").resolve() + results_file = scratch_dir / "results" / "orgs" / "run-a" / "results.jsonl" + original_cwd = Path.cwd() + + try: + results_file.parent.mkdir(parents=True) + monkeypatch.chdir(scratch_dir) + + cli._print_failure_followups(results_file=results_file) + + output = capsys.readouterr().out + assert ( + "anvil results --status failed --results-file " + "./results/orgs/run-a/results.jsonl" in output + ) + assert ( + "anvil results --status failed --results-file " + "./results/orgs/run-a/results.jsonl --rerun" + ) in output + finally: + monkeypatch.chdir(original_cwd) + if results_file.parent.exists(): + results_file.parent.rmdir() + run_parent = scratch_dir / "results" / "orgs" + if run_parent.exists(): + run_parent.rmdir() + results_dir = scratch_dir / "results" + if results_dir.exists(): + results_dir.rmdir() + if scratch_dir.exists(): + scratch_dir.rmdir() + + +def test_build_rerun_targets_narrows_accounts_regions_and_task_dependencies(): + from anvil.descriptors import ConfigBranch, LoadedConfig, TargetDescriptor + from anvil.result_query import build_rerun_targets + + loaded_config = LoadedConfig( + branch=ConfigBranch.ORGANIZATIONS, + targets=[ + TargetDescriptor( + config_branch=ConfigBranch.ORGANIZATIONS, + name="org-a", + regions=["us-east-1", "us-west-2"], + include=["111111111111", "222222222222"], + tasks=[ + {"name": "inventory"}, + {"name": "cleanup", "depends_on": ["inventory"]}, + {"name": "notify"}, + ], + ), + TargetDescriptor( + config_branch=ConfigBranch.ORGANIZATIONS, + name="org-b", + regions=["us-east-1"], + tasks=[{"name": "inventory"}], + ), + ], + ) + + targets = build_rerun_targets( + loaded_config=loaded_config, + failures=[ + { + "record_type": "account", + "target": "org-a", + "account_id": "111111111111", + "status": "error", + }, + { + "record_type": "task", + "target": "org-a", + "account_id": "111111111111", + "region": "us-west-2", + "task": "cleanup", + "status": "error", + }, + { + "record_type": "account", + "target": "org-a", + "account_id": "222222222222", + "status": "interrupted", + }, + ], + ) + + assert len(targets) == 2 + assert targets[0].name == "org-a" + assert targets[0].include == ["111111111111"] + assert targets[0].exclude is None + assert targets[0].regions == ["us-west-2"] + assert targets[0].tasks == [ + {"name": "inventory"}, + {"name": "cleanup", "depends_on": ["inventory"]}, + ] + assert targets[1].include == ["222222222222"] + assert targets[1].regions == ["us-east-1", "us-west-2"] + assert targets[1].tasks == [ + {"name": "inventory"}, + {"name": "cleanup", "depends_on": ["inventory"]}, + {"name": "notify"}, + ] + + +def test_cmd_results_filters_account_type_status_and_outputs_json(capsys): from pathlib import Path from types import SimpleNamespace @@ -176,6 +290,14 @@ def test_cmd_results_accounts_filters_status_and_outputs_json(capsys): '{"record_type":"account","target":"org-a","account_id":' '"222222222222","account_alias":"prod","status":"success"}' ), + ( + '{"record_type":"account","target":"org-a","account_id":' + '"333333333333","account_alias":"qa","status":"interrupted"}' + ), + ( + '{"record_type":"task","target":"org-a","account_id":' + '"444444444444","account_alias":"ops","status":"error"}' + ), ] ), encoding="utf-8", @@ -183,8 +305,9 @@ def test_cmd_results_accounts_filters_status_and_outputs_json(capsys): args = SimpleNamespace( results_file=[Path(jsonl_path)], + type="account", status="failed", - organization=None, + target=None, account=None, region=None, task=None, @@ -192,20 +315,23 @@ def test_cmd_results_accounts_filters_status_and_outputs_json(capsys): limit=None, json=True, jsonl=False, + rerun=False, ) - assert cli._cmd_results_accounts(args) == 0 + assert cli._cmd_results(args) == 0 output = capsys.readouterr().out assert '"account_id": "111111111111"' in output assert '"account_id": "222222222222"' not in output + assert '"account_id": "333333333333"' in output + assert '"account_id": "444444444444"' not in output finally: jsonl_path.unlink(missing_ok=True) if scratch_dir.exists(): scratch_dir.rmdir() -def test_cmd_results_tasks_outputs_jsonl_with_fields_and_limit(capsys): +def test_cmd_results_outputs_jsonl_with_fields_and_limit(capsys): from pathlib import Path from types import SimpleNamespace @@ -235,8 +361,9 @@ def test_cmd_results_tasks_outputs_jsonl_with_fields_and_limit(capsys): args = SimpleNamespace( results_file=[Path(jsonl_path)], + type="task", status="failed", - organization=None, + target=None, account=None, region=None, task="count_vpcs", @@ -244,9 +371,10 @@ def test_cmd_results_tasks_outputs_jsonl_with_fields_and_limit(capsys): limit=1, json=False, jsonl=True, + rerun=False, ) - assert cli._cmd_results_tasks(args) == 0 + assert cli._cmd_results(args) == 0 output = capsys.readouterr().out assert output.count("\n") == 1 @@ -258,3 +386,15 @@ def test_cmd_results_tasks_outputs_jsonl_with_fields_and_limit(capsys): jsonl_path.unlink(missing_ok=True) if scratch_dir.exists(): scratch_dir.rmdir() + + +def test_cmd_results_rerun_rejects_report_only_flags(): + from types import SimpleNamespace + + cli = _import_cli_or_skip() + args = SimpleNamespace( + rerun=True, type="account", fields="account_id", limit=1, json=True, jsonl=False + ) + + with pytest.raises(ValueError, match="--type, --fields, --limit, --json"): + cli._cmd_results(args) diff --git a/tests/results/test_result_query.py b/tests/results/test_result_query.py index 6cfed3c..c7d6d00 100644 --- a/tests/results/test_result_query.py +++ b/tests/results/test_result_query.py @@ -6,6 +6,8 @@ from anvil.result_query import ( ResultFilters, build_jsonl_records_for_target, + build_rerun_targets, + config_file_for_failure_records, failure_records, filter_records, format_records_jsonl, @@ -77,6 +79,19 @@ def test_build_jsonl_records_flattens_accounts_and_tasks(): json.dumps(records) +def test_build_jsonl_records_includes_config_file_when_supplied(): + from pathlib import Path + + records = build_jsonl_records_for_target( + _target_result(), config_file=Path("yaml/orgs.yaml") + ) + + assert records[0]["config_file"] == "yaml/orgs.yaml" + assert records[1]["config_file"] == "yaml/orgs.yaml" + assert Path(records[0]["config_file_resolved"]) == Path("yaml/orgs.yaml").resolve() + assert Path(records[1]["config_file_resolved"]) == Path("yaml/orgs.yaml").resolve() + + def test_filter_records_supports_failed_status_alias_and_common_fields(): records = build_jsonl_records_for_target(_target_result()) @@ -84,7 +99,7 @@ def test_filter_records_supports_failed_status_alias_and_common_fields(): records, filters=ResultFilters( status="failed", - organization="engineering", + target="engineering", account="dev", region="us-east-1", task="count_vpcs", @@ -95,6 +110,26 @@ def test_filter_records_supports_failed_status_alias_and_common_fields(): assert matches[0]["record_type"] == "task" +def test_filter_records_failed_status_matches_any_non_success_status(): + records = [ + {"record_type": "account", "status": "success", "account_id": "111"}, + {"record_type": "account", "status": "error", "account_id": "222"}, + {"record_type": "account", "status": "interrupted", "account_id": "333"}, + ] + + matches = filter_records(records, filters=ResultFilters(status="failed")) + + assert [record["account_id"] for record in matches] == ["222", "333"] + + +def test_filter_records_supports_record_type_filter(): + records = build_jsonl_records_for_target(_target_result()) + + matches = filter_records(records, filters=ResultFilters(record_type="account")) + + assert [record["record_type"] for record in matches] == ["account"] + + def test_failure_records_include_account_and_task_failures(): records = build_jsonl_records_for_target(_target_result()) @@ -103,6 +138,86 @@ def test_failure_records_include_account_and_task_failures(): assert [record["record_type"] for record in failures] == ["account", "task"] +def test_failure_records_include_any_non_success_status(): + records = [ + {"record_type": "account", "status": "success"}, + {"record_type": "account", "status": "interrupted"}, + {"record_type": "task", "status": "cancelled"}, + ] + + failures = failure_records(records) + + assert [record["status"] for record in failures] == ["interrupted", "cancelled"] + + +def test_config_file_for_failure_records_groups_by_config_path(): + from pathlib import Path + + records = [ + { + "config_file": "orgs.yaml", + "config_file_resolved": str(Path.cwd() / "orgs.yaml"), + "status": "error", + }, + { + "config_file": "accounts.yaml", + "config_file_resolved": str(Path.cwd() / "accounts.yaml"), + "status": "interrupted", + }, + { + "config_file": "orgs.yaml", + "config_file_resolved": str(Path.cwd() / "orgs.yaml"), + "status": "error", + }, + ] + + grouped = config_file_for_failure_records(failures=records) + + assert list(grouped) == [Path.cwd() / "orgs.yaml", Path.cwd() / "accounts.yaml"] + assert len(grouped[Path.cwd() / "orgs.yaml"]) == 2 + + +def test_build_rerun_targets_includes_interrupted_task_dependencies(): + from anvil.descriptors import LoadedConfig, TargetDescriptor + + loaded_config = LoadedConfig( + branch=ConfigBranch.ORGANIZATIONS, + targets=[ + TargetDescriptor( + config_branch=ConfigBranch.ORGANIZATIONS, + name="org-a", + regions=["us-east-1", "us-west-2"], + tasks=[ + {"name": "inventory"}, + {"name": "cleanup", "depends_on": ["inventory"]}, + ], + ) + ], + ) + + targets = build_rerun_targets( + loaded_config=loaded_config, + failures=[ + { + "record_type": "task", + "target": "org-a", + "account_id": "111111111111", + "region": "us-west-2", + "task": "cleanup", + "status": "interrupted", + } + ], + ) + + assert len(targets) == 1 + assert targets[0].include == ["111111111111"] + assert targets[0].regions == ["us-west-2"] + assert targets[0].tasks == [ + {"name": "inventory"}, + {"name": "cleanup", "depends_on": ["inventory"]}, + ] + + def test_parse_fields_validates_known_fields(): assert parse_fields("account_id, region,task") == ["account_id", "region", "task"]