diff --git a/config/fixops.overlay.yml b/config/fixops.overlay.yml index 36e0ecf92..f4faedb02 100644 --- a/config/fixops.overlay.yml +++ b/config/fixops.overlay.yml @@ -42,6 +42,10 @@ "enforce_ticket_sync": false, "capture_feedback": false }, + "signing": { + "provider": "env", + "rotation_sla_days": 45 + }, "modules": { "guardrails": {"enabled": true}, "context_engine": {"enabled": true}, @@ -282,6 +286,17 @@ } } }, + "policy_engine": { + "opa": { + "enabled": false, + "url": "https://opa.fixops.local:8181", + "policy_package": "fixops", + "health_path": "/health?bundles", + "bundle_status_path": "/v1/bundles/fixops/status", + "auth_token_env": "FIXOPS_OPA_TOKEN", + "request_timeout_seconds": 5 + } + }, "analytics": { "baseline": { "findings_per_interval": 120, diff --git a/docs/decisionfactory_alignment.md b/docs/decisionfactory_alignment.md index 7428c7d4f..db3629090 100644 --- a/docs/decisionfactory_alignment.md +++ b/docs/decisionfactory_alignment.md @@ -1,78 +1,30 @@ # DecisionFactory.ai Alignment Status -This document tracks the implementation status of the DecisionFactory.ai requirements against the current FixOps blended enterprise codebase. Each section references the authoritative source files that were reviewed to determine coverage. +This document tracks the implementation status of the DecisionFactory.ai requirements across the FixOps codebase. To reduce cognitive load, the alignment work is now split into three parts that can be reviewed independently: -## 1. Evidence must be RSA-SHA256 signed (non-repudiation) -- **Status:** ✅ Implemented -- **Notes:** `EvidenceLake.store_evidence` now applies `rsa_sign` from `src/utils/crypto.py` and persists the resulting Base64 signature, algorithm metadata, and key fingerprint alongside the existing SHA-256 checksum. Retrieval verifies both the checksum and signature before returning evidence records. 
- - References: `src/services/evidence_lake.py`, `src/utils/crypto.py` +- **Part 1 – Implemented capabilities:** Everything DecisionFactory.ai already gets out-of-the-box. +- **Part 2 – Partially implemented capabilities:** Workstreams that are in motion but still have visible gaps. +- **Part 3 – Missing capabilities:** Features that have not yet been started. -## 2. OPA/Rego policy-as-code runtime (demo+enterprise) -- **Status:** ❌ Missing -- **Notes:** The policy engine still uses a hand-rolled evaluator (`_evaluate_rego_rule`) and never instantiates the production OPA adapter in `src/services/real_opa_engine.py`. No policy input payload is sent to an OPA instance, and there are no automated tests covering Rego bundles. - - References: `src/services/policy_engine.py`, `src/services/real_opa_engine.py` - -## 3. Explainability with SHAP/LIME alongside LLM narratives -- **Status:** ❌ Missing -- **Notes:** Processing relies on LLM-driven explanations without any SHAP/LIME feature attribution artefacts. There is no `xai_shap.py` module and no `/processing/explain` endpoint that emits attribution vectors. - - References: `src/services/processing_layer.py`, `src/api/v1/processing_layer.py` - -## 4. RL/MDP learning loop for actions (defer/patch/accept) -- **Status:** ❌ Missing -- **Notes:** `enhanced_decision_engine` lacks any reinforcement-learning policy hooks. There is no `rl_policy.py`, experience logging, or `FEATURE_RL` toggle. - - References: `src/services/enhanced_decision_engine.py` - -## 5. VEX ingestion (SPDX/CycloneDX) to suppress `not_affected` -- **Status:** ❌ Missing -- **Notes:** SBOM parsing ignores VEX data and there is no `vex_parser`. Findings with vendor `NOT_AFFECTED` assertions remain untouched during triage. - - References: `src/services/sbom_parser.py` - -## 6. 
EPSS/KEV should influence SSVC/Markov transitions -- **Status:** ⚠️ Partial -- **Notes:** Feed ingestion captures counts, but the processing layer does not adjust SSVC priors or Markov transition probabilities based on EPSS percentiles or KEV membership. - - References: `src/services/feeds_service.py`, `src/services/processing_layer.py` +--- -## 7. Policy gate must BLOCK any KEV finding unless waived -- **Status:** ⚠️ Partial -- **Notes:** `/policy/evaluate` blocks when KEV findings coincide with high/critical severity, yet it lacks waiver handling and does not enforce a hard block for all KEV detections as required. - - References: `src/api/v1/policy.py` +## Part 1 – Implemented capabilities ✅ -## 8. Evidence export: signed JSON + printable PDF bundle -- **Status:** ❌ Missing -- **Notes:** There is no exporter that assembles a signed JSON + PDF package or a `/evidence/{id}/download` route. Evidence storage ends with database persistence only. - - References: `src/services/evidence_lake.py` +See [`Part 1 – Implemented capabilities`](decisionfactory_alignment/part-1-implemented.md) for the full breakdown of production-ready features. -## 9. Key management: KMS/HSM integration and rotation policy -- **Status:** ⚠️ Partial -- **Notes:** `EnvKeyProvider` implements RSA keys and stubs exist for AWS/Azure, but rotation routines, provider configuration flags, and operational documentation remain incomplete relative to the design brief. - - References: `src/utils/crypto.py`, `src/config/settings.py`, `docs/SECURITY.md` +--- -## 10. Multi-tenant RBAC (owner, approver, auditor, integrator) -- **Status:** ❌ Missing -- **Notes:** User models do not reference tenants, nor are role checks enforced on policy/evidence/feed APIs as described. - - References: `src/models/user.py`, `src/api/v1/auth.py` +## Part 2 – Partially implemented capabilities ⚠️ -## 11. 
Observability: Prometheus metrics for hot path -- **Status:** ⚠️ Partial -- **Notes:** Health endpoints exist, yet there is no Prometheus exporter capturing the enumerated latency/counter metrics or a bundled Grafana dashboard. - - References: `src/api/v1/monitoring.py`, `src/services/metrics.py` +See [`Part 2 – Partially implemented capabilities`](decisionfactory_alignment/part-2-partial.md) for the detailed list of in-flight workstreams and the remaining gaps to close. -## 12. CLI demo/enterprise overlays -- **Status:** ⚠️ Partial -- **Notes:** CLI overlays toggle demo vs enterprise modes, but flags for signing provider, RL, SHAP, and OPA URL are absent. - - References: `fixops/cli.py`, `config/*.overlay.yml` +--- -## 13. CI/CD adapters & Postman collections kept in sync -- **Status:** ⚠️ Partial -- **Notes:** Collections exist but do not include KEV hard-block, SHAP evidence, or signed download test cases. Negative signature validation scenarios are missing. - - References: `src/api/v1/cicd.py`, `postman/FixOps-CICD-Tests.postman_collection.json` +## Part 3 – Missing capabilities ❌ -## 14. Kubernetes manifests reflect new env vars and readiness -- **Status:** ⚠️ Partial -- **Notes:** Manifests do not surface `SIGNING_PROVIDER`, `KEY_ID`, `OPA_URL`, or `FEATURE_RL` environment variables. Probe configuration remains unchanged. - - References: `kubernetes/*.yaml` +See [`Part 3 – Missing capabilities`](decisionfactory_alignment/part-3-missing.md) for the six DecisionFactory.ai requirements that still need to be built from scratch. --- ### Summary -The RSA signing pathway has been implemented, but the remaining DecisionFactory.ai alignment items—OPA/Rego integration, SHAP explainability, RL policy learning, VEX ingestion, enriched policy gating, and operational overlays—are still outstanding. +RSA signing is fully aligned today. 
The remaining work concentrates on production OPA/Rego enforcement, net-new explainability and RL automation, VEX ingestion, richer evidence exports, and operational surface area (policy gating, EPSS/KEV-aware scoring hardening, key management backends, observability, CLI/Kubernetes configurability, and CI/CD test coverage). diff --git a/docs/decisionfactory_alignment/part-1-implemented.md b/docs/decisionfactory_alignment/part-1-implemented.md new file mode 100644 index 000000000..e13a22394 --- /dev/null +++ b/docs/decisionfactory_alignment/part-1-implemented.md @@ -0,0 +1,9 @@ +# Part 1 – Implemented capabilities ✅ + +> These requirements are fulfilled in production builds today. Each entry highlights the runtime behaviour and where to find the supporting code. + +### 1. Evidence must be RSA-SHA256 signed (non-repudiation) +- **Status:** ✅ Implemented +- **Notes:** Evidence records are serialized in a canonical order, signed with RSA-SHA256, and stored with the Base64 signature, signing algorithm, and public-key fingerprint. Retrieval verifies both the hash and the signature before returning the record to callers. + - References: `fixops-blended-enterprise/src/services/evidence_lake.py`, `fixops-blended-enterprise/src/utils/crypto.py` + diff --git a/docs/decisionfactory_alignment/part-2-partial.md b/docs/decisionfactory_alignment/part-2-partial.md new file mode 100644 index 000000000..3352c3132 --- /dev/null +++ b/docs/decisionfactory_alignment/part-2-partial.md @@ -0,0 +1,66 @@ +# Part 2 – Partially implemented capabilities ⚠️ + +> These are the "in-flight" items: some coverage exists, but the DecisionFactory.ai specification still calls for additional functionality. + +### 6. 
EPSS/KEV should influence SSVC/Markov transitions +- **Status:** ⚠️ Partial +- **Current coverage:** Feed refresh jobs persist EPSS/KEV snapshots, and the processing layer adjusts Markov transitions and exploitation priors when the data is present, so the probabilistic core is wired for real signals. +- **Missing work:** + - Guarantee that EPSS/KEV inputs reach every decision path (REST + batch) with regression tests covering the hand-off. + - Add validation that proves fallback heuristics engage when scientific libraries (pgmpy, pomegranate, mchmm) are unavailable. + - Publish operator runbooks documenting how to enable and monitor EPSS/KEV ingestion in production. +- **References:** `fixops-blended-enterprise/src/services/feeds_service.py`, `fixops-blended-enterprise/src/services/processing_layer.py`, `fixops-blended-enterprise/src/services/decision_engine.py` + +### 7. Policy gate must BLOCK any KEV finding unless waived +- **Status:** ⚠️ Partial +- **Current coverage:** `/policy/evaluate` escalates KEV-tagged findings to hard blocks when they also carry high or critical severities, so the enforcement logic is wired into the runtime path. +- **Missing work:** + - Implement a waiver object (API + persistence) so platform security can temporarily suppress a KEV block with auditable approval metadata. + - Promote KEV detections to hard blocks regardless of severity unless an approved waiver exists. + - Extend regression suites to prove the deny-by-default behaviour and successful waiver usage. +- **References:** `fixops-blended-enterprise/src/api/v1/policy.py` + +### 9. Key management: KMS/HSM integration and rotation policy +- **Status:** ⚠️ Partial +- **Current coverage:** The environment-backed `EnvKeyProvider` ships with RSA signing, on-demand rotation, and operator documentation that spells out how to rotate local keys. +- **Missing work:** + - Flesh out the AWS KMS and Azure Key Vault providers so they can load, rotate, and attest to keys managed remotely. 
+ - Surface configuration flags in settings overlays/CLI to allow tenant-level provider selection. + - Automate rotation health checks and alerts to satisfy the DecisionFactory.ai rotation SLAs. +- **References:** `fixops-blended-enterprise/src/utils/crypto.py`, `docs/SECURITY.md` + +### 11. Observability: Prometheus metrics for hot path +- **Status:** ⚠️ Partial +- **Current coverage:** A `/metrics` endpoint exposes counters for decision verdicts, enabling Prometheus scrapes of core automation throughput. +- **Missing work:** + - Instrument HTTP request latency and error ratios for decision, evidence, and policy endpoints. + - Publish histograms and gauges that map directly to the DecisionFactory.ai hot-path metrics checklist. + - Provide a Grafana dashboard (JSON + screenshots) so adopters can deploy a ready-made view. +- **References:** `fixops-blended-enterprise/src/main.py`, `fixops-blended-enterprise/src/services/metrics.py` + +### 12. CLI demo/enterprise overlays +- **Status:** ⚠️ Partial +- **Current coverage:** The CLI profiles and overlay YAML let operators toggle demo vs. enterprise modules and core automation settings. +- **Missing work:** + - Introduce switches/fields for selecting the signing provider, enabling RL/SHAP experiments, and pointing to external OPA endpoints. + - Validate overlay schema updates with automated tests to ensure flags round-trip into the runtime configuration. + - Document overlay examples for each DecisionFactory.ai deployment persona. +- **References:** `fixops/fixops/cli.py`, `config/fixops.overlay.yml` + +### 13. CI/CD adapters & Postman collections kept in sync +- **Status:** ⚠️ Partial +- **Current coverage:** Postman suites already cover health checks, baseline decision outcomes, and happy-path CI/CD interactions. +- **Missing work:** + - Add KEV hard-block scenarios, signed evidence retrieval flows, and negative signature verification tests. 
+ - Keep the CI/CD adapters and Postman collections versioned together with automation that fails when they drift. + - Capture regression data for RL/SHAP toggles so new explainability features remain exercised. +- **References:** `fixops-blended-enterprise/postman/POSTMAN_COMPLETION.md` + +### 14. Kubernetes manifests reflect new env vars and readiness +- **Status:** ⚠️ Partial +- **Current coverage:** Deployments ship readiness probes and surface the legacy secret/env var set. +- **Missing work:** + - Add ConfigMap entries and deployment wiring for `SIGNING_PROVIDER`, `KEY_ID`, `OPA_SERVER_URL`, and the proposed RL/SHAP feature toggles. + - Ensure the manifests expose liveness/readiness gates for the new metrics and policy services. + - Provide Helm/Kustomize overlays (or manifest snippets) that map to DecisionFactory.ai’s reference environments. +- **References:** `fixops-blended-enterprise/kubernetes/backend-deployment.yaml`, `fixops-blended-enterprise/kubernetes/configmap.yaml` diff --git a/docs/decisionfactory_alignment/part-3-missing.md b/docs/decisionfactory_alignment/part-3-missing.md new file mode 100644 index 000000000..0469f8d47 --- /dev/null +++ b/docs/decisionfactory_alignment/part-3-missing.md @@ -0,0 +1,62 @@ +# DecisionFactory Alignment — Part 3: Missing Capabilities ❌ + +The following DecisionFactory.ai requirements have not yet been started in FixOps. Six distinct capability areas remain open, each requiring net-new implementation work. + +## 2. OPA/Rego policy-as-code runtime (demo + enterprise) +- **Status:** ❌ Missing +- **Why it matters:** DecisionFactory.ai assumes every deployment enforces policies via production OPA/Rego bundles, so skipping the real adapter leaves policy evaluations non-compliant. +- **What to build:** + - Instantiate the `RealOPAEngine` client in non-demo modes and ship configuration for pointing at external OPA endpoints. 
+ - Implement policy input marshalling plus health checks that prove Rego bundles load and evaluate requests end-to-end. + - Add automated tests and documentation covering policy bundle deployment and failure handling. + - Evidence: `fixops-blended-enterprise/src/services/policy_engine.py` still executes inline helpers while `fixops-blended-enterprise/src/services/real_opa_engine.py` remains unused in non-demo flows. + +## 3. Explainability with SHAP/LIME alongside LLM narratives +- **Status:** ❌ Missing +- **Why it matters:** DecisionFactory.ai expects both deterministic narratives and data-driven feature attribution so security reviewers can validate each recommendation. +- **What to build:** + - Introduce a SHAP/LIME service that can run against the decision engine’s feature vectors. + - Provide storage and API responses that return attribution artefacts with each decision/evidence record. + - Update documentation and demos so explainability toggles are visible to operators. + - Evidence: repository search returns no SHAP/LIME modules. + +## 4. RL/MDP learning loop for actions (defer/patch/accept) +- **Status:** ❌ Missing +- **Why it matters:** DecisionFactory.ai highlights a reinforcement-learning control loop that continuously tunes defer/patch/accept policies based on outcomes. +- **What to build:** + - Capture experience tuples from deployment outcomes and store them for training. + - Implement policy evaluation + improvement routines (e.g., Q-learning or policy gradients) and expose a feature toggle for rollout. + - Instrument observability hooks so RL performance can be reviewed. + - Evidence: repository search returns no reinforcement learning hooks. + +## 5. VEX ingestion (SPDX/CycloneDX) to suppress `not_affected` +- **Status:** ❌ Missing +- **Why it matters:** Without VEX ingestion, customers cannot rely on supplier attestations to automatically downgrade unaffected findings. 
+- **What to build:** + - Parse SPDX/CycloneDX VEX documents and merge supplier assertions into the evidence store. + - Wire suppression logic into decision evaluation so `not_affected` findings skip remediation queues. + - Add regression tests and documentation covering VEX ingestion workflows. + - Evidence: repository search shows only documentation mentions of VEX without runtime ingestion. + +## 8. Evidence export: signed JSON + printable PDF bundle +- **Status:** ❌ Missing +- **Why it matters:** Auditors demand tamper-evident artefacts plus a human-readable packet when exporting DecisionFactory evidence. +- **What to build:** + - Assemble a bundle generator that signs JSON payloads, renders a PDF summary, and packages them for download. + - Publish a `/evidence/{id}/download` endpoint that enforces RBAC and streams the signed bundle. + - Verify signatures during export tests and document the operational flow. + - References: `fixops/evidence.py`, `fixops-blended-enterprise/src/api/v1` + +## 10. Multi-tenant RBAC (owner, approver, auditor, integrator) +- **Status:** ❌ Missing +- **Why it matters:** DecisionFactory.ai scopes access by tenant and persona; without that mapping, shared environments lack the minimum access guarantees. +- **What to build:** + - Extend the user/tenant data model with the owner/approver/auditor/integrator roles. + - Enforce role checks across decision, evidence, policy, and configuration APIs. + - Provide migration scripts and admin tooling so operators can assign roles safely. + - References: `fixops-blended-enterprise/src/models/user.py` + +--- + +### Snapshot +Six capability tracks remain missing. Closing them requires production OPA/Rego enforcement, net-new explainability tooling, a reinforcement-learning decision loop, VEX suppression support, signed evidence exports, and multi-tenant RBAC aligned with the DecisionFactory.ai role taxonomy. 
diff --git a/fastapi/__init__.py b/fastapi/__init__.py index 488066483..656c63321 100644 --- a/fastapi/__init__.py +++ b/fastapi/__init__.py @@ -22,6 +22,10 @@ def Depends(dependency: Callable[..., Any] | None = None) -> Callable[..., Any] return dependency +def Query(default: Any = None, description: str | None = None) -> Any: + return default + + def File(default: Any) -> Any: return default @@ -109,6 +113,32 @@ def invoke(self, params: Mapping[str, str], body: Optional[Dict[str, Any]]) -> A return self.endpoint(**kwargs) +class APIRouter: + def __init__(self, prefix: str = "", tags: Optional[List[str]] | None = None) -> None: + self.prefix = prefix or "" + self.tags = tags or [] + self._routes: List[_Route] = [] + + def _register(self, method: str, path: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + full_path = f"{self.prefix}{path}" + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + self._routes.append(_Route(method, full_path, func)) + return func + + return decorator + + def post(self, path: str, **_: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + return self._register("POST", path) + + def get(self, path: str, **_: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + return self._register("GET", path) + + def add_api_route(self, path: str, endpoint: Callable[..., Any], methods: Optional[List[str]] = None, **_: Any) -> None: + for method in methods or ["GET"]: + self._routes.append(_Route(method, f"{self.prefix}{path}", endpoint)) + + class FastAPI: def __init__(self, title: str | None = None, version: str | None = None) -> None: self.title = title @@ -141,14 +171,24 @@ def _handle(self, method: str, path: str, body: Optional[Dict[str, Any]]) -> Any raise HTTPException(status_code=404, detail="Not Found") +class _StatusCodes: + HTTP_201_CREATED = 201 + + +status = _StatusCodes() + + from .testclient import TestClient # noqa: E402 (import after FastAPI definition) __all__ = [ "FastAPI", + "APIRouter", 
"HTTPException", "Depends", + "Query", "File", "UploadFile", "RequestValidationError", + "status", "TestClient", ] diff --git a/fixops-blended-enterprise/src/api/v1/policy.py b/fixops-blended-enterprise/src/api/v1/policy.py index f6239e8ee..6a4d26906 100644 --- a/fixops-blended-enterprise/src/api/v1/policy.py +++ b/fixops-blended-enterprise/src/api/v1/policy.py @@ -1,46 +1,577 @@ -""" -Policy evaluation endpoints for CI/CD gates (SSVC-aware) -""" -from typing import Dict, Any, List -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel +"""Policy evaluation endpoints for CI/CD gates (SSVC-aware).""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple + import structlog +from fastapi import APIRouter, Depends, HTTPException, Query, status +from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator +from sqlalchemy import or_, select +from sqlalchemy.ext.asyncio import AsyncSession from src.config.settings import get_settings +from src.db.session import get_db +from src.models.waivers import get_kev_waiver_model +from src.services.real_opa_engine import get_opa_engine logger = structlog.get_logger() router = APIRouter(prefix="/policy", tags=["policy-gates"]) settings = get_settings() +KevWaiverModel = get_kev_waiver_model() + +if KevWaiverModel is None: # pragma: no cover - misconfiguration safeguard + raise RuntimeError("KEV waiver model is unavailable for the configured database") + + +def _normalize_datetime(value: datetime) -> datetime: + """Return a timezone-naive UTC datetime for persistence and comparison.""" + + if value.tzinfo is not None: + return value.astimezone(timezone.utc).replace(tzinfo=None) + return value + class GateRequest(BaseModel): + """Payload accepted by the policy gate evaluation endpoint.""" + decision: str # ALLOW/BLOCK/DEFER confidence: float - signals: Dict[str, Any] = {} - findings: 
List[Dict[str, Any]] = [] + signals: Dict[str, Any] = Field(default_factory=dict) + findings: List[Dict[str, Any]] = Field(default_factory=list) + + model_config = ConfigDict(extra="allow") + class GateResponse(BaseModel): + """Structured response for policy evaluation requests.""" + allow: bool reason: str required_actions: List[str] + +class WaiverCreate(BaseModel): + """Schema for creating or updating a KEV waiver.""" + + model_config = ConfigDict(str_strip_whitespace=True) + + cve_id: str = Field(..., description="CVE identifier for the KEV finding", min_length=5) + service_name: Optional[str] = Field( + default=None, + description="Optional service scope; omit for platform-wide waivers", + max_length=255, + ) + justification: str = Field(..., description="Business justification for the waiver", min_length=10) + approved_by: str = Field(..., description="Approver recorded for audit", max_length=255) + expires_at: datetime = Field(..., description="UTC expiration timestamp") + change_ticket: Optional[str] = Field( + default=None, + description="Change, risk, or exception ticket tracking the waiver", + max_length=255, + ) + finding_id: Optional[str] = Field( + default=None, + description="Specific finding identifier when the waiver is scoped narrowly", + max_length=36, + ) + requested_by: Optional[str] = Field( + default=None, + description="Security user requesting the waiver for audit attribution", + max_length=255, + ) + + @field_validator("cve_id") + @classmethod + def _normalize_cve(cls, value: str) -> str: + normalized = value.upper().strip() + if not normalized.startswith("CVE-"): + raise ValueError("cve_id must be a valid CVE identifier (e.g. 
CVE-2024-12345)") + return normalized + + @field_validator("expires_at") + @classmethod + def _validate_expiry(cls, value: datetime) -> datetime: + normalized = _normalize_datetime(value) + if normalized <= _normalize_datetime(datetime.now(timezone.utc)): + raise ValueError("expires_at must be in the future") + return normalized + + +class WaiverResponse(BaseModel): + """Serialized KEV waiver returned to clients.""" + + model_config = ConfigDict(from_attributes=True) + + id: str + cve_id: str + service_name: Optional[str] + finding_id: Optional[str] + justification: str + approved_by: str + approved_at: datetime + expires_at: datetime + change_ticket: Optional[str] + is_active: bool + created_by: Optional[str] + created_at: datetime + modified_by: Optional[str] + updated_at: datetime + + @computed_field(return_type=str) + def status(self) -> str: + """Return the current lifecycle state for observability and UI badges.""" + + if not self.is_active: + return "revoked" + + expires = _normalize_datetime(self.expires_at) + now = _normalize_datetime(datetime.now(timezone.utc)) + return "active" if expires >= now else "expired" + + +def _extract_service_name(signals: Dict[str, Any]) -> Optional[str]: + """Resolve a service identifier from heterogeneous payloads.""" + + candidate_keys = ("service_name", "service", "serviceId", "service_id", "application") + for key in candidate_keys: + value = signals.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return None + + +def _coerce_iterable(value: Any) -> Iterable[Any]: + if value is None: + return [] + if isinstance(value, (list, tuple, set)): + return value + return [value] + + +def _extract_kev_cves(signals: Dict[str, Any], findings: Sequence[Dict[str, Any]]) -> Set[str]: + """Collect CVE identifiers for KEV findings from signals and finding payloads.""" + + kev_ids: Set[str] = set() + + for key in ("kev_cves", "kev_ids", "kev_findings"): + for entry in _coerce_iterable(signals.get(key)): + 
if isinstance(entry, str): + if entry.strip(): + kev_ids.add(entry.strip().upper()) + elif isinstance(entry, dict): + candidate = entry.get("cve") or entry.get("cve_id") or entry.get("id") + if isinstance(candidate, str) and candidate.strip(): + kev_ids.add(candidate.strip().upper()) + + for finding in findings or []: + if not isinstance(finding, dict): + continue + cve = finding.get("cve_id") or finding.get("cve") or finding.get("kev_reference") + is_kev = bool(finding.get("kev") or finding.get("is_kev") or finding.get("kev_reference")) + if cve and isinstance(cve, str): + cve_upper = cve.strip().upper() + if is_kev or cve_upper in kev_ids: + kev_ids.add(cve_upper) + + return {cve for cve in kev_ids if cve.startswith("CVE-")} + + +def _extract_environment(signals: Dict[str, Any]) -> Optional[str]: + """Resolve an environment label from the provided signals.""" + + for key in ("environment", "env", "deployment_environment", "target_env"): + value = signals.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return None + + +def _as_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"true", "1", "yes", "y", "enabled"}: + return True + if lowered in {"false", "0", "no", "n", "disabled"}: + return False + return False + + +def _as_float(value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _normalize_severity(value: Any) -> Optional[str]: + if value is None: + return None + normalized = str(value).strip().upper() + return normalized or None + + +def _collect_vulnerabilities(findings: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Normalise finding payloads so remote OPA policies receive consistent input.""" + + entries: List[Dict[str, Any]] = [] + for finding in findings or []: 
+ if not isinstance(finding, dict): + continue + + fix_available = finding.get("fix_available") + if fix_available is None: + fix_available = finding.get("fixAvailable") + + entry = { + "id": finding.get("id") + or finding.get("finding_id") + or finding.get("uuid"), + "cve_id": ( + finding.get("cve_id") + or finding.get("cve") + or finding.get("cveId") + or finding.get("kev_id") + ), + "kev": _as_bool(finding.get("kev") or finding.get("is_kev")), + "severity": _normalize_severity( + finding.get("severity") + or finding.get("severity_label") + or finding.get("severity_level"), + ), + "fix_available": _as_bool(fix_available), + "cvss_score": _as_float( + finding.get("cvss_score") + or finding.get("cvss") + or finding.get("cvss_v3"), + ), + "epss": _as_float(finding.get("epss") or finding.get("epss_score")), + "title": finding.get("title") or finding.get("name"), + } + entries.append(entry) + + return entries + + +def _build_sbom_payload(signals: Dict[str, Any]) -> Dict[str, Any]: + sbom_data = signals.get("sbom") if isinstance(signals.get("sbom"), dict) else None + components = signals.get("sbom_components") + payload: Dict[str, Any] = { + "sbom_present": _as_bool(signals.get("sbom_present")) or bool(sbom_data), + "sbom_valid": _as_bool(signals.get("sbom_valid")), + } + if sbom_data is not None: + payload["sbom"] = sbom_data + if isinstance(components, list): + payload["components"] = components + if signals.get("sbom_required") is not None: + payload["sbom_required"] = _as_bool(signals.get("sbom_required")) + return payload + + +async def _evaluate_remote_policies( + request: GateRequest, service_name: Optional[str] +) -> List[Tuple[str, Dict[str, Any]]]: + """Evaluate remote OPA policies when running in enterprise mode.""" + + if settings.DEMO_MODE or not getattr(settings, "OPA_SERVER_URL", None): + return [] + + vulnerabilities = _collect_vulnerabilities(request.findings) + sbom_payload = _build_sbom_payload(request.signals) + environment = 
_extract_environment(request.signals) + + policy_inputs: List[Tuple[str, Dict[str, Any]]] = [] + + if vulnerabilities: + vuln_payload: Dict[str, Any] = {"vulnerabilities": vulnerabilities} + if service_name: + vuln_payload["service_name"] = service_name + if environment: + vuln_payload["environment"] = environment + vuln_payload["kev_findings"] = [entry for entry in vulnerabilities if entry.get("kev")] + policy_inputs.append(("vulnerability", vuln_payload)) + + if sbom_payload: + if service_name: + sbom_payload.setdefault("service_name", service_name) + if environment: + sbom_payload.setdefault("environment", environment) + policy_inputs.append(("sbom", sbom_payload)) + + if not policy_inputs: + return [] + + try: + engine = await get_opa_engine() + if not await engine.health_check(): + logger.warning("OPA engine health check failed; skipping remote enforcement") + return [] + + decisions: List[Tuple[str, Dict[str, Any]]] = [] + for policy_name, payload in policy_inputs: + try: + decision = await engine.evaluate_policy(policy_name, payload) + except Exception as exc: # pragma: no cover - network/transient failure guard + logger.warning( + "OPA policy evaluation failed", policy=policy_name, error=str(exc) + ) + decisions.append( + ( + policy_name, + { + "decision": "error", + "rationale": f"OPA evaluation error: {exc}", + "error": True, + }, + ) + ) + else: + decisions.append((policy_name, decision)) + return decisions + except Exception as exc: # pragma: no cover - defensive guardrail + logger.warning("OPA enforcement aborted", error=str(exc)) + return [] + + +def _map_opa_results(results: List[Tuple[str, Dict[str, Any]]]) -> Optional[GateResponse]: + """Translate OPA policy decisions into gate responses.""" + + for policy_name, decision in results: + outcome = str(decision.get("decision", "")).lower() + rationale = decision.get("rationale") or decision.get("details") or "" + + if outcome == "allow": + continue + + if outcome == "block": + reason = ( + f"OPA policy 
'{policy_name}' blocked the request: {rationale}".strip() + or f"OPA policy '{policy_name}' blocked the request" + ) + return GateResponse( + allow=False, + reason=reason, + required_actions=[ + "Review OPA policy findings", + "Apply required remediations", + "Re-run policy evaluation", + ], + ) + + if outcome in {"defer", "error"} or decision.get("error"): + reason = ( + f"OPA policy '{policy_name}' requires manual review: {rationale}".strip() + or f"OPA policy '{policy_name}' requires manual review" + ) + return GateResponse( + allow=False, + reason=reason, + required_actions=[ + "Investigate OPA policy status", + "Resolve bundle discrepancies", + "Re-run policy evaluation", + ], + ) + + return None + + +async def _get_active_waivers( + db: AsyncSession, + cve_ids: Set[str], + service_name: Optional[str], +) -> Dict[str, List[Any]]: + """Fetch active waivers matching the provided CVE identifiers.""" + + if not cve_ids: + return {} + + now = _normalize_datetime(datetime.now(timezone.utc)) + + stmt = ( + select(KevWaiverModel) + .where( + KevWaiverModel.cve_id.in_(list(cve_ids)), + KevWaiverModel.is_active.is_(True), + KevWaiverModel.expires_at >= now, + ) + ) + + if service_name: + stmt = stmt.where( + or_( + KevWaiverModel.service_name.is_(None), + KevWaiverModel.service_name == service_name, + ) + ) + + result = await db.execute(stmt) + rows = result.scalars().all() + + waivers: Dict[str, List[Any]] = {} + for row in rows: + waivers.setdefault(row.cve_id, []).append(row) + return waivers + + +def _kev_block_response(unwaived: Set[str]) -> GateResponse: + sorted_ids = ", ".join(sorted(unwaived)) if unwaived else "KEV finding" + return GateResponse( + allow=False, + reason=f"KEV findings ({sorted_ids}) require remediation or an approved waiver", + required_actions=[ + "Patch affected KEV vulnerabilities", + "Request and approve a waiver if deferral is required", + "Re-run policy evaluation", + ], + ) + + @router.post("/evaluate", response_model=GateResponse) 
-async def evaluate_gate(req: GateRequest): +async def evaluate_gate(req: GateRequest, db: AsyncSession = Depends(get_db)) -> GateResponse: + """Evaluate policy gating rules with KEV hard-block enforcement.""" + try: - # Simple gate logic: block if KEV present and any high/critical finding or low confidence kev_count = int(req.signals.get("kev_count", 0) or 0) - epss_risk = int(req.signals.get("epss_count", 0) or 0) > 0 low_confidence = req.confidence < max(settings.LLM_CONSENSUS_THRESHOLD, 0.75) - has_high = any(f.get("severity") in ("high", "critical") for f in (req.findings or [])) - if req.decision == "BLOCK": - return GateResponse(allow=False, reason="Engine decided BLOCK", required_actions=["Fix blocking issues", "Request re-evaluation"]) - if kev_count and has_high: - return GateResponse(allow=False, reason="KEV present with high severity findings", required_actions=["Patch KEV-listed vulns", "Re-run scans"]) + return GateResponse( + allow=False, + reason="Engine decided BLOCK", + required_actions=["Fix blocking issues", "Request re-evaluation"], + ) + + service_name = _extract_service_name(req.signals) + kev_cves = _extract_kev_cves(req.signals, req.findings) + + if kev_count or kev_cves: + if not kev_cves and kev_count: + return _kev_block_response(set()) + + waivers = await _get_active_waivers(db, kev_cves, service_name) + unwaived = {cve for cve in kev_cves if not waivers.get(cve)} + if unwaived or (kev_count and not kev_cves): + return _kev_block_response(unwaived or kev_cves or {"KEV"}) + if low_confidence: - return GateResponse(allow=False, reason=f"Consensus confidence too low ({req.confidence:.0%})", required_actions=["Manual review", "Add business context"]) + return GateResponse( + allow=False, + reason=f"Consensus confidence too low ({req.confidence:.0%})", + required_actions=["Manual review", "Add business context"], + ) + + opa_results = await _evaluate_remote_policies(req, service_name) + opa_response = _map_opa_results(opa_results) + if 
opa_response is not None: + return opa_response + + return GateResponse( + allow=True, + reason="Policy checks passed", + required_actions=["Proceed with deployment"], + ) + except HTTPException: + raise + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Policy evaluate failed", error=str(exc)) + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.post("/waivers", response_model=WaiverResponse, status_code=status.HTTP_201_CREATED) +async def create_waiver( + waiver: WaiverCreate, + db: AsyncSession = Depends(get_db), +) -> WaiverResponse: + """Create or update a KEV waiver with auditable metadata.""" + + try: + normalized_expiry = _normalize_datetime(waiver.expires_at) + + existing_stmt = select(KevWaiverModel).where( + KevWaiverModel.cve_id == waiver.cve_id, + KevWaiverModel.is_active.is_(True), + ) + if waiver.service_name: + existing_stmt = existing_stmt.where(KevWaiverModel.service_name == waiver.service_name) + if waiver.finding_id: + existing_stmt = existing_stmt.where(KevWaiverModel.finding_id == waiver.finding_id) + + result = await db.execute(existing_stmt.limit(1)) + record = result.scalars().first() + + now = _normalize_datetime(datetime.now(timezone.utc)) + approved_at = now + + if record: + record.justification = waiver.justification + record.approved_by = waiver.approved_by + record.approved_at = approved_at + record.expires_at = normalized_expiry + record.change_ticket = waiver.change_ticket + record.modified_by = waiver.approved_by + await db.flush() + await db.commit() + await db.refresh(record) + return WaiverResponse.model_validate(record) + + payload = KevWaiverModel( + cve_id=waiver.cve_id, + service_name=waiver.service_name, + finding_id=waiver.finding_id, + justification=waiver.justification, + approved_by=waiver.approved_by, + approved_at=approved_at, + expires_at=normalized_expiry, + change_ticket=waiver.change_ticket, + created_by=waiver.requested_by or waiver.approved_by, + 
modified_by=waiver.approved_by, + ) + + db.add(payload) + await db.flush() + await db.commit() + await db.refresh(payload) + return WaiverResponse.model_validate(payload) + except HTTPException: + raise + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Failed to create waiver", error=str(exc)) + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.get("/waivers", response_model=List[WaiverResponse]) +async def list_waivers( + db: AsyncSession = Depends(get_db), + cve_id: Optional[str] = Query(default=None, description="Filter waivers by CVE identifier"), + service_name: Optional[str] = Query(default=None, description="Filter waivers by service scope"), + include_expired: bool = Query(default=False, description="Include expired waivers in the response"), +) -> List[WaiverResponse]: + """Return waivers, optionally filtered by CVE or service.""" + + stmt = select(KevWaiverModel) + + if cve_id: + stmt = stmt.where(KevWaiverModel.cve_id == cve_id.upper()) + if service_name: + stmt = stmt.where(KevWaiverModel.service_name == service_name) + if not include_expired: + now = _normalize_datetime(datetime.now(timezone.utc)) + stmt = stmt.where( + KevWaiverModel.is_active.is_(True), + KevWaiverModel.expires_at >= now, + ) + + stmt = stmt.order_by(KevWaiverModel.expires_at.desc()) - return GateResponse(allow=True, reason="Policy checks passed", required_actions=["Proceed with deployment"]) - except Exception as e: - logger.error(f"Policy evaluate failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) + result = await db.execute(stmt) + waivers = result.scalars().all() + return [WaiverResponse.model_validate(item) for item in waivers] diff --git a/fixops-blended-enterprise/src/api/v1/production_readiness.py b/fixops-blended-enterprise/src/api/v1/production_readiness.py index 89445afd3..5e03f5772 100644 --- a/fixops-blended-enterprise/src/api/v1/production_readiness.py +++ 
b/fixops-blended-enterprise/src/api/v1/production_readiness.py @@ -8,6 +8,7 @@ import structlog from src.config.settings import get_settings +from src.services.real_opa_engine import get_opa_engine logger = structlog.get_logger() router = APIRouter(prefix="/production-readiness", tags=["production-status"]) @@ -63,14 +64,27 @@ async def get_production_readiness(): if not llm_ready: readiness_status["missing_requirements"].append("OPENAI_API_KEY") - # Policy Engine Readiness (OPA Server) - opa_ready = False # Assume OPA server not running by default - readiness_status["component_status"]["policy_engine"] = { - "status": "NEEDS_SERVER", - "required": "OPA_SERVER", - "description": "OPA server at localhost:8181" - } - readiness_status["missing_requirements"].append("OPA_SERVER") + opa_ready = False + if getattr(settings, "OPA_SERVER_URL", None): + try: + engine = await get_opa_engine() + opa_ready = await engine.health_check() + except Exception as exc: # pragma: no cover - defensive logging + logger.warning("OPA health check failed", error=str(exc)) + + if not opa_ready: + readiness_status["component_status"]["policy_engine"] = { + "status": "NEEDS_SERVER", + "required": "OPA_SERVER", + "description": getattr(settings, "OPA_SERVER_URL", "OPA server"), + } + readiness_status["missing_requirements"].append("OPA_SERVER") + else: + readiness_status["component_status"]["policy_engine"] = { + "status": "READY", + "required": None, + "description": "OPA policy enforcement active", + } # Threat Intelligence Readiness threat_intel_ready = bool(settings.THREAT_INTEL_API_KEY) diff --git a/fixops-blended-enterprise/src/api/v1/system_mode.py b/fixops-blended-enterprise/src/api/v1/system_mode.py index 7888ad6a3..db2bc17d4 100644 --- a/fixops-blended-enterprise/src/api/v1/system_mode.py +++ b/fixops-blended-enterprise/src/api/v1/system_mode.py @@ -8,6 +8,7 @@ import structlog from src.config.settings import get_settings +from src.services.real_opa_engine import get_opa_engine 
logger = structlog.get_logger() router = APIRouter(prefix="/system-mode", tags=["system-management"]) @@ -35,8 +36,16 @@ async def get_current_mode(): if not settings.primary_llm_api_key: missing_requirements.append("OPENAI_API_KEY") - # Note: OPA server check would require actual network call - missing_requirements.append("OPA_SERVER") # Assume not available + opa_ready = False + if getattr(settings, "OPA_SERVER_URL", None): + try: + engine = await get_opa_engine() + opa_ready = await engine.health_check() + except Exception as exc: # pragma: no cover - defensive logging + logger.warning("OPA health check failed", error=str(exc)) + + if not opa_ready: + missing_requirements.append("OPA_SERVER") if not (settings.JIRA_URL and settings.JIRA_USERNAME and settings.JIRA_API_TOKEN): missing_requirements.append("JIRA_CREDENTIALS") @@ -56,7 +65,9 @@ async def get_current_mode(): "decision_engine": "operational", "vector_database": "demo" if settings.DEMO_MODE else ("operational" if settings.PGVECTOR_ENABLED else "needs_config"), "llm_consensus": "demo" if settings.DEMO_MODE else ("operational" if settings.primary_llm_api_key else "needs_keys"), - "policy_engine": "demo" if settings.DEMO_MODE else "needs_server", + "policy_engine": "demo" + if settings.DEMO_MODE + else ("operational" if opa_ready else "needs_server"), "evidence_lake": "operational" } } diff --git a/fixops-blended-enterprise/src/config/settings.py b/fixops-blended-enterprise/src/config/settings.py index dd6f465fc..26265a7de 100644 --- a/fixops-blended-enterprise/src/config/settings.py +++ b/fixops-blended-enterprise/src/config/settings.py @@ -55,7 +55,7 @@ class Settings(BaseSettings): ENABLED_KEV: bool = Field(default=True) ENABLED_VEX: bool = Field(default=False) ENABLED_RSS_SIDECAR: bool = Field(default=False) - + # Security Configuration SECRET_KEY: str = Field(default=os.getenv("SECRET_KEY")) JWT_ALGORITHM: str = "HS256" @@ -69,6 +69,40 @@ class Settings(BaseSettings): KEY_ID: Optional[str] = Field( 
default=os.getenv("KEY_ID"), description="Remote key identifier" ) + SIGNING_ROTATION_SLA_DAYS: int = Field( + default=int(os.getenv("SIGNING_ROTATION_SLA_DAYS", "30")), + description="Maximum age in days before signing material must rotate", + ) + AWS_REGION: Optional[str] = Field( + default=os.getenv("AWS_REGION"), description="AWS region for KMS operations" + ) + AZURE_VAULT_URL: Optional[str] = Field( + default=os.getenv("AZURE_VAULT_URL"), description="Azure Key Vault base URL" + ) + OPA_SERVER_URL: Optional[str] = Field( + default=os.getenv("OPA_SERVER_URL"), + description="Base URL for the external OPA server", + ) + OPA_POLICY_PACKAGE: str = Field( + default=os.getenv("OPA_POLICY_PACKAGE", "fixops"), + description="OPA policy package used for bundle queries", + ) + OPA_HEALTH_PATH: str = Field( + default=os.getenv("OPA_HEALTH_PATH", "/health"), + description="OPA health endpoint path", + ) + OPA_BUNDLE_STATUS_PATH: Optional[str] = Field( + default=os.getenv("OPA_BUNDLE_STATUS_PATH"), + description="Optional OPA bundle status endpoint for readiness checks", + ) + OPA_AUTH_TOKEN: Optional[str] = Field( + default=os.getenv("OPA_AUTH_TOKEN"), + description="Bearer token for authenticating with the OPA server", + ) + OPA_REQUEST_TIMEOUT: int = Field( + default=int(os.getenv("OPA_REQUEST_TIMEOUT", "5")), + description="Timeout in seconds for OPA HTTP requests", + ) # Database Configuration DATABASE_URL: str = Field(default=os.getenv("MONGO_URL", "mongodb://mongodb:27017/fixops_production")) diff --git a/fixops-blended-enterprise/src/core/middleware.py b/fixops-blended-enterprise/src/core/middleware.py index 2c447c702..729e59226 100644 --- a/fixops-blended-enterprise/src/core/middleware.py +++ b/fixops-blended-enterprise/src/core/middleware.py @@ -5,8 +5,9 @@ import asyncio import time import gzip -from typing import Callable, Dict, Any +from typing import Callable, Dict, Any, Optional import structlog +from fastapi import HTTPException from 
starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response, PlainTextResponse @@ -15,6 +16,7 @@ from src.services.cache_service import CacheService from src.config.settings import get_settings +from src.services.metrics import FixOpsMetrics logger = structlog.get_logger() settings = get_settings() @@ -25,23 +27,49 @@ class PerformanceMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next: Callable) -> Response: start_time = time.perf_counter() - + # Add correlation ID for request tracking correlation_id = f"req_{int(time.time() * 1000000)}" request.state.correlation_id = correlation_id - + + if settings.ENABLE_METRICS: + FixOpsMetrics.request_started(request.url.path) + # Process request - response = await call_next(request) - - # Calculate total request time - process_time = time.perf_counter() - start_time - process_time_us = process_time * 1_000_000 - + response: Optional[Response] = None + status_code = 500 + + try: + response = await call_next(request) + status_code = response.status_code + except HTTPException as exc: + status_code = exc.status_code + raise + except Exception: + raise + finally: + duration = time.perf_counter() - start_time + + if settings.ENABLE_METRICS: + FixOpsMetrics.record_request( + endpoint=request.url.path, + method=request.method, + status=status_code, + duration=duration, + ) + FixOpsMetrics.request_finished(request.url.path) + + process_time_us = duration * 1_000_000 + + if response is None: + # Re-raise the original exception if we reach this point without a response + raise + # Add performance headers - response.headers["X-Process-Time"] = f"{process_time:.6f}" + response.headers["X-Process-Time"] = f"{duration:.6f}" response.headers["X-Process-Time-US"] = f"{process_time_us:.2f}" response.headers["X-Correlation-ID"] = correlation_id - + # Log slow requests if process_time_us > 1000: # > 1ms logger.warning( diff --git 
a/fixops-blended-enterprise/src/db/migrations/env.py b/fixops-blended-enterprise/src/db/migrations/env.py index 013d27b7b..ed8a71056 100644 --- a/fixops-blended-enterprise/src/db/migrations/env.py +++ b/fixops-blended-enterprise/src/db/migrations/env.py @@ -16,6 +16,7 @@ # Import models for autogenerate from src.models.base import Base from src.models.user import User, UserSession, UserAuditLog +from src.models import security # noqa: F401 # Ensure security tables are registered # this is the Alembic Config object config = context.config diff --git a/fixops-blended-enterprise/src/db/migrations/versions/002_add_kev_waivers.py b/fixops-blended-enterprise/src/db/migrations/versions/002_add_kev_waivers.py new file mode 100644 index 000000000..c5ddcedf5 --- /dev/null +++ b/fixops-blended-enterprise/src/db/migrations/versions/002_add_kev_waivers.py @@ -0,0 +1,57 @@ +"""Create KEV waiver table""" + +from alembic import op +import sqlalchemy as sa + +revision = "002_add_kev_waivers" +down_revision = "001" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Create the kev_waivers table with audit metadata.""" + + op.create_table( + "kev_waivers", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("cve_id", sa.String(length=50), nullable=False), + sa.Column("service_name", sa.String(length=255), nullable=True), + sa.Column("finding_id", sa.String(length=36), nullable=True), + sa.Column("justification", sa.Text(), nullable=False), + sa.Column("approved_by", sa.String(length=255), nullable=False), + sa.Column("approved_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("expires_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("change_ticket", sa.String(length=255), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False), + 
sa.Column("is_active", sa.Boolean(), server_default=sa.true(), nullable=False), + sa.Column("version", sa.Integer(), server_default=sa.text("1"), nullable=False), + sa.Column("metadata", sa.JSON(), nullable=True), + sa.Column("created_by", sa.String(length=255), nullable=True), + sa.Column("modified_by", sa.String(length=255), nullable=True), + sa.Column("created_from_ip", sa.String(length=45), nullable=True), + sa.Column("modified_from_ip", sa.String(length=45), nullable=True), + sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("deleted_by", sa.String(length=255), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + op.create_index("ix_kev_waivers_cve_id", "kev_waivers", ["cve_id"]) + op.create_index("ix_kev_waivers_service_name", "kev_waivers", ["service_name"]) + op.create_index("ix_kev_waivers_finding_id", "kev_waivers", ["finding_id"]) + op.create_index("ix_kev_waivers_expires_at", "kev_waivers", ["expires_at"]) + op.create_index("ix_kev_waivers_is_active", "kev_waivers", ["is_active"]) + op.create_index("ix_kev_waivers_approved_at", "kev_waivers", ["approved_at"]) + + +def downgrade() -> None: + """Remove the kev_waivers table.""" + + op.drop_index("ix_kev_waivers_approved_at", table_name="kev_waivers") + op.drop_index("ix_kev_waivers_is_active", table_name="kev_waivers") + op.drop_index("ix_kev_waivers_expires_at", table_name="kev_waivers") + op.drop_index("ix_kev_waivers_finding_id", table_name="kev_waivers") + op.drop_index("ix_kev_waivers_service_name", table_name="kev_waivers") + op.drop_index("ix_kev_waivers_cve_id", table_name="kev_waivers") + op.drop_table("kev_waivers") diff --git a/fixops-blended-enterprise/src/models/__init__.py b/fixops-blended-enterprise/src/models/__init__.py new file mode 100644 index 000000000..8a9e3359f --- /dev/null +++ b/fixops-blended-enterprise/src/models/__init__.py @@ -0,0 +1,11 @@ +"""Model package exports for FixOps.""" + +from . import security # noqa: F401 +from . 
import user # noqa: F401 +from . import waivers # noqa: F401 + +__all__ = [ + "security", + "user", + "waivers", +] diff --git a/fixops-blended-enterprise/src/models/security.py b/fixops-blended-enterprise/src/models/security.py index 831e73499..ad19c5886 100644 --- a/fixops-blended-enterprise/src/models/security.py +++ b/fixops-blended-enterprise/src/models/security.py @@ -3,7 +3,7 @@ Findings, Vulnerabilities, Incidents, Services, Policies """ -from datetime import datetime +from datetime import datetime, timezone from typing import List, Optional, Dict, Any from enum import Enum @@ -187,18 +187,54 @@ class SecurityFinding(BaseModel, AuditMixin, SoftDeleteMixin): class FindingCorrelation(BaseModel): """Correlation between multiple findings to reduce noise""" - + __tablename__ = "finding_correlations" - + finding_id: Mapped[str] = mapped_column(String(36), ForeignKey("security_findings.id"), nullable=False) correlated_finding_id: Mapped[str] = mapped_column(String(36), ForeignKey("security_findings.id"), nullable=False) correlation_type: Mapped[str] = mapped_column(String(50), nullable=False) confidence_score: Mapped[float] = mapped_column(Float, nullable=False) correlation_reason: Mapped[str] = mapped_column(Text, nullable=False) - + # Relationships finding: Mapped["SecurityFinding"] = relationship("SecurityFinding", foreign_keys=[finding_id]) - correlated_finding: Mapped["SecurityFinding"] = relationship("SecurityFinding", foreign_keys=[correlated_finding_id]) + + +class KevFindingWaiver(BaseModel, AuditMixin, SoftDeleteMixin): + """Auditable waiver record for Known Exploited Vulnerabilities""" + + __tablename__ = "kev_waivers" + + cve_id: Mapped[str] = mapped_column(String(50), nullable=False, index=True) + service_name: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True) + finding_id: Mapped[Optional[str]] = mapped_column(String(36), nullable=True, index=True) + justification: Mapped[str] = mapped_column(Text, nullable=False) + 
approved_by: Mapped[str] = mapped_column(String(255), nullable=False) + approved_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True) + expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True) + change_ticket: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + + def is_active_for(self, *, service_name: Optional[str], now: Optional[datetime] = None) -> bool: + """Return True when the waiver is active for the requested scope.""" + + if not self.is_active: + return False + + if now is None: + now = datetime.utcnow() + + # Normalize timezone aware timestamps to naive UTC for comparison + expiry = self.expires_at + if expiry.tzinfo is not None: + expiry = expiry.astimezone(timezone.utc).replace(tzinfo=None) + + if expiry < now: + return False + + if self.service_name and service_name: + return self.service_name.lower() == service_name.lower() + + return self.service_name is None or service_name is None class SecurityIncident(BaseModel, AuditMixin, SoftDeleteMixin): diff --git a/fixops-blended-enterprise/src/models/security_sqlite.py b/fixops-blended-enterprise/src/models/security_sqlite.py index f4f7e95e8..839b6f34c 100644 --- a/fixops-blended-enterprise/src/models/security_sqlite.py +++ b/fixops-blended-enterprise/src/models/security_sqlite.py @@ -4,7 +4,7 @@ """ import json -from datetime import datetime +from datetime import datetime, timezone from typing import List, Optional, Dict, Any from enum import Enum @@ -406,9 +406,9 @@ class VulnerabilityIntelligence(BaseModel): class ComplianceEvidence(BaseModel, AuditMixin): """Compliance evidence and attestations (SQLite compatible)""" - + __tablename__ = "compliance_evidence" - + service_id: Mapped[str] = mapped_column(String(36), ForeignKey("services.id"), nullable=False, index=True) # Compliance framework @@ -442,4 +442,40 @@ def get_evidence_data(self) -> Dict[str, Any]: def set_evidence_data(self, data: Dict[str, Any]) -> 
None: """Set evidence data from dictionary""" - self.evidence_data = json.dumps(data) \ No newline at end of file + self.evidence_data = json.dumps(data) + + +class KevFindingWaiver(BaseModel, AuditMixin, SoftDeleteMixin): + """Auditable waiver record for Known Exploited Vulnerabilities (SQLite compatible)""" + + __tablename__ = "kev_waivers" + + cve_id: Mapped[str] = mapped_column(String(50), nullable=False, index=True) + service_name: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, index=True) + finding_id: Mapped[Optional[str]] = mapped_column(String(36), nullable=True, index=True) + justification: Mapped[str] = mapped_column(Text, nullable=False) + approved_by: Mapped[str] = mapped_column(String(255), nullable=False) + approved_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True) + expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True) + change_ticket: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + + def is_active_for(self, *, service_name: Optional[str], now: Optional[datetime] = None) -> bool: + """Return True when the waiver is active for the requested scope.""" + + if not self.is_active: + return False + + if now is None: + now = datetime.utcnow() + + expiry = self.expires_at + if expiry.tzinfo is not None: + expiry = expiry.astimezone(timezone.utc).replace(tzinfo=None) + + if expiry < now: + return False + + if self.service_name and service_name: + return self.service_name.lower() == service_name.lower() + + return self.service_name is None or service_name is None \ No newline at end of file diff --git a/fixops-blended-enterprise/src/models/waivers.py b/fixops-blended-enterprise/src/models/waivers.py new file mode 100644 index 000000000..e4d488fe9 --- /dev/null +++ b/fixops-blended-enterprise/src/models/waivers.py @@ -0,0 +1,34 @@ +"""Helper utilities for working with KEV waiver models.""" + +from __future__ import annotations + +from functools import lru_cache +from 
typing import Type + +from src.config.settings import get_settings + +try: # pragma: no cover - optional imports based on runtime database + from src.models.security import KevFindingWaiver as PostgresKevFindingWaiver +except Exception: # pragma: no cover - fallback for limited runtimes + PostgresKevFindingWaiver = None # type: ignore[assignment] + +try: # pragma: no cover - optional imports based on runtime database + from src.models.security_sqlite import KevFindingWaiver as SqliteKevFindingWaiver +except Exception: # pragma: no cover - fallback for limited runtimes + SqliteKevFindingWaiver = None # type: ignore[assignment] + + +@lru_cache(maxsize=1) +def get_kev_waiver_model() -> Type: + """Return the active KEV waiver ORM model for the configured database.""" + + db_url = (get_settings().DATABASE_URL or "").lower() + if "sqlite" in db_url: + return SqliteKevFindingWaiver or PostgresKevFindingWaiver # type: ignore[return-value] + if "postgres" in db_url or "psql" in db_url: + return PostgresKevFindingWaiver or SqliteKevFindingWaiver # type: ignore[return-value] + # Default to SQLite-compatible model when database is unspecified or mocked + return SqliteKevFindingWaiver or PostgresKevFindingWaiver # type: ignore[return-value] + + +__all__ = ["get_kev_waiver_model"] diff --git a/fixops-blended-enterprise/src/services/metrics.py b/fixops-blended-enterprise/src/services/metrics.py index 340e5776b..092fa280b 100644 --- a/fixops-blended-enterprise/src/services/metrics.py +++ b/fixops-blended-enterprise/src/services/metrics.py @@ -1,37 +1,271 @@ -""" -Prometheus metrics for FixOps -""" -from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest +"""Prometheus metrics utilities for FixOps.""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Dict, MutableMapping, Optional + +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + Histogram, + generate_latest, +) _registry = 
CollectorRegistry() -HTTP_REQUESTS = Counter('fixops_http_requests_total', 'Total HTTP requests', ['endpoint', 'method', 'status'], registry=_registry) -HTTP_LATENCY = Histogram('fixops_http_request_seconds', 'HTTP request duration seconds', ['endpoint'], registry=_registry, buckets=(0.01,0.025,0.05,0.1,0.25,0.5,1,2,5)) -ENGINE_DECISIONS = Counter('fixops_engine_decisions_total', 'Decisions produced', ['verdict'], registry=_registry) -UPLOADS_COMPLETED = Counter('fixops_uploads_completed_total', 'Completed uploads', ['scan_type'], registry=_registry) +HTTP_REQUESTS = Counter( + "fixops_http_requests_total", + "Total HTTP requests", + ["endpoint", "method", "status"], + registry=_registry, +) +HTTP_LATENCY = Histogram( + "fixops_http_request_seconds", + "HTTP request duration seconds", + ["endpoint"], + registry=_registry, + buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5), +) +HTTP_ERROR_RATIO = Gauge( + "fixops_http_error_ratio", + "Rolling error ratio for grouped API families", + ["family"], + registry=_registry, +) +HTTP_INFLIGHT = Gauge( + "fixops_http_inflight_requests", + "Current in-flight HTTP requests", + ["family"], + registry=_registry, +) +HOT_PATH_LATENCY = Gauge( + "fixops_hot_path_latency_us", + "Last recorded latency for DecisionFactory hot-path endpoints (microseconds)", + ["endpoint"], + registry=_registry, +) +SIGNING_KEY_AGE = Gauge( + "fixops_signing_key_rotation_age_days", + "Age of the active signing key material in days", + ["provider"], + registry=_registry, +) +SIGNING_KEY_HEALTH = Gauge( + "fixops_signing_key_rotation_healthy", + "Health indicator for signing key rotation SLAs", + ["provider"], + registry=_registry, +) +ENGINE_DECISIONS = Counter( + "fixops_engine_decisions_total", + "Decisions produced", + ["verdict"], + registry=_registry, +) +UPLOADS_COMPLETED = Counter( + "fixops_uploads_completed_total", + "Completed uploads", + ["scan_type"], + registry=_registry, +) + class FixOpsMetrics: + """Facade for recording and 
interrogating observability metrics.""" + + _family_totals: MutableMapping[str, Dict[str, int]] = defaultdict( + lambda: {"total": 0, "errors": 0} + ) + _inflight_counts: MutableMapping[str, int] = defaultdict(int) + _observed_families: set[str] = set() + _observed_hot_paths: set[str] = set() + _hot_path_latency_us: MutableMapping[str, float] = {} + _observed_key_providers: set[str] = set() + _key_rotation_age: MutableMapping[str, float] = {} + _key_rotation_health: MutableMapping[str, bool] = {} + + _HOT_PATH_PREFIXES = { + "/api/v1/decisions/make-decision": "decision", + "/api/v1/policy/evaluate": "policy", + "/api/v1/decisions/evidence": "evidence", + } + @staticmethod def get_metrics() -> bytes: return generate_latest(_registry) + # ------------------------------------------------------------------ + # HTTP instrumentation helpers + # ------------------------------------------------------------------ @staticmethod - def record_request(endpoint: str, method: str, status: int, duration: float): + def request_started(endpoint: str) -> None: + """Record that a request started so gauges reflect in-flight work.""" + + family = FixOpsMetrics._classify_family(endpoint) + FixOpsMetrics._observed_families.add(family) + FixOpsMetrics._inflight_counts[family] += 1 + try: + HTTP_INFLIGHT.labels(family=family).set(FixOpsMetrics._inflight_counts[family]) + except Exception: + pass + + @staticmethod + def request_finished(endpoint: str) -> None: + """Mark the request as finished for in-flight accounting.""" + + family = FixOpsMetrics._classify_family(endpoint) + FixOpsMetrics._observed_families.add(family) + FixOpsMetrics._inflight_counts[family] = max( + 0, FixOpsMetrics._inflight_counts[family] - 1 + ) + try: + HTTP_INFLIGHT.labels(family=family).set(FixOpsMetrics._inflight_counts[family]) + except Exception: + pass + + @staticmethod + def record_request(endpoint: str, method: str, status: int, duration: float) -> None: + """Capture counters, ratios, and hot-path gauges for 
an HTTP request.""" + + family = FixOpsMetrics._classify_family(endpoint) + FixOpsMetrics._observed_families.add(family) try: HTTP_REQUESTS.labels(endpoint=endpoint, method=method, status=str(status)).inc() HTTP_LATENCY.labels(endpoint=endpoint).observe(duration) except Exception: pass + totals = FixOpsMetrics._family_totals[family] + totals["total"] += 1 + if status >= 400: + totals["errors"] += 1 + + try: + ratio = ( + totals["errors"] / totals["total"] + if totals["total"] > 0 + else 0.0 + ) + HTTP_ERROR_RATIO.labels(family=family).set(ratio) + except Exception: + pass + + hot_path_label = FixOpsMetrics._resolve_hot_path(endpoint) + if hot_path_label: + FixOpsMetrics._observed_hot_paths.add(hot_path_label) + latency_us = duration * 1_000_000 + FixOpsMetrics._hot_path_latency_us[hot_path_label] = latency_us + try: + HOT_PATH_LATENCY.labels(endpoint=hot_path_label).set(latency_us) + except Exception: + pass + + # ------------------------------------------------------------------ + # Convenience accessors for testing and health diagnostics + # ------------------------------------------------------------------ + @staticmethod + def get_error_ratio(family: str) -> float: + totals = FixOpsMetrics._family_totals.get(family, {"total": 0, "errors": 0}) + if totals["total"] == 0: + return 0.0 + return totals["errors"] / totals["total"] + + @staticmethod + def get_inflight(family: str) -> int: + return FixOpsMetrics._inflight_counts.get(family, 0) + + @staticmethod + def get_hot_path_latency_us(endpoint: str) -> Optional[float]: + return FixOpsMetrics._hot_path_latency_us.get(endpoint) + @staticmethod - def record_decision(verdict: str): + def reset_runtime_stats() -> None: + """Reset derived runtime metrics so tests can assert fresh state.""" + + for family in list(FixOpsMetrics._observed_families): + FixOpsMetrics._family_totals[family] = {"total": 0, "errors": 0} + FixOpsMetrics._inflight_counts[family] = 0 + try: + HTTP_ERROR_RATIO.labels(family=family).set(0) + 
HTTP_INFLIGHT.labels(family=family).set(0) + except Exception: + pass + FixOpsMetrics._observed_families.clear() + + for endpoint in list(FixOpsMetrics._observed_hot_paths): + FixOpsMetrics._hot_path_latency_us.pop(endpoint, None) + try: + HOT_PATH_LATENCY.labels(endpoint=endpoint).set(0) + except Exception: + pass + FixOpsMetrics._observed_hot_paths.clear() + + for provider in list(FixOpsMetrics._observed_key_providers): + FixOpsMetrics._key_rotation_age.pop(provider, None) + FixOpsMetrics._key_rotation_health.pop(provider, None) + try: + SIGNING_KEY_AGE.labels(provider=provider).set(0) + SIGNING_KEY_HEALTH.labels(provider=provider).set(0) + except Exception: + pass + FixOpsMetrics._observed_key_providers.clear() + + # ------------------------------------------------------------------ + # Domain specific metrics + # ------------------------------------------------------------------ + @staticmethod + def record_decision(verdict: str) -> None: try: ENGINE_DECISIONS.labels(verdict=verdict).inc() except Exception: pass @staticmethod - def record_upload(scan_type: str): + def record_upload(scan_type: str) -> None: try: UPLOADS_COMPLETED.labels(scan_type=scan_type).inc() except Exception: pass + + # ------------------------------------------------------------------ + # Key management helpers + # ------------------------------------------------------------------ + @staticmethod + def record_key_rotation(provider: str, age_days: float, healthy: bool) -> None: + FixOpsMetrics._observed_key_providers.add(provider) + FixOpsMetrics._key_rotation_age[provider] = age_days + FixOpsMetrics._key_rotation_health[provider] = healthy + try: + SIGNING_KEY_AGE.labels(provider=provider).set(age_days) + SIGNING_KEY_HEALTH.labels(provider=provider).set(1.0 if healthy else 0.0) + except Exception: + pass + + @staticmethod + def get_key_rotation_age(provider: str) -> Optional[float]: + return FixOpsMetrics._key_rotation_age.get(provider) + + @staticmethod + def 
get_key_rotation_health(provider: str) -> Optional[bool]: + return FixOpsMetrics._key_rotation_health.get(provider) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + @staticmethod + def _classify_family(endpoint: str) -> str: + if endpoint.startswith("/api/v1/policy"): + return "policy" + if endpoint.startswith("/api/v1/decisions/evidence"): + return "evidence" + if endpoint.startswith("/api/v1/decisions"): + return "decision" + return "other" + + @staticmethod + def _resolve_hot_path(endpoint: str) -> Optional[str]: + for prefix in FixOpsMetrics._HOT_PATH_PREFIXES: + if endpoint.startswith(prefix): + return prefix + return None diff --git a/fixops-blended-enterprise/src/services/real_opa_engine.py b/fixops-blended-enterprise/src/services/real_opa_engine.py index 011fafdf9..80f4f110a 100644 --- a/fixops-blended-enterprise/src/services/real_opa_engine.py +++ b/fixops-blended-enterprise/src/services/real_opa_engine.py @@ -5,10 +5,9 @@ """ import asyncio -import json import time -from pathlib import Path -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Tuple + import structlog from src.config.settings import get_settings @@ -192,12 +191,44 @@ async def health_check(self) -> bool: class ProductionOPAEngine(OPAEngine): """Production OPA Engine with real OPA server""" - - def __init__(self, opa_url: str = "http://localhost:8181"): - self.opa_url = opa_url.rstrip('/') + + def __init__( + self, + opa_url: str = "http://localhost:8181", + *, + policy_package: str = "fixops", + health_path: str = "/health", + bundle_status_path: Optional[str] = None, + auth_token: Optional[str] = None, + request_timeout: int = 5, + ): + self.opa_url = opa_url.rstrip("/") or "http://localhost:8181" + self.policy_package = self._normalise_package(policy_package) + self.health_path = health_path if health_path.startswith("/") else 
f"/{health_path}" + self.bundle_status_path = bundle_status_path + if self.bundle_status_path and not self.bundle_status_path.startswith("/"): + self.bundle_status_path = f"/{self.bundle_status_path}" + self.auth_token = auth_token + self.request_timeout = max(1, int(request_timeout or 5)) self.client = None self._initialize_client() self.policy_cache = {} + + @staticmethod + def _normalise_package(package: str) -> str: + cleaned = (package or "fixops").strip().strip("/") + if not cleaned: + return "fixops" + return cleaned.replace(".", "/") + + def _policy_path(self, policy_name: str, rule: str = "allow") -> str: + base = self.policy_package.rstrip("/") + return f"{base}/{policy_name.strip('/')}/{rule}" + + def _auth_headers(self) -> Dict[str, str]: + if self.auth_token: + return {"Authorization": f"Bearer {self.auth_token}"} + return {} def _initialize_client(self): """Initialize OPA client""" @@ -244,83 +275,113 @@ async def _evaluate_with_client(self, policy_name: str, input_data: Dict[str, An """Evaluate using OPA Python client""" try: # Query OPA for policy decision - policy_path = f"fixops/{policy_name}/allow" + policy_path = self._policy_path(policy_name) result = await asyncio.to_thread( - self.client.query, - policy_path, + self.client.query, + policy_path, input_data=input_data ) - + # Convert OPA result to our format - if result.get("result"): - return { - "decision": "allow", - "rationale": f"OPA policy {policy_name} evaluation passed" - } - else: - return { - "decision": "block", - "rationale": f"OPA policy {policy_name} evaluation failed" - } - + allow, opa_payload = self._extract_decision(result) + return self._format_decision(policy_name, allow, opa_payload) + except Exception as e: logger.error(f"OPA client evaluation failed: {e}") raise - + async def _evaluate_with_http(self, policy_name: str, input_data: Dict[str, Any]) -> Dict[str, Any]: """Evaluate using HTTP requests to OPA server""" import aiohttp - + try: - policy_path = 
f"fixops/{policy_name}/allow" + policy_path = self._policy_path(policy_name) url = f"{self.opa_url}/v1/data/{policy_path}" - - async with aiohttp.ClientSession() as session: + + async with aiohttp.ClientSession(headers=self._auth_headers()) as session: async with session.post( url, json={"input": input_data}, - timeout=aiohttp.ClientTimeout(total=5) + timeout=aiohttp.ClientTimeout(total=self.request_timeout) ) as response: if response.status == 200: result = await response.json() - - if result.get("result"): - return { - "decision": "allow", - "rationale": f"OPA policy {policy_name} evaluation passed", - "opa_result": result - } - else: - return { - "decision": "block", - "rationale": f"OPA policy {policy_name} evaluation failed", - "opa_result": result - } + + allow, opa_payload = self._extract_decision(result) + decision = self._format_decision(policy_name, allow, opa_payload) + decision["opa_result"] = result + return decision else: raise Exception(f"OPA server responded with status {response.status}") - + except Exception as e: logger.error(f"OPA HTTP evaluation failed: {e}") raise - + async def health_check(self) -> bool: """Check if OPA server is healthy""" try: import aiohttp - - url = f"{self.opa_url}/health" - - async with aiohttp.ClientSession() as session: + + url = f"{self.opa_url}{self.health_path}" + + async with aiohttp.ClientSession(headers=self._auth_headers()) as session: async with session.get( url, - timeout=aiohttp.ClientTimeout(total=3) + timeout=aiohttp.ClientTimeout(total=self.request_timeout) ) as response: - return response.status == 200 - + if response.status != 200: + return False + + if self.bundle_status_path: + status_url = f"{self.opa_url}{self.bundle_status_path}" + async with aiohttp.ClientSession(headers=self._auth_headers()) as session: + async with session.get( + status_url, + timeout=aiohttp.ClientTimeout(total=self.request_timeout) + ) as status_response: + if status_response.status != 200: + return False + + payload = await 
status_response.json() + if isinstance(payload, dict): + bundle_state = payload.get("status") or payload.get("bundle_status") + if bundle_state and str(bundle_state).lower() not in {"active", "ok", "ready"}: + return False + + return True + except Exception as e: logger.error(f"OPA health check failed: {e}") return False + @staticmethod + def _extract_decision(result: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + payload = result.get("result") if isinstance(result, dict) else None + if isinstance(payload, bool): + return payload, {"raw_result": payload} + if isinstance(payload, dict): + for key in ("allow", "result", "decision"): + value = payload.get(key) + if isinstance(value, bool): + return value, payload + return bool(payload), payload + return bool(payload), {"raw_result": payload} + + @staticmethod + def _format_decision(policy_name: str, allow: bool, payload: Dict[str, Any]) -> Dict[str, Any]: + if allow: + return { + "decision": "allow", + "rationale": f"OPA policy {policy_name} evaluation passed", + "details": payload, + } + return { + "decision": "block", + "rationale": f"OPA policy {policy_name} evaluation failed", + "details": payload, + } + class OPAEngineFactory: """Factory for creating OPA engines based on mode""" @@ -333,10 +394,16 @@ def create(settings=None) -> OPAEngine: if settings.DEMO_MODE: logger.info("🎭 Creating Demo OPA Engine (local evaluation)") return DemoOPAEngine() - else: - opa_url = getattr(settings, 'OPA_SERVER_URL', 'http://localhost:8181') - logger.info(f"🏭 Creating Production OPA Engine: {opa_url}") - return ProductionOPAEngine(opa_url) + opa_url = getattr(settings, "OPA_SERVER_URL", None) or "http://localhost:8181" + logger.info("🏭 Creating Production OPA Engine", opa_url=opa_url) + return ProductionOPAEngine( + opa_url, + policy_package=getattr(settings, "OPA_POLICY_PACKAGE", "fixops"), + health_path=getattr(settings, "OPA_HEALTH_PATH", "/health"), + bundle_status_path=getattr(settings, "OPA_BUNDLE_STATUS_PATH", None), + 
auth_token=getattr(settings, "OPA_AUTH_TOKEN", None), + request_timeout=getattr(settings, "OPA_REQUEST_TIMEOUT", 5), + ) # Global OPA engine instance _opa_engine_instance: Optional[OPAEngine] = None @@ -347,9 +414,16 @@ async def get_opa_engine() -> OPAEngine: if _opa_engine_instance is None: _opa_engine_instance = OPAEngineFactory.create() - + return _opa_engine_instance + +def reset_opa_engine() -> None: + """Reset the cached OPA engine instance (useful for tests).""" + + global _opa_engine_instance + _opa_engine_instance = None + # Convenience functions async def evaluate_vulnerability_policy(vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]: """Evaluate vulnerability policy""" diff --git a/fixops-blended-enterprise/src/utils/crypto.py b/fixops-blended-enterprise/src/utils/crypto.py index af37e6064..397ddc885 100644 --- a/fixops-blended-enterprise/src/utils/crypto.py +++ b/fixops-blended-enterprise/src/utils/crypto.py @@ -8,8 +8,9 @@ import os import secrets import string +from datetime import datetime, timezone from dataclasses import dataclass, field -from typing import Optional, Dict, Any, Protocol, Tuple +from typing import Any, Dict, Mapping, Optional, Protocol, Tuple import structlog from cryptography.fernet import Fernet @@ -17,7 +18,20 @@ from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from src.config.settings import get_settings +try: # pragma: no cover - fallback for lightweight test environments + from src.config.settings import get_settings +except ModuleNotFoundError: # pragma: no cover - used when pydantic_settings is unavailable + class _FallbackSettings: + SIGNING_PROVIDER = "env" + KEY_ID = None + SIGNING_ROTATION_SLA_DAYS = 30 + AWS_REGION = None + AZURE_VAULT_URL = None + + def get_settings() -> _FallbackSettings: + return _FallbackSettings() + +from src.services.metrics import FixOpsMetrics logger = structlog.get_logger() @@ -31,7 +45,7 @@ def 
sign(self, payload: bytes) -> bytes: raise NotImplementedError def verify(self, payload: bytes, signature: bytes, fingerprint: str) -> bool: - """Verify ``signature`` over ``payload`` for a public key fingerprint.""" + """Verify ``signature`` over ``payload`` for ``fingerprint``.""" raise NotImplementedError @@ -45,6 +59,17 @@ def fingerprint(self) -> str: raise NotImplementedError + @property + def last_rotated_at(self) -> Optional[datetime]: + """Return the timestamp when the signing material last rotated.""" + + raise NotImplementedError + + def attestation(self) -> Dict[str, Any]: + """Return metadata describing the backing key material.""" + + raise NotImplementedError + @dataclass class EnvKeyProvider: @@ -52,6 +77,7 @@ class EnvKeyProvider: private_key_pem: Optional[str] = None public_key_pem: Optional[str] = None + rotation_sla_days: int = 30 _public_keys: Dict[str, rsa.RSAPublicKey] = field(init=False, default_factory=dict) def __post_init__(self) -> None: @@ -80,6 +106,7 @@ def __post_init__(self) -> None: self._fingerprint = _fingerprint_public_key(self._public_key) self._register_public_key(self._fingerprint, self._public_key) + self._last_rotated = datetime.now(timezone.utc) def sign(self, payload: bytes) -> bytes: return self._private_key.sign( @@ -117,62 +144,313 @@ def rotate(self) -> str: self._public_key = self._private_key.public_key() self._fingerprint = _fingerprint_public_key(self._public_key) self._register_public_key(self._fingerprint, self._public_key) + self._last_rotated = datetime.now(timezone.utc) logger.info("Ephemeral RSA key rotated", fingerprint=self._fingerprint) return self._fingerprint def fingerprint(self) -> str: return self._fingerprint + @property + def last_rotated_at(self) -> Optional[datetime]: + return self._last_rotated + + def attestation(self) -> Dict[str, Any]: + return { + "provider": "env", + "fingerprint": self._fingerprint, + "rotation_sla_days": self.rotation_sla_days, + "last_rotated_at": 
self._last_rotated.isoformat() if self._last_rotated else None, + } + def _register_public_key( self, fingerprint: str, public_key: rsa.RSAPublicKey ) -> None: self._public_keys[fingerprint] = public_key +@dataclass class AWSKMSProvider: - """Stub AWS KMS provider.""" + """AWS KMS-backed key provider with rotation metadata.""" + + key_id: Optional[str] + region: Optional[str] = None + rotation_sla_days: int = 30 + kms_client: Optional[Any] = None - def __init__(self, key_id: Optional[str]): - self.key_id = key_id or "unknown" + def __post_init__(self) -> None: + if not self.key_id: + raise ValueError("AWS KMS provider requires KEY_ID to be configured") + + self.key_id = str(self.key_id) + self.region = self.region or os.getenv("AWS_REGION") or "us-east-1" + if self.kms_client is None: + try: + import boto3 # type: ignore + except Exception as exc: # pragma: no cover - optional dependency + raise RuntimeError( + "boto3 is required to use the AWS KMS signing provider" + ) from exc + + self.kms_client = boto3.client("kms", region_name=self.region) + + self._kms = self.kms_client + self._public_keys: Dict[str, rsa.RSAPublicKey] = {} + self._fingerprint = "" + self._last_rotated: Optional[datetime] = None + self._arn: Optional[str] = None + self._refresh_key_material() - def sign(self, payload: bytes) -> bytes: # pragma: no cover - integration stub - raise NotImplementedError("AWS KMS signing not implemented in this demo build") + def sign(self, payload: bytes) -> bytes: + response = self._kms.sign( # type: ignore[call-arg] + KeyId=self.key_id, + Message=payload, + MessageType="RAW", + SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", + ) + signature = response.get("Signature") + if not signature: + raise RuntimeError("AWS KMS did not return a signature") + return signature def verify(self, payload: bytes, signature: bytes, fingerprint: str) -> bool: - raise NotImplementedError("AWS KMS verification not implemented in this demo build") + public_key = 
self._public_keys.get(fingerprint) + if public_key is None: + self._refresh_key_material() + public_key = self._public_keys.get(fingerprint) + if public_key is None: + logger.warning( + "Fingerprint not recognised for AWS KMS verification", + fingerprint=fingerprint, + available=list(self._public_keys.keys()), + ) + return False + try: + public_key.verify( + signature, + payload, + padding.PKCS1v15(), + hashes.SHA256(), + ) + return True + except Exception as exc: # pragma: no cover - defensive logging + logger.error("AWS KMS signature verification failed", error=str(exc)) + return False def rotate(self) -> str: - raise NotImplementedError("AWS KMS rotation not implemented in this demo build") + if hasattr(self._kms, "rotate_key"): + response = self._kms.rotate_key(KeyId=self.key_id) + bundle = response + elif hasattr(self._kms, "create_key") and hasattr(self._kms, "update_alias"): + bundle = self._kms.create_key(KeyUsage="SIGN_VERIFY") + new_key_id = bundle.get("KeyMetadata", {}).get("KeyId") + if new_key_id: + if str(self.key_id).startswith("alias/"): + self._kms.update_alias( + AliasName=str(self.key_id), TargetKeyId=new_key_id + ) + else: + self.key_id = new_key_id + else: # pragma: no cover - integration fallback + raise RuntimeError( + "Configured AWS KMS client does not expose rotation helpers" + ) + + self._refresh_key_material(bundle) + logger.info("AWS KMS key rotated", key_id=self.key_id, arn=self._arn) + return self._fingerprint def fingerprint(self) -> str: - raise NotImplementedError("AWS KMS fingerprint not implemented in this demo build") + return self._fingerprint + + @property + def last_rotated_at(self) -> Optional[datetime]: + return self._last_rotated + + def attestation(self) -> Dict[str, Any]: + return { + "provider": "aws_kms", + "key_id": self.key_id, + "arn": self._arn, + "region": self.region, + "fingerprint": self._fingerprint, + "rotation_sla_days": self.rotation_sla_days, + "last_rotated_at": self._last_rotated.isoformat() if 
self._last_rotated else None, + } + + def _refresh_key_material(self, bundle: Optional[Any] = None) -> None: + if bundle is None: + bundle = self._kms.get_public_key(KeyId=self.key_id) + public_bytes = bundle.get("PublicKey") if isinstance(bundle, Mapping) else None + if public_bytes is None: + raise RuntimeError("AWS KMS did not return public key material") + public_key = serialization.load_der_public_key(public_bytes) + fingerprint = _fingerprint_public_key(public_key) + self._register_public_key(fingerprint, public_key) + self._fingerprint = fingerprint + + metadata: Mapping[str, Any] = {} + if isinstance(bundle, Mapping): + metadata = bundle.get("KeyMetadata", {}) + if not metadata: + describe = self._kms.describe_key(KeyId=self.key_id) + metadata = describe.get("KeyMetadata", {}) + + self._arn = metadata.get("Arn") or metadata.get("KeyArn") + self._last_rotated = _coerce_datetime( + metadata.get("LastRotatedDate") + or metadata.get("NextKeyRotationDate") + or metadata.get("CreationDate") + ) + + def _register_public_key(self, fingerprint: str, public_key: rsa.RSAPublicKey) -> None: + self._public_keys[fingerprint] = public_key +@dataclass class AzureKeyVaultProvider: - """Stub Azure Key Vault provider.""" + """Azure Key Vault-backed key provider.""" - def __init__(self, key_id: Optional[str]): - self.key_id = key_id or "unknown" + key_id: Optional[str] + vault_url: Optional[str] + rotation_sla_days: int = 30 + key_client: Optional[Any] = None + crypto_client: Optional[Any] = None - def sign(self, payload: bytes) -> bytes: # pragma: no cover - integration stub - raise NotImplementedError( - "Azure Key Vault signing not implemented in this demo build" - ) + def __post_init__(self) -> None: + if not self.key_id: + raise ValueError("Azure Key Vault provider requires KEY_ID") + if not self.vault_url: + raise ValueError("Azure Key Vault provider requires AZURE_VAULT_URL") + + self.key_id = str(self.key_id) + self.vault_url = str(self.vault_url) + + if 
self.key_client is None or self.crypto_client is None: + try: + from azure.identity import DefaultAzureCredential # type: ignore + from azure.keyvault.keys import KeyClient # type: ignore + from azure.keyvault.keys.crypto import ( # type: ignore + CryptographyClient, + SignatureAlgorithm, + ) + except Exception as exc: # pragma: no cover - optional dependency + raise RuntimeError( + "azure-identity and azure-keyvault-keys are required for the Azure signing provider" + ) from exc + + credential = DefaultAzureCredential() + key_client = self.key_client or KeyClient( + vault_url=self.vault_url, credential=credential + ) + key_bundle = key_client.get_key(self.key_id) + crypto_client = self.crypto_client or CryptographyClient( + key_bundle.id, credential=credential + ) + self.key_client = key_client + self.crypto_client = crypto_client + self._signature_algorithm = SignatureAlgorithm.rs256 + else: + self._signature_algorithm = getattr( + self.crypto_client, "default_algorithm", "RS256" + ) + + self._key_client = self.key_client + self._crypto_client = self.crypto_client + self._public_keys: Dict[str, rsa.RSAPublicKey] = {} + self._fingerprint = "" + self._last_rotated: Optional[datetime] = None + self._current_version: Optional[str] = None + self._refresh_key_material() + + def sign(self, payload: bytes) -> bytes: + response = self._crypto_client.sign(payload) # type: ignore[call-arg] + signature = _extract_signature(response) + if signature is None: + raise RuntimeError("Azure Key Vault did not return a signature") + return signature def verify(self, payload: bytes, signature: bytes, fingerprint: str) -> bool: - raise NotImplementedError( - "Azure Key Vault verification not implemented in this demo build" - ) + public_key = self._public_keys.get(fingerprint) + if public_key is None: + self._refresh_key_material() + public_key = self._public_keys.get(fingerprint) + if public_key is None: + logger.warning( + "Fingerprint not recognised for Azure Key Vault verification", 
+ fingerprint=fingerprint, + available=list(self._public_keys.keys()), + ) + return False + try: + public_key.verify( + signature, + payload, + padding.PKCS1v15(), + hashes.SHA256(), + ) + return True + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Azure Key Vault signature verification failed", error=str(exc)) + return False def rotate(self) -> str: - raise NotImplementedError( - "Azure Key Vault rotation not implemented in this demo build" + if hasattr(self._key_client, "begin_rotate_key"): + poller = self._key_client.begin_rotate_key(self.key_id) + bundle = poller.result() if hasattr(poller, "result") else poller + elif hasattr(self._key_client, "rotate_key"): + bundle = self._key_client.rotate_key(self.key_id) + else: # pragma: no cover - integration fallback + raise RuntimeError( + "Configured Azure Key Vault client does not expose rotation helpers" + ) + + self._refresh_key_material(bundle) + logger.info( + "Azure Key Vault key rotated", + key_id=self.key_id, + vault_url=self.vault_url, + version=self._current_version, ) + return self._fingerprint def fingerprint(self) -> str: - raise NotImplementedError( - "Azure Key Vault fingerprint not implemented in this demo build" + return self._fingerprint + + @property + def last_rotated_at(self) -> Optional[datetime]: + return self._last_rotated + + def attestation(self) -> Dict[str, Any]: + return { + "provider": "azure_key_vault", + "vault_url": self.vault_url, + "key_id": self.key_id, + "key_version": self._current_version, + "fingerprint": self._fingerprint, + "rotation_sla_days": self.rotation_sla_days, + "last_rotated_at": self._last_rotated.isoformat() if self._last_rotated else None, + } + + def _refresh_key_material(self, bundle: Optional[Any] = None) -> None: + if bundle is None: + bundle = self._key_client.get_key(self.key_id) + public_key = _load_public_key_from_bundle(bundle) + fingerprint = _fingerprint_public_key(public_key) + self._register_public_key(fingerprint, 
public_key) + self._fingerprint = fingerprint + + properties = _extract_bundle_properties(bundle) + self._current_version = properties.get("version") + rotation_hint = ( + properties.get("updated_on") + or properties.get("created_on") + or properties.get("not_before") ) + self._last_rotated = _coerce_datetime(rotation_hint) + + def _register_public_key(self, fingerprint: str, public_key: rsa.RSAPublicKey) -> None: + self._public_keys[fingerprint] = public_key _KEY_PROVIDER: Optional[KeyProvider] = None @@ -187,13 +465,22 @@ def get_key_provider() -> KeyProvider: settings = get_settings() provider_name = (settings.SIGNING_PROVIDER or "env").lower() + rotation_sla = getattr(settings, "SIGNING_ROTATION_SLA_DAYS", 30) if provider_name == "aws_kms": - _KEY_PROVIDER = AWSKMSProvider(settings.KEY_ID) + _KEY_PROVIDER = AWSKMSProvider( + settings.KEY_ID, + region=getattr(settings, "AWS_REGION", None), + rotation_sla_days=rotation_sla, + ) elif provider_name == "azure_key_vault": - _KEY_PROVIDER = AzureKeyVaultProvider(settings.KEY_ID) + _KEY_PROVIDER = AzureKeyVaultProvider( + settings.KEY_ID, + vault_url=getattr(settings, "AZURE_VAULT_URL", None), + rotation_sla_days=rotation_sla, + ) else: - _KEY_PROVIDER = EnvKeyProvider() + _KEY_PROVIDER = EnvKeyProvider(rotation_sla_days=rotation_sla) logger.info("Signing provider initialised", provider=provider_name) return _KEY_PROVIDER @@ -216,6 +503,123 @@ def _fingerprint_public_key(public_key: rsa.RSAPublicKey) -> str: return ":".join([digest[i : i + 2] for i in range(0, len(digest), 2)]) +def _decode_base64url(data: str) -> bytes: + padding = "=" * (-len(data) % 4) + return base64.urlsafe_b64decode(data + padding) + + +def _coerce_bigint_bytes(value: Any) -> Optional[bytes]: + if value is None: + return None + if isinstance(value, bytes): + return value + if isinstance(value, int): + length = (value.bit_length() + 7) // 8 or 1 + return value.to_bytes(length, "big") + if isinstance(value, str): + try: + return 
_decode_base64url(value) + except Exception: + return None + return None + + +def _load_public_key_from_bundle(bundle: Any) -> rsa.RSAPublicKey: + if isinstance(bundle, Mapping): + pem = bundle.get("public_key_pem") or bundle.get("PublicKeyPem") + if pem: + material = pem if isinstance(pem, bytes) else str(pem).encode() + return serialization.load_pem_public_key(material) + pem_attr = getattr(bundle, "public_key_pem", None) + if pem_attr: + material = pem_attr if isinstance(pem_attr, bytes) else str(pem_attr).encode() + return serialization.load_pem_public_key(material) + + jwk = None + if isinstance(bundle, Mapping): + jwk = bundle.get("key") or bundle + else: + jwk = getattr(bundle, "key", None) + + if isinstance(jwk, Mapping): + modulus = _coerce_bigint_bytes(jwk.get("n") or jwk.get("modulus")) + exponent = _coerce_bigint_bytes(jwk.get("e") or jwk.get("exponent")) + else: + modulus = _coerce_bigint_bytes(getattr(jwk, "n", None)) + exponent = _coerce_bigint_bytes(getattr(jwk, "e", None)) + + if modulus and exponent: + n_value = int.from_bytes(modulus, "big") + e_value = int.from_bytes(exponent, "big") + return rsa.RSAPublicNumbers(e_value, n_value).public_key() + + raise RuntimeError("Remote key bundle did not include RSA public material") + + +def _extract_bundle_properties(bundle: Any) -> Dict[str, Any]: + properties: Dict[str, Any] = {} + candidate = None + if isinstance(bundle, Mapping): + candidate = bundle.get("properties") or bundle + if isinstance(candidate, Mapping): + properties.update(candidate) + else: + candidate = getattr(bundle, "properties", None) + if candidate is not None: + if isinstance(candidate, Mapping): + properties.update(candidate) + else: + for attr in ("version", "updated_on", "created_on", "not_before", "vault_url"): + if hasattr(candidate, attr): + properties[attr] = getattr(candidate, attr) + + identifier = properties.get("id") or getattr(bundle, "id", None) + if isinstance(identifier, str): + properties.setdefault("id", identifier) 
+ parts = identifier.rstrip("/").split("/") + if "keys" in parts: + try: + key_index = parts.index("keys") + except ValueError: + key_index = -1 + if key_index > 1 and key_index + 2 < len(parts): + vault = "/".join(parts[: key_index]) + properties.setdefault("vault_url", vault) + properties.setdefault("version", parts[-1]) + + return properties + + +def _extract_signature(response: Any) -> Optional[bytes]: + if response is None: + return None + if isinstance(response, Mapping): + candidate = response.get("signature") or response.get("result") + if isinstance(candidate, bytes): + return candidate + signature = getattr(response, "signature", None) + if isinstance(signature, bytes): + return signature + result = getattr(response, "result", None) + if isinstance(result, bytes): + return result + return None + + +def _coerce_datetime(value: Any) -> Optional[datetime]: + if value is None: + return None + if isinstance(value, datetime): + return value if value.tzinfo else value.replace(tzinfo=timezone.utc) + if isinstance(value, str): + try: + parsed = datetime.fromisoformat(value) + except ValueError: + return None + return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc) + return None + + def rsa_sign(json_bytes: bytes) -> Tuple[bytes, str]: """Sign ``json_bytes`` with the configured provider and return signature + fingerprint.""" @@ -231,6 +635,59 @@ def rsa_verify(json_bytes: bytes, signature: bytes, pub_fingerprint: str) -> boo return provider.verify(json_bytes, signature, pub_fingerprint) +def evaluate_rotation_health( + provider: Optional[KeyProvider] = None, + *, + max_age_days: Optional[int] = None, +) -> Dict[str, Any]: + """Evaluate signing-key rotation health and emit observability signals.""" + + provider = provider or get_key_provider() + settings = get_settings() + max_age = max_age_days or getattr(settings, "SIGNING_ROTATION_SLA_DAYS", 30) + + last_rotated = provider.last_rotated_at + if last_rotated and last_rotated.tzinfo is None: + 
last_rotated = last_rotated.replace(tzinfo=timezone.utc) + + now = datetime.now(timezone.utc) + if last_rotated is None: + age_days = float(max_age + 1) + healthy = False + else: + delta = now - last_rotated + age_days = delta.total_seconds() / 86400.0 + healthy = age_days <= max_age + + attestation = provider.attestation() if hasattr(provider, "attestation") else {} + provider_name = attestation.get("provider") or provider.__class__.__name__ + FixOpsMetrics.record_key_rotation(provider_name, age_days, healthy) + + if not healthy: + logger.warning( + "Signing key rotation SLA breached", + provider=provider_name, + age_days=age_days, + max_age_days=max_age, + ) + + attestation.setdefault("provider", provider_name) + attestation.setdefault( + "last_rotated_at", + last_rotated.isoformat() if last_rotated is not None else None, + ) + + return { + "provider": provider_name, + "fingerprint": provider.fingerprint(), + "last_rotated_at": last_rotated.isoformat() if last_rotated else None, + "age_days": age_days, + "max_age_days": max_age, + "healthy": healthy, + "attestation": attestation, + } + + def generate_secure_token(length: int = 32) -> str: """ Generate cryptographically secure random token diff --git a/fixops/cli.py b/fixops/cli.py index 413e9d4de..c67826a0c 100644 --- a/fixops/cli.py +++ b/fixops/cli.py @@ -213,6 +213,29 @@ def _handle_run(args: argparse.Namespace) -> int: if args.env: _apply_env_overrides(args.env) + if getattr(args, "signing_provider", None): + os.environ["SIGNING_PROVIDER"] = args.signing_provider + if getattr(args, "signing_key_id", None): + os.environ["KEY_ID"] = args.signing_key_id + if getattr(args, "signing_region", None): + os.environ["AWS_REGION"] = args.signing_region + if getattr(args, "azure_vault_url", None): + os.environ["AZURE_VAULT_URL"] = args.azure_vault_url + if getattr(args, "rotation_sla_days", None) is not None: + os.environ["SIGNING_ROTATION_SLA_DAYS"] = str(args.rotation_sla_days) + if getattr(args, "opa_url", None): + 
os.environ["OPA_SERVER_URL"] = args.opa_url + if getattr(args, "opa_token", None): + os.environ["OPA_AUTH_TOKEN"] = args.opa_token + if getattr(args, "opa_package", None): + os.environ["OPA_POLICY_PACKAGE"] = args.opa_package + if getattr(args, "opa_health_path", None): + os.environ["OPA_HEALTH_PATH"] = args.opa_health_path + if getattr(args, "opa_bundle_status_path", None): + os.environ["OPA_BUNDLE_STATUS_PATH"] = args.opa_bundle_status_path + if getattr(args, "opa_timeout", None) is not None: + os.environ["OPA_REQUEST_TIMEOUT"] = str(args.opa_timeout) + overlay = load_overlay(args.overlay) if args.disable_modules: for module in args.disable_modules: @@ -409,6 +432,53 @@ def build_parser() -> argparse.ArgumentParser: action="store_true", help="Disable exploit feed auto-refresh to avoid network calls", ) + run_parser.add_argument( + "--signing-provider", + choices=["env", "aws_kms", "azure_key_vault"], + help="Override the signing backend provider for this run", + ) + run_parser.add_argument( + "--signing-key-id", + help="Signing key alias or identifier when using remote providers", + ) + run_parser.add_argument( + "--signing-region", + help="AWS region to use when invoking KMS", + ) + run_parser.add_argument( + "--azure-vault-url", + help="Azure Key Vault URL for remote signing", + ) + run_parser.add_argument( + "--rotation-sla-days", + type=int, + help="Override the signing key rotation SLA in days", + ) + run_parser.add_argument( + "--opa-url", + help="Override the OPA server URL for remote policy checks", + ) + run_parser.add_argument( + "--opa-token", + help="Bearer token for authenticating with the OPA server", + ) + run_parser.add_argument( + "--opa-package", + help="OPA policy package to query (e.g. 
fixops.policies)", + ) + run_parser.add_argument( + "--opa-health-path", + help="Custom OPA health endpoint path", + ) + run_parser.add_argument( + "--opa-bundle-status-path", + help="OPA bundle status endpoint for readiness checks", + ) + run_parser.add_argument( + "--opa-timeout", + type=int, + help="Timeout in seconds for OPA requests", + ) run_parser.add_argument( "--evidence-dir", type=Path, diff --git a/fixops/configuration.py b/fixops/configuration.py index be5eaf124..f8c473f09 100644 --- a/fixops/configuration.py +++ b/fixops/configuration.py @@ -71,6 +71,7 @@ def _deep_merge(base: MutableMapping[str, Any], overrides: Mapping[str, Any]) -> "auth", "data", "toggles", + "signing", "guardrails", "metadata", "context_engine", @@ -78,6 +79,7 @@ def _deep_merge(base: MutableMapping[str, Any], overrides: Mapping[str, Any]) -> "onboarding", "compliance", "policy_automation", + "policy_engine", "pricing", "limits", "ai_agents", @@ -133,6 +135,49 @@ def _string_list(value: Any, location: str) -> list[str]: return cleaned +def _validate_signing_config(raw: Any) -> Dict[str, Any]: + config: Dict[str, Any] = {"provider": "env", "rotation_sla_days": 30} + if raw is None: + return config + mapping = _require_mapping(raw, "signing") + unexpected = set(mapping) - { + "provider", + "key_id", + "aws_region", + "azure_vault_url", + "rotation_sla_days", + } + if unexpected: + raise ValueError( + f"signing contains unexpected keys: {sorted(unexpected)}" + ) + provider = mapping.get("provider") + if provider is not None: + provider_value = _require_string(provider, "signing.provider").lower() + if provider_value not in {"env", "aws_kms", "azure_key_vault"}: + raise ValueError( + "signing.provider must be one of ['env', 'aws_kms', 'azure_key_vault']" + ) + config["provider"] = provider_value + key_id = mapping.get("key_id") + if key_id is not None: + config["key_id"] = _require_string(key_id, "signing.key_id") + aws_region = mapping.get("aws_region") + if aws_region is not None: + 
config["aws_region"] = _require_string(aws_region, "signing.aws_region") + azure_vault_url = mapping.get("azure_vault_url") + if azure_vault_url is not None: + config["azure_vault_url"] = _require_string( + azure_vault_url, "signing.azure_vault_url" + ) + rotation_sla = mapping.get("rotation_sla_days") + if rotation_sla is not None: + if not isinstance(rotation_sla, int) or rotation_sla <= 0: + raise ValueError("signing.rotation_sla_days must be a positive integer") + config["rotation_sla_days"] = rotation_sla + return config + + def _validate_compliance_frameworks(raw: Any, location: str) -> list[Dict[str, Any]]: if raw is None: return [] @@ -379,6 +424,74 @@ def _validate_policy_config(raw: Optional[Mapping[str, Any]]) -> Dict[str, Any]: if profiles: config["profiles"] = profiles return config + + +def _validate_policy_engine_config(raw: Optional[Mapping[str, Any]]) -> Dict[str, Any]: + if not raw: + return {} + + mapping = _require_mapping(raw, "policy_engine") + unexpected = set(mapping) - {"opa"} + if unexpected: + raise ValueError(f"policy_engine contains unexpected keys: {sorted(unexpected)}") + + config: Dict[str, Any] = {} + opa_raw = mapping.get("opa") + if opa_raw is not None: + opa_mapping = _require_mapping(opa_raw, "policy_engine.opa") + allowed = { + "enabled", + "url", + "policy_package", + "health_path", + "bundle_status_path", + "auth_token_env", + "request_timeout_seconds", + } + unexpected_opa = set(opa_mapping) - allowed + if unexpected_opa: + raise ValueError( + "policy_engine.opa contains unexpected keys: {keys}".format( + keys=sorted(unexpected_opa) + ) + ) + + opa_config: Dict[str, Any] = {} + if "enabled" in opa_mapping and opa_mapping["enabled"] is not None: + opa_config["enabled"] = bool(opa_mapping["enabled"]) + if opa_mapping.get("url") is not None: + opa_config["url"] = _require_string( + opa_mapping["url"], "policy_engine.opa.url" + ) + if opa_mapping.get("policy_package") is not None: + opa_config["policy_package"] = 
_require_string( + opa_mapping["policy_package"], "policy_engine.opa.policy_package" + ) + if opa_mapping.get("health_path") is not None: + opa_config["health_path"] = _require_string( + opa_mapping["health_path"], "policy_engine.opa.health_path" + ) + if opa_mapping.get("bundle_status_path") is not None: + opa_config["bundle_status_path"] = _require_string( + opa_mapping["bundle_status_path"], "policy_engine.opa.bundle_status_path" + ) + if opa_mapping.get("auth_token_env") is not None: + opa_config["auth_token_env"] = _require_string( + opa_mapping["auth_token_env"], "policy_engine.opa.auth_token_env" + ) + if opa_mapping.get("request_timeout_seconds") is not None: + timeout_value = opa_mapping["request_timeout_seconds"] + if not isinstance(timeout_value, int) or timeout_value <= 0: + raise ValueError( + "policy_engine.opa.request_timeout_seconds must be a positive integer" + ) + opa_config["request_timeout_seconds"] = timeout_value + + if opa_config: + opa_config.setdefault("enabled", True) + config["opa"] = opa_config + + return config class _OverlayDocument(BaseModel): """Pydantic schema for validating overlay documents.""" @@ -390,6 +503,7 @@ class _OverlayDocument(BaseModel): auth: Optional[Dict[str, Any]] = None data: Optional[Dict[str, Any]] = None toggles: Optional[Dict[str, Any]] = None + signing: Optional[Dict[str, Any]] = None guardrails: Optional[Dict[str, Any]] = None metadata: Optional[Dict[str, Any]] = None context_engine: Optional[Dict[str, Any]] = None @@ -397,6 +511,7 @@ class _OverlayDocument(BaseModel): onboarding: Optional[Dict[str, Any]] = None compliance: Optional[Dict[str, Any]] = None policy_automation: Optional[Dict[str, Any]] = None + policy_engine: Optional[Dict[str, Any]] = None pricing: Optional[Dict[str, Any]] = None limits: Optional[Dict[str, Any]] = None ai_agents: Optional[Dict[str, Any]] = None @@ -451,6 +566,7 @@ class OverlayConfig: auth: Dict[str, Any] = field(default_factory=dict) data: Dict[str, Any] = 
field(default_factory=dict) toggles: Dict[str, Any] = field(default_factory=dict) + signing: Dict[str, Any] = field(default_factory=dict) metadata: Dict[str, Any] = field(default_factory=dict) guardrails: Dict[str, Any] = field(default_factory=dict) context_engine: Dict[str, Any] = field(default_factory=dict) @@ -458,6 +574,7 @@ class OverlayConfig: onboarding: Dict[str, Any] = field(default_factory=dict) compliance: Dict[str, Any] = field(default_factory=dict) policy_automation: Dict[str, Any] = field(default_factory=dict) + policy_engine: Dict[str, Any] = field(default_factory=dict) pricing: Dict[str, Any] = field(default_factory=dict) limits: Dict[str, Any] = field(default_factory=dict) ai_agents: Dict[str, Any] = field(default_factory=dict) @@ -505,6 +622,7 @@ def to_sanitised_dict(self) -> Dict[str, Any]: "auth": self._mask(self.auth), "data": self.data, "toggles": self.toggles, + "signing": self.signing_settings, "metadata": self.metadata, "guardrails": self.guardrail_policy, "context_engine": self.context_engine_settings, @@ -512,6 +630,7 @@ def to_sanitised_dict(self) -> Dict[str, Any]: "onboarding": self.onboarding_settings, "compliance": self.compliance_settings, "policy_automation": self.policy_settings, + "policy_engine": self.policy_engine_settings, "pricing": self.pricing, "limits": self.limits, "ai_agents": self.ai_agents, @@ -526,6 +645,25 @@ def to_sanitised_dict(self) -> Dict[str, Any]: } return payload + @property + def signing_settings(self) -> Dict[str, Any]: + settings = dict(self.signing) + provider = str(settings.get("provider") or "env").lower() + payload: Dict[str, Any] = {"provider": provider} + key_id = settings.get("key_id") + if isinstance(key_id, str) and key_id.strip(): + payload["key_id"] = key_id + aws_region = settings.get("aws_region") + if isinstance(aws_region, str) and aws_region.strip(): + payload["aws_region"] = aws_region + azure_vault_url = settings.get("azure_vault_url") + if isinstance(azure_vault_url, str) and 
azure_vault_url.strip(): + payload["azure_vault_url"] = azure_vault_url + rotation_sla = settings.get("rotation_sla_days") + if isinstance(rotation_sla, int) and rotation_sla > 0: + payload["rotation_sla_days"] = rotation_sla + return payload + @staticmethod def _mask(section: Mapping[str, Any]) -> Dict[str, Any]: masked: Dict[str, Any] = {} @@ -649,6 +787,33 @@ def policy_settings(self) -> Dict[str, Any]: base.pop("profiles", None) return base + @property + def policy_engine_settings(self) -> Dict[str, Any]: + config: Dict[str, Any] = {} + opa = self.policy_engine.get("opa") if isinstance(self.policy_engine, Mapping) else None + if isinstance(opa, Mapping): + payload: Dict[str, Any] = {} + enabled = opa.get("enabled") + if enabled is not None: + payload["enabled"] = bool(enabled) + for key in ( + "url", + "policy_package", + "health_path", + "bundle_status_path", + "auth_token_env", + ): + value = opa.get(key) + if isinstance(value, str) and value.strip(): + payload[key] = value.strip() + timeout = opa.get("request_timeout_seconds") + if isinstance(timeout, int) and timeout > 0: + payload["request_timeout_seconds"] = timeout + if payload: + payload.setdefault("enabled", True) + config["opa"] = payload + return config + @property def ssdlc_settings(self) -> Dict[str, Any]: settings = dict(self.ssdlc) @@ -972,6 +1137,7 @@ def load_overlay( "auth": document.auth or {}, "data": document.data or {}, "toggles": document.toggles or {}, + "signing": document.signing or {}, "guardrails": document.guardrails or {}, "metadata": {"source_path": str(candidate_path)} | (document.metadata or {}), "context_engine": document.context_engine or {}, @@ -979,6 +1145,7 @@ def load_overlay( "onboarding": document.onboarding or {}, "compliance": document.compliance or {}, "policy_automation": document.policy_automation or {}, + "policy_engine": document.policy_engine or {}, "pricing": document.pricing or {}, "limits": document.limits or {}, "ai_agents": document.ai_agents or {}, @@ 
-1000,9 +1167,12 @@ def load_overlay( try: base["compliance"] = _validate_compliance_config(base.get("compliance")) base["policy_automation"] = _validate_policy_config(base.get("policy_automation")) + base["policy_engine"] = _validate_policy_engine_config(base.get("policy_engine")) except ValueError as exc: raise ValueError(f"Overlay validation failed: {exc}") from exc + base["signing"] = _validate_signing_config(base.get("signing")) + toggles = base.setdefault("toggles", {}) toggles.setdefault("require_design_input", True) toggles.setdefault("auto_attach_overlay_metadata", True) @@ -1050,6 +1220,7 @@ def load_overlay( auth=dict(base.get("auth", {})), data=dict(base.get("data", {})), toggles=dict(toggles), + signing=dict(base.get("signing", {})), metadata=dict(metadata), guardrails=dict(base.get("guardrails", {})), context_engine=dict(base.get("context_engine", {})), @@ -1057,6 +1228,7 @@ def load_overlay( onboarding=dict(base.get("onboarding", {})), compliance=dict(base.get("compliance", {})), policy_automation=dict(base.get("policy_automation", {})), + policy_engine=dict(base.get("policy_engine", {})), pricing=dict(base.get("pricing", {})), limits=dict(base.get("limits", {})), ai_agents=dict(base.get("ai_agents", {})), diff --git a/pydantic/__init__.py b/pydantic/__init__.py index 3043cf74c..1eb53d782 100644 --- a/pydantic/__init__.py +++ b/pydantic/__init__.py @@ -4,12 +4,34 @@ from dataclasses import dataclass import sys import types -from typing import Any, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin, get_type_hints +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin, get_type_hints + + +class ConfigDict(dict): + """Minimal stand-in for Pydantic's configuration container.""" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + +def field_validator(*_fields: str, mode: str | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + def decorator(func: 
Callable[..., Any]) -> Callable[..., Any]: + return func + + return decorator + + +def computed_field(*, return_type: Any | None = None) -> Callable[[Callable[..., Any]], property]: + def decorator(func: Callable[..., Any]) -> property: + return property(func) + + return decorator @dataclass class FieldInfo: default: Any = ... + default_factory: Optional[Callable[[], Any]] = None min_length: Optional[int] = None max_length: Optional[int] = None ge: Optional[float] = None @@ -20,14 +42,20 @@ class FieldInfo: def Field( default: Any = ..., *, + default_factory: Optional[Callable[[], Any]] = None, min_length: Optional[int] = None, max_length: Optional[int] = None, ge: Optional[float] = None, le: Optional[float] = None, description: Optional[str] = None, ) -> FieldInfo: + if default is ... and default_factory is not None: + default_value = default_factory() + else: + default_value = default return FieldInfo( - default=default, + default=default_value, + default_factory=default_factory, min_length=min_length, max_length=max_length, ge=ge, @@ -80,6 +108,7 @@ def __new__(mcls, name, bases, namespace): class BaseModel(metaclass=BaseModelMeta): __fields__: Dict[str, Tuple[Any, FieldInfo]] + model_config: ConfigDict = ConfigDict() def __init__(self, **data: Any) -> None: cls = self.__class__ @@ -133,6 +162,29 @@ def __init__(self, **data: Any) -> None: def dict(self) -> Dict[str, Any]: return {name: getattr(self, name) for name in self.__class__.__fields__} + @classmethod + def model_validate(cls, data: Any) -> "BaseModel": + if isinstance(data, cls): + return data + if isinstance(data, BaseModel): + return cls(**data.dict()) + if isinstance(data, dict): + return cls(**data) + + if getattr(cls.model_config, "get", None): + from_attributes = cls.model_config.get("from_attributes") + else: + from_attributes = getattr(cls.model_config, "from_attributes", False) + + if from_attributes: + values = {} + for name in cls.__fields__: + if hasattr(data, name): + values[name] = 
getattr(data, name) + return cls(**values) + + raise TypeError("Object is not valid for model validation") + # ------------------------------------------------------------------ # Validation helpers # ------------------------------------------------------------------ diff --git a/pytest.ini b/pytest.ini index 5ee647716..9ee0463bc 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,3 @@ [pytest] testpaths = tests +addopts = -p no:asyncio diff --git a/tests/conftest.py b/tests/conftest.py index a847831e4..c86173e2d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,9 @@ from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parents[1] -SRC_PATH = PROJECT_ROOT / "fixops-blended-enterprise" / "src" +SRC_PACKAGE_ROOT = PROJECT_ROOT / "fixops-blended-enterprise" +SRC_PATH = SRC_PACKAGE_ROOT / "src" -if str(SRC_PATH) not in sys.path: - sys.path.insert(0, str(SRC_PATH)) +for path in (SRC_PACKAGE_ROOT, SRC_PATH): + if str(path) not in sys.path: + sys.path.append(str(path)) diff --git a/tests/test_cli.py b/tests/test_cli.py index 03e72aca7..3be41f638 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,4 +1,5 @@ import json +import os from pathlib import Path import pytest @@ -12,6 +13,11 @@ def _write_json(path: Path, payload: dict) -> None: def test_cli_run_pipeline(tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys): monkeypatch.setenv("FIXOPS_API_TOKEN", "demo-token") + monkeypatch.delenv("SIGNING_PROVIDER", raising=False) + monkeypatch.delenv("KEY_ID", raising=False) + monkeypatch.delenv("AWS_REGION", raising=False) + monkeypatch.delenv("AZURE_VAULT_URL", raising=False) + monkeypatch.delenv("SIGNING_ROTATION_SLA_DAYS", raising=False) design_csv = ( "component,owner,criticality,notes\n" @@ -107,12 +113,42 @@ def test_cli_run_pipeline(tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsy "--pretty", "--include-overlay", "--offline", + "--signing-provider", + "env", + "--signing-key-id", + "alias/demo", + "--signing-region", + 
"us-east-1", + "--rotation-sla-days", + "60", + "--opa-url", + "https://opa.internal", + "--opa-token", + "demo-token", + "--opa-package", + "fixops.security", + "--opa-health-path", + "/healthz", + "--opa-bundle-status-path", + "/bundles/fixops/status", + "--opa-timeout", + "9", "--evidence-dir", str(evidence_dir), ] ) assert exit_code == 0 + assert os.getenv("SIGNING_PROVIDER") == "env" + assert os.getenv("KEY_ID") == "alias/demo" + assert os.getenv("AWS_REGION") == "us-east-1" + assert os.getenv("SIGNING_ROTATION_SLA_DAYS") == "60" + assert os.getenv("OPA_SERVER_URL") == "https://opa.internal" + assert os.getenv("OPA_AUTH_TOKEN") == "demo-token" + assert os.getenv("OPA_POLICY_PACKAGE") == "fixops.security" + assert os.getenv("OPA_HEALTH_PATH") == "/healthz" + assert os.getenv("OPA_BUNDLE_STATUS_PATH") == "/bundles/fixops/status" + assert os.getenv("OPA_REQUEST_TIMEOUT") == "9" result_payload = json.loads(output_path.read_text(encoding="utf-8")) assert result_payload["status"] == "ok" assert result_payload["modules"]["executed"] diff --git a/tests/test_http_metrics.py b/tests/test_http_metrics.py new file mode 100644 index 000000000..f7e7d43b3 --- /dev/null +++ b/tests/test_http_metrics.py @@ -0,0 +1,109 @@ +"""Regression tests for HTTP observability instrumentation.""" + +from __future__ import annotations + +import asyncio +import sys +import types +from typing import Generator + +import pytest +from fastapi import HTTPException +from starlette.responses import Response + +# Patch pydantic_settings before importing application modules. Tests use a +# lightweight stand-in so importing `get_settings` does not require optional +# dependencies. 
+if "pydantic_settings" not in sys.modules: # pragma: no cover - import guard + from pydantic import FieldInfo + + pydantic_settings = types.ModuleType("pydantic_settings") + + class _BaseSettings: + def __init__(self, **overrides): + for name, value in self.__class__.__dict__.items(): + if name.startswith("_") or callable(value) or isinstance(value, property): + continue + default = value.default if isinstance(value, FieldInfo) else value + setattr(self, name, overrides.get(name, default)) + + pydantic_settings.BaseSettings = _BaseSettings + pydantic_settings.SettingsConfigDict = dict + sys.modules["pydantic_settings"] = pydantic_settings + +from src.core.middleware import PerformanceMiddleware +from src.services.metrics import FixOpsMetrics + + +@pytest.fixture(autouse=True) +def reset_metrics() -> Generator[None, None, None]: + """Ensure every test runs with a clean metrics slate.""" + + FixOpsMetrics.reset_runtime_stats() + yield + FixOpsMetrics.reset_runtime_stats() + + +def _build_request(path: str, method: str) -> types.SimpleNamespace: + return types.SimpleNamespace( + url=types.SimpleNamespace(path=path), + method=method, + state=types.SimpleNamespace(), + ) + + +def test_successful_hot_path_updates_latency_and_ratio() -> None: + """Successful decision requests should publish hot path latency without errors.""" + + middleware = PerformanceMiddleware(lambda scope, receive, send: None) # type: ignore[arg-type] + request = _build_request("/api/v1/decisions/make-decision", "POST") + + async def call_next(_: object) -> Response: + return Response(content="ok", media_type="application/json", status_code=200) + + response = asyncio.run(middleware.dispatch(request, call_next)) + + assert response.status_code == 200 + assert FixOpsMetrics.get_error_ratio("decision") == 0 + + latency = FixOpsMetrics.get_hot_path_latency_us("/api/v1/decisions/make-decision") + assert latency is not None and latency > 0 + assert FixOpsMetrics.get_inflight("decision") == 0 + + +def 
test_policy_errors_drive_error_ratio() -> None: + """Failing policy evaluations must be tracked as errors for observability dashboards.""" + + middleware = PerformanceMiddleware(lambda scope, receive, send: None) # type: ignore[arg-type] + request = _build_request("/api/v1/policy/evaluate", "POST") + + async def call_next(_: object) -> Response: + raise HTTPException(status_code=503, detail="policy gate offline") + + with pytest.raises(HTTPException): + asyncio.run(middleware.dispatch(request, call_next)) + + assert FixOpsMetrics.get_error_ratio("policy") == 1 + + latency = FixOpsMetrics.get_hot_path_latency_us("/api/v1/policy/evaluate") + assert latency is not None and latency > 0 + assert FixOpsMetrics.get_inflight("policy") == 0 + + +def test_evidence_requests_are_classified_correctly() -> None: + """Evidence retrieval should register under the evidence family for ratios.""" + + middleware = PerformanceMiddleware(lambda scope, receive, send: None) # type: ignore[arg-type] + request = _build_request("/api/v1/decisions/evidence/abc123", "GET") + + async def call_next(_: object) -> Response: + return Response(content="{}", media_type="application/json", status_code=200) + + response = asyncio.run(middleware.dispatch(request, call_next)) + + assert response.status_code == 200 + assert FixOpsMetrics.get_error_ratio("evidence") == 0 + + latency = FixOpsMetrics.get_hot_path_latency_us("/api/v1/decisions/evidence") + assert latency is not None and latency > 0 + assert FixOpsMetrics.get_inflight("evidence") == 0 diff --git a/tests/test_key_management.py b/tests/test_key_management.py new file mode 100644 index 000000000..cb217c4e4 --- /dev/null +++ b/tests/test_key_management.py @@ -0,0 +1,239 @@ +"""Key management regression tests for remote signing providers.""" + +import base64 +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from cryptography.hazmat.primitives import hashes, serialization +from 
cryptography.hazmat.primitives.asymmetric import padding, rsa + +from src.services.metrics import FixOpsMetrics +from src.utils.crypto import ( + AWSKMSProvider, + AzureKeyVaultProvider, + evaluate_rotation_health, +) + + +def _encode_b64url(value: int) -> str: + raw = value.to_bytes((value.bit_length() + 7) // 8 or 1, "big") + return base64.urlsafe_b64encode(raw).rstrip(b"=").decode() + + +class StubKMSClient: + """Lightweight AWS KMS stub for exercising the provider logic.""" + + def __init__(self, *, initial_age_days: int = 0) -> None: + self._keys: dict[str, rsa.RSAPrivateKey] = {} + self._metadata: dict[str, dict[str, datetime | str]] = {} + self._aliases: dict[str, str] = {} + key_id = self._create_key(age_days=initial_age_days) + self._aliases["alias/decision"] = key_id + + def _create_key(self, *, age_days: int) -> str: + key_id = f"key-{len(self._keys) + 1}" + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + created = datetime.now(timezone.utc) - timedelta(days=age_days) + self._keys[key_id] = private_key + self._metadata[key_id] = { + "CreationDate": created, + "Arn": f"arn:aws:kms:::key/{key_id}", + } + return key_id + + def _resolve(self, key_id: str) -> str: + if key_id.startswith("alias/"): + return self._aliases[key_id] + return key_id + + def get_public_key(self, KeyId: str): # noqa: N802 - AWS casing + resolved = self._resolve(KeyId) + public_key = self._keys[resolved].public_key() + public_bytes = public_key.public_bytes( + serialization.Encoding.DER, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) + metadata = dict(self._metadata[resolved]) + metadata["KeyId"] = resolved + return {"KeyMetadata": metadata, "PublicKey": public_bytes} + + def describe_key(self, KeyId: str): # noqa: N802 + resolved = self._resolve(KeyId) + metadata = dict(self._metadata[resolved]) + metadata["KeyId"] = resolved + return {"KeyMetadata": metadata} + + def sign(self, KeyId: str, Message: bytes, MessageType: str, SigningAlgorithm: 
str): # noqa: N802 + resolved = self._resolve(KeyId) + signature = self._keys[resolved].sign( + Message, + padding.PKCS1v15(), + hashes.SHA256(), + ) + return {"Signature": signature, "KeyId": resolved} + + def rotate_key(self, KeyId: str): # noqa: N802 + alias = KeyId if KeyId.startswith("alias/") else None + new_key_id = self._create_key(age_days=0) + if alias: + self._aliases[alias] = new_key_id + self._metadata[new_key_id]["LastRotatedDate"] = datetime.now(timezone.utc) + return self.get_public_key(alias or new_key_id) + + def age_current_key(self, days: int) -> None: + key_id = self._aliases.get("alias/decision") + if key_id: + self._metadata[key_id]["CreationDate"] = datetime.now(timezone.utc) - timedelta( + days=days + ) + + +class StubAzureKeyClient: + """Minimal Azure Key Vault stub with rotate/get support.""" + + def __init__(self, *, initial_age_days: int = 0) -> None: + self.vault_url = "https://vault.example" + self.key_name = "decision" + self._versions: dict[str, rsa.RSAPrivateKey] = {} + self._rotated: dict[str, datetime] = {} + self.current_version = self._create_version(age_days=initial_age_days) + + def _create_version(self, *, age_days: int) -> str: + version = f"v{len(self._versions) + 1}" + private = rsa.generate_private_key(public_exponent=65537, key_size=2048) + rotated = datetime.now(timezone.utc) - timedelta(days=age_days) + self._versions[version] = private + self._rotated[version] = rotated + return version + + def get_key(self, key_name: str): + if key_name != self.key_name: + raise ValueError("unknown key requested") + private = self._versions[self.current_version] + numbers = private.public_key().public_numbers() + jwk = {"kty": "RSA", "n": _encode_b64url(numbers.n), "e": _encode_b64url(numbers.e)} + identifier = f"{self.vault_url}/keys/{self.key_name}/{self.current_version}" + properties = SimpleNamespace( + version=self.current_version, + updated_on=self._rotated[self.current_version], + vault_url=self.vault_url, + id=identifier, + 
) + return SimpleNamespace(key=jwk, properties=properties, id=identifier) + + def rotate_key(self, key_name: str): + if key_name != self.key_name: + raise ValueError("unknown key requested") + self.current_version = self._create_version(age_days=0) + return self.get_key(key_name) + + def begin_rotate_key(self, key_name: str): + client = self + + class _Poller: + def result(self_nonlocal): # pragma: no cover - simple delegation + return client.rotate_key(key_name) + + return _Poller() + + def private_key(self) -> rsa.RSAPrivateKey: + return self._versions[self.current_version] + + +class StubAzureCryptoClient: + """Crypto client that signs using the stub key client.""" + + default_algorithm = "RS256" + + def __init__(self, key_client: StubAzureKeyClient) -> None: + self._key_client = key_client + + def sign(self, payload: bytes): # pragma: no cover - exercised via provider + signature = self._key_client.private_key().sign( + payload, + padding.PKCS1v15(), + hashes.SHA256(), + ) + return SimpleNamespace(signature=signature) + + +def test_aws_kms_provider_signs_and_rotates() -> None: + kms = StubKMSClient(initial_age_days=5) + provider = AWSKMSProvider( + key_id="alias/decision", + region="us-east-1", + rotation_sla_days=30, + kms_client=kms, + ) + + payload = b"kms-payload" + signature = provider.sign(payload) + fingerprint = provider.fingerprint() + + assert provider.verify(payload, signature, fingerprint) + + kms.age_current_key(40) + new_fp = provider.rotate() + + assert new_fp != fingerprint + assert provider.verify(payload, signature, fingerprint) + rotated_signature = provider.sign(payload) + assert provider.verify(payload, rotated_signature, new_fp) + assert provider.last_rotated_at is not None + assert provider.attestation()["provider"] == "aws_kms" + + +def test_azure_key_vault_provider_signs_and_rotates() -> None: + key_client = StubAzureKeyClient(initial_age_days=3) + crypto_client = StubAzureCryptoClient(key_client) + provider = AzureKeyVaultProvider( + 
key_id="decision", + vault_url=key_client.vault_url, + rotation_sla_days=45, + key_client=key_client, + crypto_client=crypto_client, + ) + + payload = b"azure-payload" + signature = provider.sign(payload) + fingerprint = provider.fingerprint() + + assert provider.verify(payload, signature, fingerprint) + + provider.rotate() + new_fp = provider.fingerprint() + assert new_fp != fingerprint + assert provider.verify(payload, signature, fingerprint) + new_signature = provider.sign(payload) + assert provider.verify(payload, new_signature, new_fp) + attestation = provider.attestation() + assert attestation["provider"] == "azure_key_vault" + assert attestation["key_version"] is not None + + +def test_rotation_health_flags_breach() -> None: + kms = StubKMSClient(initial_age_days=50) + provider = AWSKMSProvider( + key_id="alias/decision", + region="us-east-1", + rotation_sla_days=30, + kms_client=kms, + ) + + FixOpsMetrics.reset_runtime_stats() + status = evaluate_rotation_health(provider=provider, max_age_days=30) + + assert status["healthy"] is False + assert FixOpsMetrics.get_key_rotation_health(status["provider"]) is False + assert FixOpsMetrics.get_key_rotation_age(status["provider"]) >= 30 + + # Freshly rotated key should report healthy status. 
+    kms_fresh = StubKMSClient(initial_age_days=1)
+    provider_fresh = AWSKMSProvider(
+        key_id="alias/decision",
+        region="us-east-1",
+        rotation_sla_days=30,
+        kms_client=kms_fresh,
+    )
+    FixOpsMetrics.reset_runtime_stats()
+    healthy = evaluate_rotation_health(provider=provider_fresh, max_age_days=30)
+    assert healthy["healthy"] is True
diff --git a/tests/test_overlay_configuration.py b/tests/test_overlay_configuration.py
index f46b7c861..36846241f 100644
--- a/tests/test_overlay_configuration.py
+++ b/tests/test_overlay_configuration.py
@@ -37,6 +37,7 @@ def test_load_overlay_merges_profile_and_defaults(overlay_file: Path) -> None:
     assert exported["auth"] == {}
     assert exported["guardrails"]["maturity"] == "advanced"
     assert exported["guardrails"]["fail_on"] in {"medium", "high", "critical"}
