From 7617726c925c551d3ca63fc37a152fca2c8a5b17 Mon Sep 17 00:00:00 2001 From: cubic Bot Date: Fri, 17 Oct 2025 11:38:11 +0000 Subject: [PATCH] Preserve per-tag directories to prevent bundle overwrites (merges into #96) --- evidence/packager.py | 116 ++++++++++++++++++++++++++-------- tests/test_evidence_bundle.py | 57 +++++++++++++---- 2 files changed, 135 insertions(+), 38 deletions(-) diff --git a/evidence/packager.py b/evidence/packager.py index 20985cef9..a389dd45e 100644 --- a/evidence/packager.py +++ b/evidence/packager.py @@ -2,15 +2,15 @@ from __future__ import annotations +import copy import json import subprocess import tempfile -import copy from dataclasses import dataclass, field from datetime import datetime, timezone from hashlib import sha256 from pathlib import Path -from typing import Any, Dict, Iterable, Mapping +from typing import Any, Iterable, Mapping from zipfile import ZipFile import yaml @@ -57,7 +57,9 @@ def load_policy(policy_path: Path | None) -> dict[str, Any]: if isinstance(rules, Mapping): existing = merged.setdefault(section, {}) if isinstance(existing, Mapping): - existing.update(rules) # shallow merge is sufficient for numeric thresholds + existing.update( + rules + ) # shallow merge is sufficient for numeric thresholds else: merged[section] = rules return merged @@ -88,36 +90,74 @@ def _evaluate_rules(value: float, rules: Mapping[str, Any]) -> str: return status -def evaluate_policy(policy: Mapping[str, Any], *, metrics: Mapping[str, Any]) -> dict[str, Any]: +def evaluate_policy( + policy: Mapping[str, Any], *, metrics: Mapping[str, Any] +) -> dict[str, Any]: evaluations: dict[str, Any] = {"checks": {}, "overall": "pass"} - sbom_metrics = metrics.get("sbom", {}) if isinstance(metrics.get("sbom"), Mapping) else {} - sbom_policy = policy.get("sbom_quality", {}) if isinstance(policy.get("sbom_quality"), Mapping) else {} + sbom_metrics = ( + metrics.get("sbom", {}) + if isinstance(metrics.get("sbom"), Mapping) + else {} + ) + sbom_policy = ( + policy.get("sbom_quality", {}) + if isinstance(policy.get("sbom_quality"), Mapping) + else {} + ) for metric in ("coverage_percent", "license_coverage_percent"): value = sbom_metrics.get(metric) if value is None: continue status = _evaluate_rules(float(value), sbom_policy.get(metric, {})) - evaluations["checks"][f"sbom_{metric}"] = {"value": float(value), "status": status} + evaluations["checks"][f"sbom_{metric}"] = { + "value": float(value), + "status": status, + } - risk_metrics = metrics.get("risk", {}) if isinstance(metrics.get("risk"), Mapping) else {} - risk_policy = policy.get("risk", {}) if isinstance(policy.get("risk"), Mapping) else {} + risk_metrics = ( + metrics.get("risk", {}) + if isinstance(metrics.get("risk"), Mapping) + else {} + ) + risk_policy = ( + policy.get("risk", {}) + if isinstance(policy.get("risk"), Mapping) + else {} + ) max_risk = risk_metrics.get("max_risk_score") if max_risk is not None: - status = _evaluate_rules(float(max_risk), risk_policy.get("max_risk_score", {})) + status = _evaluate_rules( + float(max_risk), risk_policy.get("max_risk_score", {}) + ) evaluations["checks"]["risk_max_risk_score"] = { "value": float(max_risk), "status": status, } - repro_match = metrics.get("repro", {}).get("match") if isinstance(metrics.get("repro"), Mapping) else None - repro_policy = policy.get("repro", {}) if isinstance(policy.get("repro"), Mapping) else {} + repro_match = ( + metrics.get("repro", {}).get("match") + if isinstance(metrics.get("repro"), Mapping) + else None + ) + repro_policy = ( + policy.get("repro", {}) + if isinstance(policy.get("repro"), Mapping) + else {} + ) if repro_match is not None: required = bool(repro_policy.get("require_match", True)) status = "pass" if (not required or repro_match) else "fail" - evaluations["checks"]["repro_match"] = {"value": bool(repro_match), "status": status} + evaluations["checks"]["repro_match"] = { + "value": bool(repro_match), + "status": status, + } - provenance_policy = policy.get("provenance", {}) if isinstance(policy.get("provenance"), Mapping) else {} + provenance_policy = ( + policy.get("provenance", {}) + if isinstance(policy.get("provenance"), Mapping) + else {} + ) attestation_count = int(metrics.get("provenance", {}).get("count", 0)) if provenance_policy.get("require_attestations"): status = "pass" if attestation_count > 0 else "fail" @@ -151,7 +191,9 @@ def _collect_files(paths: Iterable[Path]) -> list[Path]: return files -def _sign_manifest(manifest_path: Path, signature_path: Path, key_path: Path) -> None: +def _sign_manifest( + manifest_path: Path, signature_path: Path, key_path: Path +) -> None: command = [ "cosign", "sign-blob", @@ -182,13 +224,23 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]: if not Path(path).is_file(): raise FileNotFoundError(f"Required evidence file missing: {path}") if inputs.provenance_dir and not inputs.provenance_dir.exists(): - raise FileNotFoundError(f"Provenance directory '{inputs.provenance_dir}' not found") + raise FileNotFoundError( + f"Provenance directory '{inputs.provenance_dir}' not found" + ) - quality_payload = json.loads(inputs.sbom_quality_json.read_text(encoding="utf-8")) + quality_payload = json.loads( + inputs.sbom_quality_json.read_text(encoding="utf-8") + ) risk_payload = json.loads(inputs.risk_report.read_text(encoding="utf-8")) - repro_payload = json.loads(inputs.repro_attestation.read_text(encoding="utf-8")) + repro_payload = json.loads( + inputs.repro_attestation.read_text(encoding="utf-8") + ) - provenance_files = _collect_files([inputs.provenance_dir]) if inputs.provenance_dir else [] + provenance_files = ( + _collect_files([inputs.provenance_dir]) + if inputs.provenance_dir + else [] + ) extra_files = _collect_files(inputs.extra_paths) bundle_files: list[tuple[Path, str]] = [] artefact_descriptors: list[dict[str, Any]] = [] @@ -202,7 +254,10 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]: mapping.extend( [ (inputs.risk_report, "risk/risk.json"), - (inputs.repro_attestation, f"repro/{inputs.repro_attestation.name}"), + ( + inputs.repro_attestation, + f"repro/{inputs.repro_attestation.name}", + ), ] ) @@ -241,9 +296,13 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]: metrics = { "sbom": quality_payload.get("metrics", {}), "risk": { - "component_count": risk_payload.get("summary", {}).get("component_count"), + "component_count": risk_payload.get("summary", {}).get( + "component_count" + ), "cve_count": risk_payload.get("summary", {}).get("cve_count"), - "max_risk_score": risk_payload.get("summary", {}).get("max_risk_score"), + "max_risk_score": risk_payload.get("summary", {}).get( + "max_risk_score" + ), }, "repro": {"match": bool(repro_payload.get("match"))}, "provenance": {"count": len(provenance_files)}, @@ -261,23 +320,28 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]: "evaluations": evaluations, } - manifest_path = resolve_within_root(manifest_dir, f"{tag}.yaml") + tag_path = Path(tag.replace(":", "_")) + manifest_path = resolve_within_root(manifest_dir, f"{tag_path}.yaml") + manifest_path.parent.mkdir(parents=True, exist_ok=True) with manifest_path.open("w", encoding="utf-8") as handle: yaml.safe_dump(manifest, handle, sort_keys=False) - bundle_path = resolve_within_root(bundle_dir, f"{tag}.zip") + bundle_path = resolve_within_root(bundle_dir, f"{tag_path}.zip") + bundle_path.parent.mkdir(parents=True, exist_ok=True) with ZipFile(bundle_path, "w") as archive: for source, arcname in bundle_files: archive.write(source, arcname) archive.write(manifest_path, "MANIFEST.yaml") if inputs.sign_key: - with tempfile.NamedTemporaryFile(suffix=".sig", delete=False) as tmp_signature: + with tempfile.NamedTemporaryFile( + suffix=".sig", delete=False + ) as tmp_signature: tmp_path = Path(tmp_signature.name) try: _sign_manifest(manifest_path, tmp_path, inputs.sign_key) archive.write(tmp_path, "MANIFEST.yaml.sig") finally: - if 'tmp_path' in locals() and tmp_path.exists(): + if "tmp_path" in locals() and tmp_path.exists(): tmp_path.unlink() manifest["bundle_path"] = str(bundle_path) diff --git a/tests/test_evidence_bundle.py b/tests/test_evidence_bundle.py index 7ded6d2df..82de9d1ca 100644 --- a/tests/test_evidence_bundle.py +++ b/tests/test_evidence_bundle.py @@ -6,6 +6,7 @@ import yaml from cli.fixops_ci import main as ci_main +from evidence.packager import _collect_files from services.evidence.packager import ( BundleInputs, EvidencePackager, @@ -13,7 +14,6 @@ load_policy, ) from services.evidence.store import EvidenceStore -from evidence.packager import _collect_files def _write_json(path: Path, payload: dict) -> Path: @@ -29,14 +29,25 @@ def test_create_bundle(tmp_path: Path) -> None: ) quality_json = _write_json( tmp_path / "analysis/sbom_quality_report.json", - {"metrics": {"coverage_percent": 95.0, "license_coverage_percent": 90.0}}, + { + "metrics": { + "coverage_percent": 95.0, + "license_coverage_percent": 90.0, + } + }, ) quality_html = tmp_path / "reports/sbom_quality_report.html" quality_html.parent.mkdir(parents=True, exist_ok=True) quality_html.write_text("quality", encoding="utf-8") risk_report = _write_json( tmp_path / "artifacts/risk.json", - {"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 60.0}}, + { + "summary": { + "component_count": 2, + "cve_count": 1, + "max_risk_score": 60.0, + } + }, ) provenance_dir = tmp_path / "artifacts/attestations" provenance_dir.mkdir(parents=True, exist_ok=True) @@ -50,10 +61,15 @@ def test_create_bundle(tmp_path: Path) -> None: policy_path.write_text( yaml.safe_dump( { - "risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}}, + "risk": { + "max_risk_score": {"warn_above": 80, "fail_above": 90} + }, "sbom_quality": { "coverage_percent": {"warn_below": 80, "fail_below": 60}, - "license_coverage_percent": {"warn_below": 80, "fail_below": 60}, + "license_coverage_percent": { + "warn_below": 80, + "fail_below": 60, + }, }, "repro": {"require_match": True}, "provenance": {"require_attestations": True}, @@ -93,29 +109,45 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None: ) quality_json = _write_json( tmp_path / "analysis/sbom_quality_report.json", - {"metrics": {"coverage_percent": 85.0, "license_coverage_percent": 80.0}}, + { + "metrics": { + "coverage_percent": 85.0, + "license_coverage_percent": 80.0, + } + }, ) quality_html = tmp_path / "reports/sbom_quality_report.html" quality_html.parent.mkdir(parents=True, exist_ok=True) quality_html.write_text("quality", encoding="utf-8") risk_report = _write_json( tmp_path / "artifacts/risk.json", - {"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 65.0}}, + { + "summary": { + "component_count": 2, + "cve_count": 1, + "max_risk_score": 65.0, + } + }, ) provenance_dir = tmp_path / "artifacts/attestations" provenance_dir.mkdir(parents=True, exist_ok=True) (provenance_dir / "build.json").write_text("{}", encoding="utf-8") repro_dir = tmp_path / "artifacts/repro/attestations" - repro_attestation = _write_json(repro_dir / f"{tag}.json", {"match": True}) + _write_json(repro_dir / f"{tag}.json", {"match": True}) policy_path = tmp_path / "config/policy.yml" policy_path.parent.mkdir(parents=True, exist_ok=True) policy_path.write_text( yaml.safe_dump( { - "risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}}, + "risk": { + "max_risk_score": {"warn_above": 80, "fail_above": 90} + }, "sbom_quality": { "coverage_percent": {"warn_below": 70, "fail_below": 50}, - "license_coverage_percent": {"warn_below": 70, "fail_below": 50}, + "license_coverage_percent": { + "warn_below": 70, + "fail_below": 50, + }, }, "repro": {"require_match": True}, "provenance": {"require_attestations": True}, @@ -149,9 +181,10 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None: ] ) assert exit_code == 0 - bundle_path = tmp_path / "evidence/bundles" / f"{tag}.zip" + tag_path = Path(tag.replace(":", "_")) + bundle_path = tmp_path / "evidence/bundles" / f"{tag_path}.zip" assert bundle_path.is_file() - manifest_path = tmp_path / "evidence/manifests" / f"{tag}.yaml" + manifest_path = tmp_path / "evidence/manifests" / f"{tag_path}.yaml" assert manifest_path.is_file() manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8")) assert manifest["evaluations"]["overall"] == "pass"