Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 90 additions & 26 deletions evidence/packager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

from __future__ import annotations

import copy
import json
import subprocess
import tempfile
import copy
from dataclasses import dataclass, field
from datetime import datetime, timezone
from hashlib import sha256
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping
from typing import Any, Iterable, Mapping
from zipfile import ZipFile

import yaml
Expand Down Expand Up @@ -57,7 +57,9 @@ def load_policy(policy_path: Path | None) -> dict[str, Any]:
if isinstance(rules, Mapping):
existing = merged.setdefault(section, {})
if isinstance(existing, Mapping):
existing.update(rules) # shallow merge is sufficient for numeric thresholds
existing.update(
rules
) # shallow merge is sufficient for numeric thresholds
else:
merged[section] = rules
return merged
Expand Down Expand Up @@ -88,36 +90,74 @@ def _evaluate_rules(value: float, rules: Mapping[str, Any]) -> str:
return status


def evaluate_policy(policy: Mapping[str, Any], *, metrics: Mapping[str, Any]) -> dict[str, Any]:
def evaluate_policy(
policy: Mapping[str, Any], *, metrics: Mapping[str, Any]
) -> dict[str, Any]:
evaluations: dict[str, Any] = {"checks": {}, "overall": "pass"}

sbom_metrics = metrics.get("sbom", {}) if isinstance(metrics.get("sbom"), Mapping) else {}
sbom_policy = policy.get("sbom_quality", {}) if isinstance(policy.get("sbom_quality"), Mapping) else {}
sbom_metrics = (
metrics.get("sbom", {})
if isinstance(metrics.get("sbom"), Mapping)
else {}
)
sbom_policy = (
policy.get("sbom_quality", {})
if isinstance(policy.get("sbom_quality"), Mapping)
else {}
)
for metric in ("coverage_percent", "license_coverage_percent"):
value = sbom_metrics.get(metric)
if value is None:
continue
status = _evaluate_rules(float(value), sbom_policy.get(metric, {}))
evaluations["checks"][f"sbom_{metric}"] = {"value": float(value), "status": status}
evaluations["checks"][f"sbom_{metric}"] = {
"value": float(value),
"status": status,
}

risk_metrics = metrics.get("risk", {}) if isinstance(metrics.get("risk"), Mapping) else {}
risk_policy = policy.get("risk", {}) if isinstance(policy.get("risk"), Mapping) else {}
risk_metrics = (
metrics.get("risk", {})
if isinstance(metrics.get("risk"), Mapping)
else {}
)
risk_policy = (
policy.get("risk", {})
if isinstance(policy.get("risk"), Mapping)
else {}
)
max_risk = risk_metrics.get("max_risk_score")
if max_risk is not None:
status = _evaluate_rules(float(max_risk), risk_policy.get("max_risk_score", {}))
status = _evaluate_rules(
float(max_risk), risk_policy.get("max_risk_score", {})
)
evaluations["checks"]["risk_max_risk_score"] = {
"value": float(max_risk),
"status": status,
}

repro_match = metrics.get("repro", {}).get("match") if isinstance(metrics.get("repro"), Mapping) else None
repro_policy = policy.get("repro", {}) if isinstance(policy.get("repro"), Mapping) else {}
repro_match = (
metrics.get("repro", {}).get("match")
if isinstance(metrics.get("repro"), Mapping)
else None
)
repro_policy = (
policy.get("repro", {})
if isinstance(policy.get("repro"), Mapping)
else {}
)
if repro_match is not None:
required = bool(repro_policy.get("require_match", True))
status = "pass" if (not required or repro_match) else "fail"
evaluations["checks"]["repro_match"] = {"value": bool(repro_match), "status": status}
evaluations["checks"]["repro_match"] = {
"value": bool(repro_match),
"status": status,
}

provenance_policy = policy.get("provenance", {}) if isinstance(policy.get("provenance"), Mapping) else {}
provenance_policy = (
policy.get("provenance", {})
if isinstance(policy.get("provenance"), Mapping)
else {}
)
attestation_count = int(metrics.get("provenance", {}).get("count", 0))
if provenance_policy.get("require_attestations"):
status = "pass" if attestation_count > 0 else "fail"
Expand Down Expand Up @@ -151,7 +191,9 @@ def _collect_files(paths: Iterable[Path]) -> list[Path]:
return files


