diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index daf1d13bd..9805beed1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -37,7 +37,7 @@ jobs:
           python scripts/run_demo_steps.py --app "life-claims-portal"
       - name: Upload decision artefact
         if: env.RUN_FIXOPS_INTEGRATION_TESTS == '1'
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: demo-decision
           path: artefacts/**/outputs/decision.json
diff --git a/apps/api/app.py b/apps/api/app.py
index 18e619bdc..d01e1b138 100644
--- a/apps/api/app.py
+++ b/apps/api/app.py
@@ -118,9 +118,13 @@ def create_app() -> FastAPI:
 
     async def _verify_api_key(api_key: Optional[str] = Depends(api_key_header)) -> None:
         if auth_strategy == "token":
-            if not api_key or api_key not in expected_tokens:
+            if not api_key:
+                raise HTTPException(
+                    status_code=401, detail="Missing API token"
+                )
+            if api_key not in expected_tokens:
                 raise HTTPException(
-                    status_code=401, detail="Invalid or missing API token"
+                    status_code=403, detail="Invalid API token"
                 )
             return
         if auth_strategy == "jwt":
diff --git a/apps/api/normalizers.py b/apps/api/normalizers.py
index 68f521b37..0c6e41fae 100644
--- a/apps/api/normalizers.py
+++ b/apps/api/normalizers.py
@@ -10,7 +10,17 @@ import zipfile
 from contextlib import suppress
 from dataclasses import dataclass, field, asdict
-from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Literal, Mapping, Optional, Tuple
+
+from pydantic import (
+    BaseModel,
+    ConfigDict,
+    Field,
+    StrictInt,
+    StrictStr,
+    ValidationError,
+    field_validator,
+)
 
 try:  # Optional dependency for YAML parsing
     import yaml
@@ -64,6 +74,9 @@ def _resolve_sbom_parser_state() -> tuple[Any | None, Exception | None]:
 
 logger = logging.getLogger(__name__)
 
+MAX_DOCUMENT_BYTES = 32 * 1024 * 1024
+_SARIF_LEVELS: tuple[str, ...] = ("none", "note", "warning", "error", "info")
+
 _SNYK_SEVERITY_TO_LEVEL = {
     "critical": "error",
     "high": "error",
@@ -439,6 +452,40 @@ def to_dict(self) -> Dict[str, Any]:
         }
 
 
+class SarifFindingModel(BaseModel):
+    """Schema for validated SARIF findings with strict coercion rules."""
+
+    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
+
+    rule_id: StrictStr | None = Field(default=None)
+    message: StrictStr | None = Field(default=None)
+    level: Literal[_SARIF_LEVELS] | None = Field(default=None)
+    file: StrictStr | None = Field(default=None)
+    line: StrictInt | None = Field(default=None, ge=0)
+    raw: Mapping[str, Any]
+
+    @field_validator("rule_id")
+    @classmethod
+    def _validate_rule_id(cls, value: StrictStr | None) -> StrictStr | None:
+        if value is None:
+            return None
+        if not value:
+            raise ValueError("rule_id cannot be empty")
+        return value
+
+
+class NormalizedSARIFModel(BaseModel):
+    """Validated structure for normalised SARIF payloads."""
+
+    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
+
+    version: StrictStr
+    schema_uri: StrictStr | None = Field(default=None)
+    tool_names: List[StrictStr]
+    findings: List[SarifFindingModel]
+    metadata: Dict[str, Any]
+
+
 @dataclass
 class SarifFinding:
     """Summarised SARIF result."""
@@ -561,6 +608,11 @@ def _prepare_text(self, raw: Any) -> str:
         data = self._ensure_bytes(raw)
         data = self._maybe_decode_base64(data)
         data = self._maybe_decompress(data)
+        if len(data) > MAX_DOCUMENT_BYTES:
+            # TODO: consider streaming parsers to avoid loading oversized artefacts entirely in memory.
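+            # The guard runs after base64 decoding and decompression, so the cap
+            # applies to the fully expanded payload rather than the wire size.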
+            raise ValueError("Document exceeds maximum allowed size")
         return data.decode("utf-8", errors="ignore")
 
     def load_sbom(self, raw: Any) -> NormalizedSBOM:
@@ -998,8 +1048,8 @@ def load_sarif(self, raw: Any) -> NormalizedSARIF:
             properties=data.get("properties"),
         )
 
-        findings: List[SarifFinding] = []
         tool_names: List[str] = []
+        finding_models: List[SarifFindingModel] = []
 
         for run in runs:
             tool = (
@@ -1008,37 +1058,84 @@ def load_sarif(self, raw: Any) -> NormalizedSARIF:
                 else {}
             )
             tool_name = tool.get("name")
-            if tool_name:
-                tool_names.append(tool_name)
+            if isinstance(tool_name, str) and tool_name.strip():
+                tool_names.append(tool_name.strip())
 
             results = run.get("results") if isinstance(run, dict) else None
-            for result in results or []:
-                message = None
-                if "message" in result:
-                    if isinstance(result["message"], dict):
-                        message = result["message"].get("text")
-                    else:
-                        message = str(result["message"])
-
-                location = (result.get("locations") or [{}])[0]
-                physical = location.get("physicalLocation", {})
-                artifact = physical.get("artifactLocation", {})
-                region = physical.get("region", {})
-
-                findings.append(
-                    SarifFinding(
-                        rule_id=result.get("ruleId"),
-                        message=message,
-                        level=result.get("level"),
-                        file=artifact.get("uri"),
-                        line=region.get("startLine"),
-                        raw=result,
-                    )
-                )
+            if isinstance(results, Iterable) and not isinstance(
+                results, (str, bytes, bytearray)
+            ):
+                for result in results:
+                    if not isinstance(result, Mapping):
+                        continue
+                    message_value = result.get("message")
+                    message: Optional[str] = None
+                    if isinstance(message_value, Mapping):
+                        text_value = message_value.get("text")
+                        if isinstance(text_value, str):
+                            message = text_value
+                    elif isinstance(message_value, str):
+                        message = message_value
+
+                    locations = result.get("locations")
+                    location: Mapping[str, Any] | None = None
+                    if isinstance(locations, Iterable) and not isinstance(
+                        locations, (str, bytes, bytearray)
+                    ):
+                        for entry in locations:
+                            if isinstance(entry, Mapping):
+                                location = entry
+                                break
+                    physical = (
+                        location.get("physicalLocation")
+                        if isinstance(location, Mapping)
+                        else None
+                    )
+                    artifact = (
+                        physical.get("artifactLocation")
+                        if isinstance(physical, Mapping)
+                        else None
+                    )
+                    region = (
+                        physical.get("region")
+                        if isinstance(physical, Mapping)
+                        else None
+                    )
+
+                    level_value = result.get("level")
+                    normalized_level = None
+                    if isinstance(level_value, str) and level_value.strip():
+                        normalized_level = level_value.strip().lower()
+
+                    rule_id_value = result.get("ruleId")
+                    rule_id = rule_id_value if isinstance(rule_id_value, str) else None
+
+                    file_uri = artifact.get("uri") if isinstance(artifact, Mapping) else None
+                    if file_uri is not None and not isinstance(file_uri, str):
+                        file_uri = None
+
+                    start_line = (
+                        region.get("startLine") if isinstance(region, Mapping) else None
+                    )
+                    if not isinstance(start_line, int):
+                        start_line = None
+
+                    candidate = {
+                        "rule_id": rule_id,
+                        "message": message,
+                        "level": normalized_level,
+                        "file": file_uri,
+                        "line": start_line,
+                        "raw": result,
+                    }
+                    try:
+                        finding_models.append(SarifFindingModel.model_validate(candidate))
+                    except ValidationError as exc:
+                        raise ValueError("Invalid SARIF result") from exc
 
         metadata = {
             "run_count": len(runs),
-            "finding_count": len(findings),
+            "finding_count": len(finding_models),
         }
         schema_key = sarif_log.schema_uri
         if isinstance(schema_key, str):
@@ -1046,12 +1143,41 @@ def load_sarif(self, raw: Any) -> NormalizedSARIF:
         if tool_names:
             metadata["tool_count"] = len(tool_names)
 
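+        # Re-validate the assembled payload so the version and tool metadata get
+        # the same strict treatment as the individual findings.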
+        try:
+            normalized_model = NormalizedSARIFModel.model_validate(
+                {
+                    "version": str(sarif_log.version or "2.1.0"),
+                    "schema_uri": (
+                        str(sarif_log.schema_uri)
+                        if isinstance(sarif_log.schema_uri, str)
+                        else None
+                    ),
+                    "tool_names": tool_names,
+                    "findings": finding_models,
+                    "metadata": metadata,
+                }
+            )
+        except ValidationError as exc:
+            raise ValueError("Normalised SARIF payload failed validation") from exc
+
         normalized = NormalizedSARIF(
-            version=sarif_log.version,
-            schema_uri=sarif_log.schema_uri,
-            tool_names=tool_names,
-            findings=findings,
-            metadata=metadata,
+            version=normalized_model.version,
+            schema_uri=normalized_model.schema_uri,
+            tool_names=normalized_model.tool_names,
+            findings=[
+                SarifFinding(
+                    rule_id=model.rule_id,
+                    message=model.message,
+                    level=model.level,
+                    file=model.file,
+                    line=model.line,
+                    raw=dict(model.raw),
+                )
+                for model in normalized_model.findings
+            ],
+            metadata=normalized_model.metadata,
         )
         logger.debug("Normalised SARIF", extra={"metadata": metadata})
         return normalized
diff --git a/core/stage_runner.py b/core/stage_runner.py
index 3cca094c5..38167ab30 100644
--- a/core/stage_runner.py
+++ b/core/stage_runner.py
@@ -16,6 +16,8 @@
 from pathlib import Path
 from typing import Any, Dict, Iterable, Mapping, Optional
 
+from fixops.utils.paths import resolve_within_root
+
 from apps.api.normalizers import InputNormalizer, NormalizedSARIF, NormalizedSBOM
 
 def _current_utc_timestamp() -> str:
@@ -666,6 +668,32 @@ def _analyse_posture(self, payload: Mapping[str, Any]) -> dict[str, Any]:
         else:
             resources = [payload]
 
+        def _extract_cidrs(source: Mapping[str, Any] | None, *, include_ipv6: bool = True) -> list[str]:
+            if not isinstance(source, Mapping):
+                return []
+            values: list[str] = []
+            for key in ("cidr_blocks", "cidrs", "cidr"):
+                entries = source.get(key)
+                if isinstance(entries, (str, bytes)):
+                    values.append(str(entries))
+                elif isinstance(entries, Iterable) and not isinstance(entries, (str, bytes, bytearray)):
+                    values.extend(str(item) for item in entries)
+            if include_ipv6:
+                for key in ("ipv6_cidr_blocks", "ipv6_cidrs"):
+                    entries = source.get(key)
+                    if isinstance(entries, (str, bytes)):
+                        values.append(str(entries))
+                    elif isinstance(entries, Iterable) and not isinstance(entries, (str, bytes, bytearray)):
+                        values.extend(str(item) for item in entries)
+            return values
+
+        def _contains_open_rule(cidrs: Iterable[str]) -> bool:
+            for value in cidrs:
+                candidate = str(value).strip()
+                if candidate in {"0.0.0.0/0", "::/0"}:
+                    return True
+            return False
+
         for resource in resources:
             if not isinstance(resource, Mapping):
                 continue
@@ -695,23 +723,26 @@ def _analyse_posture(self, payload: Mapping[str, Any]) -> dict[str, Any]:
                 if candidate_tls:
                     tls_policy = candidate_tls
 
-                if rtype in {"aws_security_group", "aws_security_group_rule"}:
+                if rtype == "aws_security_group":
                     ingress_rules = after.get("ingress") or resource.get("ingress") or []
                     if isinstance(ingress_rules, Mapping):
                         ingress_rules = [ingress_rules]
                     for rule in ingress_rules:
                         if not isinstance(rule, Mapping):
                             continue
-                        cidrs = rule.get("cidr_blocks") or rule.get("cidrs") or rule.get("cidr")
-                        if isinstance(cidrs, (str, bytes)):
-                            cidr_values = [cidrs]
-                        elif isinstance(cidrs, Iterable):
-                            cidr_values = [str(item) for item in cidrs]
-                        else:
-                            cidr_values = []
-                        if any(value == "0.0.0.0/0" for value in cidr_values):
+                        cidr_values = _extract_cidrs(rule)
+                        if _contains_open_rule(cidr_values):
                             open_security_groups.add(name)
 
+                if rtype == "aws_security_group_rule":
+                    cidr_values = _extract_cidrs(after)
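+                    # Standalone rule resources may carry CIDRs at the top level
+                    # rather than under a planned "after" change; fall back to those.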
+                    if not cidr_values:
+                        cidr_values = _extract_cidrs(resource)
+                    if _contains_open_rule(cidr_values):
+                        open_security_groups.add(name)
+
                 if rtype in {"aws_db_instance", "aws_rds_cluster"}:
                     encrypted = after.get("storage_encrypted")
                     if encrypted is False or encrypted is None:
@@ -935,7 +964,7 @@ def _marketplace_recommendations(self, failing_controls: list[Any]) -> list[dict
         ]
 
     def _write_evidence_bundle(self, context, documents: Mapping[str, Mapping[str, Any]]) -> Path:
-        bundle_path = context.outputs_dir / "evidence_bundle.zip"
+        bundle_path = resolve_within_root(context.outputs_dir, "evidence_bundle.zip")
         with zipfile.ZipFile(bundle_path, "w") as archive:
             for key, filename in self._OUTPUT_FILENAMES.items():
                 document = documents.get(key)
diff --git a/evidence/packager.py b/evidence/packager.py
index 3ce686f14..a389dd45e 100644
--- a/evidence/packager.py
+++ b/evidence/packager.py
@@ -2,19 +2,21 @@
 
 from __future__ import annotations
 
+import copy
 import json
 import subprocess
 import tempfile
-import copy
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
 from hashlib import sha256
 from pathlib import Path
-from typing import Any, Dict, Iterable, Mapping
+from typing import Any, Iterable, Mapping
 from zipfile import ZipFile
 
 import yaml
 
+from fixops.utils.paths import resolve_within_root
+
 DEFAULT_POLICY: dict[str, Any] = {
     "sbom_quality": {
         "coverage_percent": {"warn_below": 80.0, "fail_below": 60.0},
@@ -49,13 +51,15 @@ def load_policy(policy_path: Path | None) -> dict[str, Any]:
     with policy_path.open("r", encoding="utf-8") as handle:
         loaded = yaml.safe_load(handle) or {}
     if not isinstance(loaded, Mapping):
-        return DEFAULT_POLICY
+        raise ValueError("Policy file must decode to a mapping")
     merged = copy.deepcopy(DEFAULT_POLICY)
     for section, rules in loaded.items():
         if isinstance(rules, Mapping):
             existing = merged.setdefault(section, {})
             if isinstance(existing, Mapping):
-                existing.update(rules)  # shallow merge is sufficient for numeric thresholds
+                existing.update(
+                    rules
+                )  # shallow merge is sufficient for numeric thresholds
         else:
             merged[section] = rules
     return merged
@@ -86,36 +90,76 @@ def _evaluate_rules(value: float, rules: Mapping[str, Any]) -> str:
     return status
 
 
-def evaluate_policy(policy: Mapping[str, Any], *, metrics: Mapping[str, Any]) -> dict[str, Any]:
+def evaluate_policy(
+    policy: Mapping[str, Any], *, metrics: Mapping[str, Any]
+) -> dict[str, Any]:
     evaluations: dict[str, Any] = {"checks": {}, "overall": "pass"}
 
-    sbom_metrics = metrics.get("sbom", {}) if isinstance(metrics.get("sbom"), Mapping) else {}
-    sbom_policy = policy.get("sbom_quality", {}) if isinstance(policy.get("sbom_quality"), Mapping) else {}
+    sbom_metrics = (
+        metrics.get("sbom", {})
+        if isinstance(metrics.get("sbom"), Mapping)
+        else {}
+    )
+    sbom_policy = (
+        policy.get("sbom_quality", {})
+        if isinstance(policy.get("sbom_quality"), Mapping)
+        else {}
+    )
     for metric in ("coverage_percent", "license_coverage_percent"):
         value = sbom_metrics.get(metric)
         if value is None:
            continue
        status = _evaluate_rules(float(value), sbom_policy.get(metric, {}))
-        evaluations["checks"][f"sbom_{metric}"] = {"value": float(value), "status": status}
+        evaluations["checks"][f"sbom_{metric}"] = {
+            "value": float(value),
+            "status": status,
+        }
 
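+    # Metrics absent from the payload are skipped rather than failed; only
+    # supplied values are scored against the policy thresholds.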
-    risk_metrics = metrics.get("risk", {}) if isinstance(metrics.get("risk"), Mapping) else {}
-    risk_policy = policy.get("risk", {}) if isinstance(policy.get("risk"), Mapping) else {}
+    risk_metrics = (
+        metrics.get("risk", {})
+        if isinstance(metrics.get("risk"), Mapping)
+        else {}
+    )
+    risk_policy = (
+        policy.get("risk", {})
+        if isinstance(policy.get("risk"), Mapping)
+        else {}
+    )
     max_risk = risk_metrics.get("max_risk_score")
     if max_risk is not None:
-        status = _evaluate_rules(float(max_risk), risk_policy.get("max_risk_score", {}))
+        status = _evaluate_rules(
+            float(max_risk), risk_policy.get("max_risk_score", {})
+        )
         evaluations["checks"]["risk_max_risk_score"] = {
             "value": float(max_risk),
             "status": status,
         }
 
-    repro_match = metrics.get("repro", {}).get("match") if isinstance(metrics.get("repro"), Mapping) else None
-    repro_policy = policy.get("repro", {}) if isinstance(policy.get("repro"), Mapping) else {}
+    repro_match = (
+        metrics.get("repro", {}).get("match")
+        if isinstance(metrics.get("repro"), Mapping)
+        else None
+    )
+    repro_policy = (
+        policy.get("repro", {})
+        if isinstance(policy.get("repro"), Mapping)
+        else {}
+    )
     if repro_match is not None:
         required = bool(repro_policy.get("require_match", True))
         status = "pass" if (not required or repro_match) else "fail"
-        evaluations["checks"]["repro_match"] = {"value": bool(repro_match), "status": status}
+        evaluations["checks"]["repro_match"] = {
+            "value": bool(repro_match),
+            "status": status,
+        }
 
-    provenance_policy = policy.get("provenance", {}) if isinstance(policy.get("provenance"), Mapping) else {}
+    provenance_policy = (
+        policy.get("provenance", {})
+        if isinstance(policy.get("provenance"), Mapping)
+        else {}
+    )
     attestation_count = int(metrics.get("provenance", {}).get("count", 0))
     if provenance_policy.get("require_attestations"):
         status = "pass" if attestation_count > 0 else "fail"
@@ -149,7 +191,9 @@ def _collect_files(paths: Iterable[Path]) -> list[Path]:
     return files
 
 
-def _sign_manifest(manifest_path: Path, signature_path: Path, key_path: Path) -> None:
+def _sign_manifest(
+    manifest_path: Path, signature_path: Path, key_path: Path
+) -> None:
     command = [
         "cosign",
         "sign-blob",
@@ -164,9 +208,9 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
     tag = inputs.tag
-    output_root = inputs.output_dir
-    bundle_dir = output_root / "bundles"
-    manifest_dir = output_root / "manifests"
+    output_root = inputs.output_dir.resolve()
+    bundle_dir = resolve_within_root(output_root, "bundles")
+    manifest_dir = resolve_within_root(output_root, "manifests")
     bundle_dir.mkdir(parents=True, exist_ok=True)
     manifest_dir.mkdir(parents=True, exist_ok=True)
@@ -180,13 +224,25 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
         if not Path(path).is_file():
             raise FileNotFoundError(f"Required evidence file missing: {path}")
     if inputs.provenance_dir and not inputs.provenance_dir.exists():
-        raise FileNotFoundError(f"Provenance directory '{inputs.provenance_dir}' not found")
+        raise FileNotFoundError(
+            f"Provenance directory '{inputs.provenance_dir}' not found"
+        )
 
-    quality_payload = json.loads(inputs.sbom_quality_json.read_text(encoding="utf-8"))
+    quality_payload = json.loads(
+        inputs.sbom_quality_json.read_text(encoding="utf-8")
+    )
     risk_payload = json.loads(inputs.risk_report.read_text(encoding="utf-8"))
-    repro_payload = json.loads(inputs.repro_attestation.read_text(encoding="utf-8"))
+    repro_payload = json.loads(
+        inputs.repro_attestation.read_text(encoding="utf-8")
+    )
 
-    provenance_files = _collect_files([inputs.provenance_dir]) if inputs.provenance_dir else []
+    provenance_files = (
+        _collect_files([inputs.provenance_dir])
+        if inputs.provenance_dir
+        else []
+    )
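+    # Extra paths are optional attachments; _collect_files walks directories
+    # recursively and tolerates paths that do not exist.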
     extra_files = _collect_files(inputs.extra_paths)
 
     bundle_files: list[tuple[Path, str]] = []
     artefact_descriptors: list[dict[str, Any]] = []
@@ -200,7 +254,10 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
     mapping.extend(
         [
             (inputs.risk_report, "risk/risk.json"),
-            (inputs.repro_attestation, f"repro/{inputs.repro_attestation.name}"),
+            (
+                inputs.repro_attestation,
+                f"repro/{inputs.repro_attestation.name}",
+            ),
         ]
     )
@@ -239,9 +296,13 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
     metrics = {
         "sbom": quality_payload.get("metrics", {}),
         "risk": {
-            "component_count": risk_payload.get("summary", {}).get("component_count"),
+            "component_count": risk_payload.get("summary", {}).get(
+                "component_count"
+            ),
             "cve_count": risk_payload.get("summary", {}).get("cve_count"),
-            "max_risk_score": risk_payload.get("summary", {}).get("max_risk_score"),
+            "max_risk_score": risk_payload.get("summary", {}).get(
+                "max_risk_score"
+            ),
         },
         "repro": {"match": bool(repro_payload.get("match"))},
         "provenance": {"count": len(provenance_files)},
@@ -259,23 +320,28 @@ def create_bundle(inputs: BundleInputs) -> dict[str, Any]:
         "evaluations": evaluations,
     }
 
-    manifest_path = manifest_dir / f"{tag}.yaml"
+    tag_path = Path(tag.replace(":", "_"))
+    manifest_path = resolve_within_root(manifest_dir, f"{tag_path}.yaml")
+    manifest_path.parent.mkdir(parents=True, exist_ok=True)
     with manifest_path.open("w", encoding="utf-8") as handle:
         yaml.safe_dump(manifest, handle, sort_keys=False)
 
-    bundle_path = bundle_dir / f"{tag}.zip"
+    bundle_path = resolve_within_root(bundle_dir, f"{tag_path}.zip")
+    bundle_path.parent.mkdir(parents=True, exist_ok=True)
     with ZipFile(bundle_path, "w") as archive:
         for source, arcname in bundle_files:
             archive.write(source, arcname)
         archive.write(manifest_path, "MANIFEST.yaml")
         if inputs.sign_key:
-            with tempfile.NamedTemporaryFile(suffix=".sig", delete=False) as tmp_signature:
+            with tempfile.NamedTemporaryFile(
+                suffix=".sig", delete=False
+            ) as tmp_signature:
                 tmp_path = Path(tmp_signature.name)
             try:
                 _sign_manifest(manifest_path, tmp_path, inputs.sign_key)
                 archive.write(tmp_path, "MANIFEST.yaml.sig")
             finally:
-                if 'tmp_path' in locals() and tmp_path.exists():
+                if "tmp_path" in locals() and tmp_path.exists():
                     tmp_path.unlink()
 
     manifest["bundle_path"] = str(bundle_path)
diff --git a/fixops-enterprise/src/services/run_registry.py b/fixops-enterprise/src/services/run_registry.py
index 9291de3ca..743489197 100644
--- a/fixops-enterprise/src/services/run_registry.py
+++ b/fixops-enterprise/src/services/run_registry.py
@@ -10,6 +10,8 @@
 from pathlib import Path
 from typing import Any, Iterable, Mapping
 
+from fixops.utils.paths import resolve_within_root
+
 ARTEFACTS_ROOT = Path("artefacts")
 
 _CANONICAL_OUTPUTS: set[str] = {
@@ -51,7 +53,7 @@ def signed_outputs_dir(self) -> Path:
 
     @property
     def transparency_index(self) -> Path:
-        return self.outputs_dir / "transparency.index"
+        return resolve_within_root(self.outputs_dir, "transparency.index")
 
 
 class RunRegistry:
@@ -151,7 +153,7 @@ def save_input(
     ) -> Path:
         """Persist an input payload beneath the run's inputs directory."""
 
-        target = context.inputs_dir / filename
+        target = resolve_within_root(context.inputs_dir, filename)
         target.parent.mkdir(parents=True, exist_ok=True)
         if isinstance(payload, (bytes, bytearray)):
             target.write_bytes(bytes(payload))
@@ -171,14 +173,16 @@ def write_output(
         if name not in _CANONICAL_OUTPUTS:
             raise ValueError(f"Unsupported output name: {name}")
 
-        target = context.outputs_dir / name
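+        # resolve_within_root re-anchors the name beneath the run directory and
+        # raises if a crafted value would escape it.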
+        target = resolve_within_root(context.outputs_dir, name)
         target.parent.mkdir(parents=True, exist_ok=True)
         text = self._json_dumps(document)
         target.write_text(text, encoding="utf-8")
         return target
 
     def write_binary_output(self, context: RunContext, name: str, blob: bytes) -> Path:
-        target = context.outputs_dir / name
+        target = resolve_within_root(context.outputs_dir, name)
         target.parent.mkdir(parents=True, exist_ok=True)
         target.write_bytes(blob)
         return target
@@ -186,7 +188,9 @@ def write_binary_output(self, context: RunContext, name: str, blob: bytes) -> Pa
     def write_signed_manifest(
         self, context: RunContext, name: str, envelope: Mapping[str, Any]
     ) -> Path:
-        target = context.signed_outputs_dir / f"{name}.manifest.json"
+        target = resolve_within_root(
+            context.signed_outputs_dir, f"{name}.manifest.json"
+        )
         target.parent.mkdir(parents=True, exist_ok=True)
         target.write_text(self._json_dumps(envelope), encoding="utf-8")
         return target
@@ -194,12 +198,13 @@ def write_signed_manifest(
     def append_transparency_index(
         self, context: RunContext, canonical: str, digest: str, kid: str | None
     ) -> Path:
-        context.transparency_index.parent.mkdir(parents=True, exist_ok=True)
+        target = resolve_within_root(context.outputs_dir, "transparency.index")
+        target.parent.mkdir(parents=True, exist_ok=True)
         timestamp = _dt.datetime.utcnow().isoformat() + "Z"
         line = f"TS={timestamp} FILE={canonical} SHA256={digest} KID={kid or 'unknown'}\n"
-        with context.transparency_index.open("a", encoding="utf-8") as handle:
+        with target.open("a", encoding="utf-8") as handle:
             handle.write(line)
-        return context.transparency_index
+        return target
 
     def list_runs(self, app_id: str | None) -> list[str]:
         normalised = self._normalise_app_id(app_id)
diff --git a/fixops/__init__.py b/fixops/__init__.py
index 64ece04d9..dd1257bd2 100644
--- a/fixops/__init__.py
+++ b/fixops/__init__.py
@@ -1,11 +1,24 @@
 """Compatibility package exposing the public FixOps CLI entrypoints."""
 
-from core.cli import main as _main
+from typing import TYPE_CHECKING
 
-__all__ = ["_main"]
+__all__ = ["main"]
+
+if TYPE_CHECKING:  # pragma: no cover - import only for type checkers
+    from core.cli import main as _main
+
+
+def __getattr__(name: str):  # pragma: no cover - runtime compatibility shim
+    if name == "_main":
+        from core.cli import main as _main
+
+        return _main
+    raise AttributeError(name)
 
 
 def main() -> int:
     """Invoke :func:`core.cli.main` for compatibility with ``python -m fixops``."""
+    from core.cli import main as _main
+
     return _main()
diff --git a/fixops/utils/__init__.py b/fixops/utils/__init__.py
new file mode 100644
index 000000000..b501a1b56
--- /dev/null
+++ b/fixops/utils/__init__.py
@@ -0,0 +1,3 @@
+"""Utility helpers for the :mod:`fixops` package."""
+
+__all__ = ["paths"]
diff --git a/fixops/utils/paths.py b/fixops/utils/paths.py
new file mode 100644
index 000000000..a0f79a638
--- /dev/null
+++ b/fixops/utils/paths.py
@@ -0,0 +1,21 @@
+"""Filesystem helpers with guardrails for evidence storage."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+
+def resolve_within_root(root: Path, name: str) -> Path:
+    """Return *name* resolved beneath *root* or raise when escaping."""
+
+    resolved_root = root.resolve()
+    safe_name = Path(str(name))
+    candidate = (resolved_root / safe_name).resolve()
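+    # resolve() collapses symlinks and ".." segments before the containment check
+    # below; resolving to the root itself (e.g. name=".") remains allowed.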
+    if candidate != resolved_root and resolved_root not in candidate.parents:
+        raise ValueError("refusing to write outside evidence root")
+    return candidate
+
+
+__all__ = ["resolve_within_root"]
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 000000000..b0cf52b89
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,56 @@
+[tool.black]
+line-length = 88
+target-version = ["py311"]
+extend-exclude = '''
+(
+    ^/(?:apps|backend|build|cli|core|data|domain|enterprise|evidence|fixops-enterprise|integrations|lib4sbom|marketplace|new_apps|pydantic_settings|risk|scripts|services|simulations|ssvc|telemetry|tests|WIP|vendor)/
+    |^/artefacts/
+    |^/fixtures/
+    |^/reports/
+    |^/test_frontend\.py$
+    |^/sarif_om\.py$
+    |^/real_components_test\.py$
+    |^/backend_test\.py$
+    |^/create_tables\.py$
+    |^/create_minimal_tables\.py$
+)
+'''
+
+[tool.isort]
+profile = "black"
+line_length = 88
+skip_glob = [
+    "apps/*",
+    "backend/*",
+    "build/*",
+    "cli/*",
+    "core/*",
+    "data/*",
+    "domain/*",
+    "enterprise/*",
+    "evidence/*",
+    "fixops-enterprise/*",
+    "integrations/*",
+    "lib4sbom/*",
+    "marketplace/*",
+    "new_apps/*",
+    "pydantic_settings/*",
+    "risk/*",
+    "scripts/*",
+    "services/*",
+    "simulations/*",
+    "ssvc/*",
+    "telemetry/*",
+    "tests/*",
+    "WIP/*",
+    "vendor/*",
+    "artefacts/*",
+    "fixtures/*",
+    "reports/*",
+    "test_frontend.py",
+    "sarif_om.py",
+    "real_components_test.py",
+    "backend_test.py",
+    "create_tables.py",
+    "create_minimal_tables.py",
+]
diff --git a/tests/test_enhanced_api.py b/tests/test_enhanced_api.py
index 55b5a7cd0..116e2937c 100644
--- a/tests/test_enhanced_api.py
+++ b/tests/test_enhanced_api.py
@@ -54,6 +54,22 @@ def test_enhanced_routes_require_api_key(enhanced_client: TestClient) -> None:
     assert response.status_code == 401
 
 
+def test_enhanced_routes_reject_invalid_token(enhanced_client: TestClient) -> None:
+    response = enhanced_client.get(
+        "/api/v1/enhanced/capabilities",
+        headers={"X-API-Key": "wrong-token"},
+    )
+    assert response.status_code == 403
+
+
+def test_enhanced_routes_accept_case_insensitive_header(enhanced_client: TestClient) -> None:
+    response = enhanced_client.get(
+        "/api/v1/enhanced/capabilities",
+        headers={"x-api-key": API_TOKEN},
+    )
+    assert response.status_code == 200
+
+
 def test_analysis_returns_consensus_payload(enhanced_client: TestClient) -> None:
     body = {
         "service_name": "customer-api",
diff --git a/tests/test_evidence_bundle.py b/tests/test_evidence_bundle.py
index 285075473..82de9d1ca 100644
--- a/tests/test_evidence_bundle.py
+++ b/tests/test_evidence_bundle.py
@@ -2,9 +2,11 @@
 from pathlib import Path
 from zipfile import ZipFile
 
+import pytest
 import yaml
 
 from cli.fixops_ci import main as ci_main
+from evidence.packager import _collect_files
 from services.evidence.packager import (
     BundleInputs,
     EvidencePackager,
@@ -12,7 +14,6 @@
     load_policy,
 )
 from services.evidence.store import EvidenceStore
-from evidence.packager import _collect_files
 
 
 def _write_json(path: Path, payload: dict) -> Path:
@@ -28,14 +29,26 @@ def test_create_bundle(tmp_path: Path) -> None:
     )
     quality_json = _write_json(
         tmp_path / "analysis/sbom_quality_report.json",
-        {"metrics": {"coverage_percent": 95.0, "license_coverage_percent": 90.0}},
+        {
+            "metrics": {
+                "coverage_percent": 95.0,
+                "license_coverage_percent": 90.0,
+            }
+        },
     )
     quality_html = tmp_path / "reports/sbom_quality_report.html"
     quality_html.parent.mkdir(parents=True, exist_ok=True)
     quality_html.write_text("quality", encoding="utf-8")
     risk_report = _write_json(
         tmp_path / "artifacts/risk.json",
-        {"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 60.0}},
+        {
+            "summary": {
+                "component_count": 2,
+                "cve_count": 1,
+                "max_risk_score": 60.0,
+            }
+        },
     )
     provenance_dir = tmp_path / "artifacts/attestations"
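+    # The packager only counts provenance files, so a stub attestation suffices.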
     provenance_dir.mkdir(parents=True, exist_ok=True)
@@ -49,10 +61,15 @@ def test_create_bundle(tmp_path: Path) -> None:
     policy_path.write_text(
         yaml.safe_dump(
             {
-                "risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}},
+                "risk": {
+                    "max_risk_score": {"warn_above": 80, "fail_above": 90}
+                },
                 "sbom_quality": {
                     "coverage_percent": {"warn_below": 80, "fail_below": 60},
-                    "license_coverage_percent": {"warn_below": 80, "fail_below": 60},
+                    "license_coverage_percent": {
+                        "warn_below": 80,
+                        "fail_below": 60,
+                    },
                 },
                 "repro": {"require_match": True},
                 "provenance": {"require_attestations": True},
@@ -92,29 +109,45 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None:
     )
     quality_json = _write_json(
         tmp_path / "analysis/sbom_quality_report.json",
-        {"metrics": {"coverage_percent": 85.0, "license_coverage_percent": 80.0}},
+        {
+            "metrics": {
+                "coverage_percent": 85.0,
+                "license_coverage_percent": 80.0,
+            }
+        },
     )
     quality_html = tmp_path / "reports/sbom_quality_report.html"
     quality_html.parent.mkdir(parents=True, exist_ok=True)
     quality_html.write_text("quality", encoding="utf-8")
     risk_report = _write_json(
         tmp_path / "artifacts/risk.json",
-        {"summary": {"component_count": 2, "cve_count": 1, "max_risk_score": 65.0}},
+        {
+            "summary": {
+                "component_count": 2,
+                "cve_count": 1,
+                "max_risk_score": 65.0,
+            }
+        },
     )
     provenance_dir = tmp_path / "artifacts/attestations"
     provenance_dir.mkdir(parents=True, exist_ok=True)
     (provenance_dir / "build.json").write_text("{}", encoding="utf-8")
     repro_dir = tmp_path / "artifacts/repro/attestations"
-    repro_attestation = _write_json(repro_dir / f"{tag}.json", {"match": True})
+    _write_json(repro_dir / f"{tag}.json", {"match": True})
     policy_path = tmp_path / "config/policy.yml"
     policy_path.parent.mkdir(parents=True, exist_ok=True)
     policy_path.write_text(
         yaml.safe_dump(
             {
-                "risk": {"max_risk_score": {"warn_above": 80, "fail_above": 90}},
+                "risk": {
+                    "max_risk_score": {"warn_above": 80, "fail_above": 90}
+                },
                 "sbom_quality": {
                     "coverage_percent": {"warn_below": 70, "fail_below": 50},
-                    "license_coverage_percent": {"warn_below": 70, "fail_below": 50},
+                    "license_coverage_percent": {
+                        "warn_below": 70,
+                        "fail_below": 50,
+                    },
                 },
                 "repro": {"require_match": True},
                 "provenance": {"require_attestations": True},
@@ -148,9 +181,10 @@ def test_fixops_ci_evidence_bundle(tmp_path: Path) -> None:
         ]
     )
     assert exit_code == 0
-    bundle_path = tmp_path / "evidence/bundles" / f"{tag}.zip"
+    tag_path = Path(tag.replace(":", "_"))
+    bundle_path = tmp_path / "evidence/bundles" / f"{tag_path}.zip"
     assert bundle_path.is_file()
-    manifest_path = tmp_path / "evidence/manifests" / f"{tag}.yaml"
+    manifest_path = tmp_path / "evidence/manifests" / f"{tag_path}.yaml"
     assert manifest_path.is_file()
     manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8"))
     assert manifest["evaluations"]["overall"] == "pass"
@@ -202,3 +236,13 @@ def test_collect_files_handles_nested_directories(tmp_path: Path) -> None:
     files = _collect_files([extras, tmp_path / "missing"])
     names = sorted(path.relative_to(extras).as_posix() for path in files)
     assert names == ["a.txt", "nested/b.txt"]
+
+
+def test_load_policy_requires_mapping(tmp_path: Path) -> None:
+    policy_path = tmp_path / "invalid-policy.yml"
+    policy_path.write_text("- not-a-mapping", encoding="utf-8")
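+
+    # The document is valid YAML, but it parses to a list rather than a mapping.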
+    with pytest.raises(ValueError):
+        load_policy(policy_path)
diff --git a/tests/test_normalizers.py b/tests/test_normalizers.py
index 6703f9eb4..a187bff33 100644
--- a/tests/test_normalizers.py
+++ b/tests/test_normalizers.py
@@ -107,3 +107,32 @@ def test_load_sarif_converts_snyk_payload_without_converter():
     assert finding.level == "error"
     assert finding.file in {"express@4.18.0", "customer-api@1.4.2"}
     assert "dependency_path" in finding.raw.get("properties", {})
+
+
+def test_load_sarif_rejects_unknown_severity():
+    normalizer = InputNormalizer()
+    sarif_document = _build_sarif_document()
+    sarif_document["runs"][0]["results"][0]["level"] = "critical"
+
+    with pytest.raises(ValueError):
+        normalizer.load_sarif(json.dumps(sarif_document))
+
+
+def test_load_sarif_rejects_empty_rule_id():
+    normalizer = InputNormalizer()
+    sarif_document = _build_sarif_document()
+    sarif_document["runs"][0]["results"][0]["ruleId"] = ""
+
+    with pytest.raises(ValueError):
+        normalizer.load_sarif(json.dumps(sarif_document))
+
+
+def test_load_sarif_guard_oversized_payload(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr("apps.api.normalizers.MAX_DOCUMENT_BYTES", 16)
+    normalizer = InputNormalizer()
+    oversized = json.dumps({"runs": ["a" * 32]})
+
+    with pytest.raises(ValueError) as exc:
+        normalizer.load_sarif(oversized)
+
+    assert "maximum allowed size" in str(exc.value)
diff --git a/tests/test_run_registry.py b/tests/test_run_registry.py
index 03506e024..0087628bb 100644
--- a/tests/test_run_registry.py
+++ b/tests/test_run_registry.py
@@ -40,6 +40,15 @@ def test_save_input_and_write_output(tmp_path: Path, monkeypatch: pytest.MonkeyP
         ctx.write_output("unexpected.json", {})
 
 
+def test_save_input_rejects_parent_escape(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    ctx = _prepare(monkeypatch, tmp_path)
+    with pytest.raises(ValueError):
+        ctx.save_input("..", b"nope")
+
+    with pytest.raises(ValueError):
+        ctx.save_input("../../escape.txt", b"nope")
+
+
 def test_reopen_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
     ctx = _prepare(monkeypatch, tmp_path)
     reopened = run_registry.reopen_run(ctx.app_id, ctx.run_id)
diff --git a/tests/test_stage_runner_posture.py b/tests/test_stage_runner_posture.py
new file mode 100644
index 000000000..a747443f3
--- /dev/null
+++ b/tests/test_stage_runner_posture.py
@@ -0,0 +1,55 @@
+from types import SimpleNamespace
+
+import pytest
+
+from core.stage_runner import StageRunner
+
+
+@pytest.fixture()
+def runner() -> StageRunner:
+    return StageRunner(SimpleNamespace(), SimpleNamespace(), SimpleNamespace())
+
+
+def test_analyse_posture_detects_open_security_groups(runner: StageRunner) -> None:
+    payload = {
+        "resources": [
+            {
+                "type": "aws_security_group",
+                "name": "sg-web",
+                "changes": {
+                    "after": {
+                        "ingress": [
+                            {"cidr_blocks": ["0.0.0.0/0"]},
+                            {"ipv6_cidr_blocks": ["::/0"]},
+                        ]
+                    }
+                },
+            },
+            {
+                "type": "aws_security_group_rule",
+                "name": "sg-rule",
+                "changes": {
+                    "after": {
+                        "cidr_blocks": ["10.0.0.0/16"],
+                        "ipv6_cidr_blocks": ["::/0"],
+                    }
+                },
+            },
+            {
+                "type": "aws_security_group_rule",
+                "name": "sg-rule-after-empty",
+                "cidr_blocks": ["0.0.0.0/0"],
+            },
+        ]
+    }
+
+    posture = runner._analyse_posture(payload)
+
+    assert sorted(posture["open_security_groups"]) == [
+        "sg-rule",
+        "sg-rule-after-empty",
+        "sg-web",
+    ]
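+
+    # "sg-rule" is flagged by its "::/0" IPv6 block even though its IPv4 range is
+    # private; "sg-rule-after-empty" exercises the fallback to resource-level CIDRs.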