+    assert exported["signing"]["provider"] == "env"
     assert "ssdlc" in exported
     assert "modules" in exported
     assert "analytics" in exported
@@ -172,3 +173,83 @@ def test_policy_action_triggers_normalised(tmp_path: Path) -> None:
     actions = config.policy_settings["actions"]
     assert actions and actions[0]["trigger"] == "guardrail:fail"
     assert actions[0]["type"] == "jira_issue"
+
+
+def test_policy_engine_overlay_round_trip(tmp_path: Path) -> None:
+    path = tmp_path / "fixops.overlay.yml"
+    overlay_content = {
+        "policy_engine": {
+            "opa": {
+                "enabled": False,
+                "url": "https://opa.example.com",
+                "policy_package": "fixops.security",
+                "health_path": "/healthz",
+                "bundle_status_path": "/bundles/status",
+                "auth_token_env": "OPA_TOKEN",
+                "request_timeout_seconds": 12,
+            }
+        }
+    }
+    path.write_text(json.dumps(overlay_content), encoding="utf-8")
+    config = load_overlay(path)
+    opa_config = config.policy_engine.get("opa")
+    assert opa_config is not None
+    assert opa_config["enabled"] is False
+    assert opa_config["url"] == "https://opa.example.com"
+    exported = config.to_sanitised_dict()["policy_engine"]["opa"]
+    assert exported["auth_token_env"] == "OPA_TOKEN"
+    assert exported["policy_package"] == "fixops.security"
+    assert exported["request_timeout_seconds"] == 12
+
+
+def test_policy_engine_rejects_invalid_timeout(tmp_path: Path) -> None:
+    path = tmp_path / "fixops.overlay.yml"
+    overlay_content = {
+        "policy_engine": {
+            "opa": {
+                "url": "https://opa.example.com",
+                "request_timeout_seconds": 0,
+            }
+        }
+    }
+    path.write_text(json.dumps(overlay_content), encoding="utf-8")
+    with pytest.raises(ValueError):
+        load_overlay(path)
+
+
+def test_policy_engine_rejects_unknown_fields(tmp_path: Path) -> None:
+    path = tmp_path / "fixops.overlay.yml"
+    overlay_content = {
+        "policy_engine": {
+            "opa": {
+                "url": "https://opa.example.com",
+                "unexpected": True,
+            },
+            "other": {},
+        }
+    }
+    path.write_text(json.dumps(overlay_content), encoding="utf-8")
+    with pytest.raises(ValueError):
+        load_overlay(path)
+
+
+def test_signing_configuration_round_trip(tmp_path: Path) -> None:
+    path = tmp_path / "fixops.overlay.yml"
+    overlay_content = {
+        "signing": {
+            "provider": "aws_kms",
+            "key_id": "alias/decision",
+            "aws_region": "us-west-2",
+            "rotation_sla_days": 14,
+        }
+    }
+    path.write_text(json.dumps(overlay_content), encoding="utf-8")
+    config = load_overlay(path)
+    assert config.signing["provider"] == "aws_kms"
+    assert config.signing["key_id"] == "alias/decision"
+    assert config.signing["aws_region"] == "us-west-2"
+    assert config.signing["rotation_sla_days"] == 14
+
+    exported = config.to_sanitised_dict()["signing"]
+    assert exported["provider"] == "aws_kms"
+    assert exported["rotation_sla_days"] == 14
diff --git a/tests/test_policy_kevs.py b/tests/test_policy_kevs.py
new file mode 100644
index 000000000..bf9a6e808
--- /dev/null
+++ b/tests/test_policy_kevs.py
@@ -0,0 +1,110 @@
+"""Tests for KEV waiver enforcement in the policy API."""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import datetime, timedelta, timezone
+from typing import Any, Awaitable, Callable, Dict
+import sys
+import types
+
+from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
+from sqlalchemy.orm import sessionmaker
+
+from pydantic.fields import FieldInfo
+
+pydantic_settings = types.ModuleType("pydantic_settings")
+
+
+class _BaseSettings:
+    def __init__(self, **overrides: Any) -> None:
+        for name, value in self.__class__.__dict__.items():
+            if name.startswith("_") or callable(value) or isinstance(value, property):
+                continue
+            default = value.default if isinstance(value, FieldInfo) else value
+            setattr(self, name, overrides.get(name, default))
+
+    def model_dump(self) -> Dict[str, Any]:
+        return {
+            name: getattr(self, name)
+            for name in dir(self)
+            if name.isupper()
+        }
+
+
+pydantic_settings.BaseSettings = _BaseSettings
+pydantic_settings.SettingsConfigDict = dict
+sys.modules["pydantic_settings"] = pydantic_settings
+
+from src.api.v1.policy import GateRequest, WaiverCreate, create_waiver, evaluate_gate
+from src.models import security_sqlite  # noqa: F401  # Ensure metadata is populated
+from src.models.base_sqlite import Base
+
+
+async def _execute_with_session(test_fn: Callable[[AsyncSession], Awaitable[None]]) -> None:
+    engine = create_async_engine("sqlite+aiosqlite:///:memory:", future=True)
+    try:
+        async with engine.begin() as connection:
+            await connection.run_sync(Base.metadata.create_all)
+
+        SessionLocal = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
+        async with SessionLocal() as session:
+            await test_fn(session)
+    finally:
+        await engine.dispose()
+
+
+def run_with_session(test_fn: Callable[[AsyncSession], Awaitable[None]]) -> None:
+    asyncio.run(_execute_with_session(test_fn))
+
+
+def test_kevs_block_without_waiver() -> None:
+    """KEV findings without waivers must trigger a hard block."""
+
+    async def scenario(session: AsyncSession) -> None:
+        request = GateRequest(
+            decision="ALLOW",
+            confidence=0.82,
+            signals={"kev_count": 1, "service_name": "payments"},
+            findings=[{"cve_id": "CVE-2024-1111", "kev": True, "severity": "medium"}],
+        )
+
+        response = await evaluate_gate(request, db=session)
+
+        assert response.allow is False
+        assert "CVE-2024-1111" in response.reason
+        assert any("waiver" in action.lower() for action in response.required_actions)
+
+    run_with_session(scenario)
+
+
+def test_kevs_allow_with_active_waiver() -> None:
+    """An approved, active waiver should allow the deployment to proceed."""
+
+    async def scenario(session: AsyncSession) -> None:
+        waiver_payload = WaiverCreate(
+            cve_id="CVE-2024-1111",
+            service_name="payments",
+            justification="Compensating controls deployed across edge fleet",
+            approved_by="security-director",
+            expires_at=datetime.now(timezone.utc) + timedelta(days=7),
+            requested_by="security-analyst",
+        )
+
+        created = await create_waiver(waiver_payload, db=session)
+        assert created.cve_id == "CVE-2024-1111"
+        assert created.status == "active"
+
+        request = GateRequest(
+            decision="ALLOW",
+            confidence=0.91,
+            signals={"kev_count": 1, "service_name": "payments"},
+            findings=[{"cve_id": "CVE-2024-1111", "kev": True, "severity": "medium"}],
+        )
+
+        response = await evaluate_gate(request, db=session)
+
+        assert response.allow is True
+        assert response.reason == "Policy checks passed"
+
+    run_with_session(scenario)
diff --git a/tests/test_policy_opa.py b/tests/test_policy_opa.py
new file mode 100644
index 000000000..eb0ba80f3
--- /dev/null
+++ b/tests/test_policy_opa.py
@@ -0,0 +1,79 @@
+"""OPA integration tests for the policy evaluation endpoint."""
+
+from __future__ import annotations
+
+from typing import Any, Dict
+
+import pytest
+
+from src.api.v1 import policy
+from tests.test_policy_kevs import run_with_session
+
+
+class _StubEngine:
+    def __init__(self, *, decision: Dict[str, Any]) -> None:
+        self._decision = decision
+        self.seen_payloads: list[Dict[str, Any]] = []
+
+    async def health_check(self) -> bool:
+        return True
+
+    async def evaluate_policy(self, policy_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
+        self.seen_payloads.append({"name": policy_name, "payload": payload})
+        return dict(self._decision)
+
+
+async def _evaluate_with_stub(
+    monkeypatch: pytest.MonkeyPatch,
+    engine: _StubEngine,
+    db_session: Any,
+) -> policy.GateResponse:
+    monkeypatch.setattr(policy.settings, "DEMO_MODE", False)
+    monkeypatch.setattr(policy.settings, "OPA_SERVER_URL", "https://opa.example.com")
+
+    async def _get_engine() -> _StubEngine:
+        return engine
+
+    monkeypatch.setattr(policy, "get_opa_engine", _get_engine)
+
+    request = policy.GateRequest(
+        decision="ALLOW",
+        confidence=0.95,
+        signals={"environment": "prod"},
+        findings=[
+            {
+                "id": "finding-1",
+                "cve_id": "CVE-2024-9999",
+                "severity": "high",
+                "fix_available": True,
+            }
+        ],
+    )
+
+    return await policy.evaluate_gate(request, db=db_session)
+
+
+def test_policy_blocks_when_opa_denies(monkeypatch: pytest.MonkeyPatch) -> None:
+    """OPA block responses should fail the gate even when local checks pass."""
+
+    async def scenario(session: Any) -> None:
+        engine = _StubEngine(decision={"decision": "block", "rationale": "policy violation"})
+        response = await _evaluate_with_stub(monkeypatch, engine, session)
+        assert response.allow is False
+        assert "OPA policy" in response.reason
+        assert engine.seen_payloads and engine.seen_payloads[0]["name"] == "vulnerability"
+
+    run_with_session(scenario)
+
+
+def test_policy_allows_when_opa_passes(monkeypatch: pytest.MonkeyPatch) -> None:
+    """OPA allow responses should permit the deployment when no other guardrail blocks it."""
+
+    async def scenario(session: Any) -> None:
+        engine = _StubEngine(decision={"decision": "allow", "rationale": "policy satisfied"})
+        response = await _evaluate_with_stub(monkeypatch, engine, session)
+        assert response.allow is True
+        assert response.reason == "Policy checks passed"
+        assert engine.seen_payloads
+
+    run_with_session(scenario)