def _sign_manifest(manifest_path: Path, signature_path: Path, key_path: Path) -> None:
def _sign_manifest(
manifest_path: Path, signature_path: Path, key_path: Path
) -> None:
command = [
"cosign",
"sign-blob",
Expand Down Expand Up @@ -182,13 +224,23 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
if not Path(path).is_file():
raise FileNotFoundError(f"Required evidence file missing: {path}")
if inputs.provenance_dir and not inputs.provenance_dir.exists():
raise FileNotFoundError(f"Provenance directory '{inputs.provenance_dir}' not found")
raise FileNotFoundError(
f"Provenance directory '{inputs.provenance_dir}' not found"
)

quality_payload = json.loads(inputs.sbom_quality_json.read_text(encoding="utf-8"))
quality_payload = json.loads(
inputs.sbom_quality_json.read_text(encoding="utf-8")
)
risk_payload = json.loads(inputs.risk_report.read_text(encoding="utf-8"))
repro_payload = json.loads(inputs.repro_attestation.read_text(encoding="utf-8"))
repro_payload = json.loads(
inputs.repro_attestation.read_text(encoding="utf-8")
)

provenance_files = _collect_files([inputs.provenance_dir]) if inputs.provenance_dir else []
provenance_files = (
_collect_files([inputs.provenance_dir])
if inputs.provenance_dir
else []
)
extra_files = _collect_files(inputs.extra_paths)
bundle_files: list[tuple[Path, str]] = []
artefact_descriptors: list[dict[str, Any]] = []
Expand All @@ -202,7 +254,10 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
mapping.extend(
[
(inputs.risk_report, "risk/risk.json"),
(inputs.repro_attestation, f"repro/{inputs.repro_attestation.name}"),
(
inputs.repro_attestation,
f"repro/{inputs.repro_attestation.name}",
),
]
)

Expand Down Expand Up @@ -241,9 +296,13 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
metrics = {
"sbom": quality_payload.get("metrics", {}),
"risk": {
"component_count": risk_payload.get("summary", {}).get("component_count"),
"component_count": risk_payload.get("summary", {}).get(
"component_count"
),
"cve_count": risk_payload.get("summary", {}).get("cve_count"),
"max_risk_score": risk_payload.get("summary", {}).get("max_risk_score"),
"max_risk_score": risk_payload.get("summary", {}).get(
"max_risk_score"
),
},
"repro": {"match": bool(repro_payload.get("match"))},
"provenance": {"count": len(provenance_files)},
Expand All @@ -261,23 +320,28 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
"evaluations": evaluations,
}

manifest_path = resolve_within_root(manifest_dir, f"{tag}.yaml")
tag_path = Path(tag.replace(":", "_"))
manifest_path = resolve_within_root(manifest_dir, f"{tag_path}.yaml")
manifest_path.parent.mkdir(parents=True, exist_ok=True)
with manifest_path.open("w", encoding="utf-8") as handle:
yaml.safe_dump(manifest, handle, sort_keys=False)

bundle_path = resolve_within_root(bundle_dir, f"{tag}.zip")
bundle_path = resolve_within_root(bundle_dir, f"{tag_path}.zip")
bundle_path.parent.mkdir(parents=True, exist_ok=True)
with ZipFile(bundle_path, "w") as archive:
for source, arcname in bundle_files:
archive.write(source, arcname)
archive.write(manifest_path, "MANIFEST.yaml")
if inputs.sign_key:
with tempfile.NamedTemporaryFile(suffix=".sig", delete=False) as tmp_signature:
with tempfile.NamedTemporaryFile(
suffix=".sig", delete=False
) as tmp_signature:
tmp_path = Path(tmp_signature.name)
try:
_sign_manifest(manifest_path, tmp_path, inputs.sign_key)
archive.write(tmp_path, "MANIFEST.yaml.sig")
finally:
if 'tmp_path' in locals() and tmp_path.exists():
if "tmp_path" in locals() and tmp_path.exists():
tmp_path.unlink()

manifest["bundle_path"] = str(bundle_path)
Expand Down
57 changes: 45 additions & 12 deletions tests/test_evidence_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import yaml

from cli.fixops_ci import main as ci_main
from evidence.packager import _collect_files
from services.evidence.packager import (
BundleInputs,
EvidencePackager,
evaluate_policy,
load_policy,
)
from services.evidence.store import EvidenceStore
from evidence.packager import _collect_files


def _write_json(path: Path, payload: dict) -> Path:
Expand All @@ -29,14 +29,25 @@ def test_create_bundle(tmp_path: Path) -> None:
)
quality_json = _write_json(
tmp_path / "analysis/sbom_quality_report.json",
{"metrics": {"coverage_percent": 95.0, "license_coverage_percent": 90.0}},
{
"metrics": {
"coverage_percent": 95.0,
"license_coverage_percent": 90.0,
}
},
)
quality_html = tmp_path / "reports/sbom_quality_report.html"
quality_html.parent.mkdir(parents=True, exist_ok=True)
quality_html.write_text("<html>quality</html>", encoding="utf-8")
risk_report = _write_json(
tmp_path / "artifacts/risk.json",
{"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 60.0}},
{
"summary": {
"component_count": 2,
"cve_count": 1,
"max_risk_score": 60.0,
}
},
)
provenance_dir = tmp_path / "artifacts/attestations"
provenance_dir.mkdir(parents=True, exist_ok=True)
Expand All @@ -50,10 +61,15 @@ def test_create_bundle(tmp_path: Path) -> None:
policy_path.write_text(
yaml.safe_dump(
{
"risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}},
"risk": {
"max_risk_score": {"warn_above": 80, "fail_above": 90}
},
"sbom_quality": {
"coverage_percent": {"warn_below": 80, "fail_below": 60},
"license_coverage_percent": {"warn_below": 80, "fail_below": 60},
"license_coverage_percent": {
"warn_below": 80,
"fail_below": 60,
},
},
"repro": {"require_match": True},
"provenance": {"require_attestations": True},
Expand Down Expand Up @@ -93,29 +109,45 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None:
)
quality_json = _write_json(
tmp_path / "analysis/sbom_quality_report.json",
{"metrics": {"coverage_percent": 85.0, "license_coverage_percent": 80.0}},
{
"metrics": {
"coverage_percent": 85.0,
"license_coverage_percent": 80.0,
}
},
)
quality_html = tmp_path / "reports/sbom_quality_report.html"
quality_html.parent.mkdir(parents=True, exist_ok=True)
quality_html.write_text("<html>quality</html>", encoding="utf-8")
risk_report = _write_json(
tmp_path / "artifacts/risk.json",
{"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 65.0}},
{
"summary": {
"component_count": 2,
"cve_count": 1,
"max_risk_score": 65.0,
}
},
)
provenance_dir = tmp_path / "artifacts/attestations"
provenance_dir.mkdir(parents=True, exist_ok=True)
(provenance_dir / "build.json").write_text("{}", encoding="utf-8")
repro_dir = tmp_path / "artifacts/repro/attestations"
repro_attestation = _write_json(repro_dir / f"{tag}.json", {"match": True})
_write_json(repro_dir / f"{tag}.json", {"match": True})
policy_path = tmp_path / "config/policy.yml"
policy_path.parent.mkdir(parents=True, exist_ok=True)
policy_path.write_text(
yaml.safe_dump(
{
"risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}},
"risk": {
"max_risk_score": {"warn_above": 80, "fail_above": 90}
},
"sbom_quality": {
"coverage_percent": {"warn_below": 70, "fail_below": 50},
"license_coverage_percent": {"warn_below": 70, "fail_below": 50},
"license_coverage_percent": {
"warn_below": 70,
"fail_below": 50,
},
},
"repro": {"require_match": True},
"provenance": {"require_attestations": True},
Expand Down Expand Up @@ -149,9 +181,10 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None:
]
)
assert exit_code == 0
bundle_path = tmp_path / "evidence/bundles" / f"{tag}.zip"
tag_path = Path(tag.replace(":", "_"))
bundle_path = tmp_path / "evidence/bundles" / f"{tag_path}.zip"
assert bundle_path.is_file()
manifest_path = tmp_path / "evidence/manifests" / f"{tag}.yaml"
manifest_path = tmp_path / "evidence/manifests" / f"{tag_path}.yaml"
assert manifest_path.is_file()
manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8"))
assert manifest["evaluations"]["overall"] == "pass"
Expand Down
Loading