2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -37,7 +37,7 @@ jobs:
           python scripts/run_demo_steps.py --app "life-claims-portal"
       - name: Upload decision artefact
         if: env.RUN_FIXOPS_INTEGRATION_TESTS == '1'
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: demo-decision
           path: artefacts/**/outputs/decision.json
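Reviewer note: the v3 → v4 bump lines up with GitHub's deprecation of the v3 artifact actions, so this change is likely needed just to keep the upload step working.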
8 changes: 6 additions & 2 deletions apps/api/app.py
@@ -118,9 +118,13 @@ def create_app() -> FastAPI:

     async def _verify_api_key(api_key: Optional[str] = Depends(api_key_header)) -> None:
         if auth_strategy == "token":
-            if not api_key or api_key not in expected_tokens:
+            if not api_key:
+                raise HTTPException(
+                    status_code=401, detail="Missing API token"
+                )
+            if api_key not in expected_tokens:
                 raise HTTPException(
-                    status_code=401, detail="Invalid or missing API token"
+                    status_code=401, detail="Invalid API token"
                 )
             return
         if auth_strategy == "jwt":
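Reviewer note: splitting the combined check means clients can now tell a missing credential apart from a rejected one. A minimal pytest sketch of the two branches — the endpoint path, header name, and token value below are assumptions for illustration, not taken from this diff:

```python
# Hypothetical test — assumes a token-auth configuration and an API-key
# header named "X-API-Key"; adjust both to the app's real settings.
from fastapi.testclient import TestClient

from apps.api.app import create_app

client = TestClient(create_app())


def test_missing_token_vs_invalid_token():
    # No header at all -> the new "Missing API token" branch.
    response = client.get("/pipelines")
    assert response.status_code == 401
    assert response.json()["detail"] == "Missing API token"

    # Wrong token -> the "Invalid API token" branch.
    response = client.get("/pipelines", headers={"X-API-Key": "wrong-token"})
    assert response.status_code == 401
    assert response.json()["detail"] == "Invalid API token"
```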
186 changes: 155 additions & 31 deletions apps/api/normalizers.py
@@ -10,7 +10,17 @@
 import zipfile
 from contextlib import suppress
 from dataclasses import dataclass, field, asdict
-from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Literal, Mapping, Optional, Tuple
+
+from pydantic import (
+    BaseModel,
+    ConfigDict,
+    Field,
+    StrictInt,
+    StrictStr,
+    ValidationError,
+    field_validator,
+)
 
 try:  # Optional dependency for YAML parsing
     import yaml
@@ -64,6 +74,9 @@ def _resolve_sbom_parser_state() -> tuple[Any | None, Exception | None]:

 logger = logging.getLogger(__name__)
 
+MAX_DOCUMENT_BYTES = 32 * 1024 * 1024
+_SARIF_LEVELS: tuple[str, ...] = ("none", "note", "warning", "error", "info")
+
 _SNYK_SEVERITY_TO_LEVEL = {
     "critical": "error",
     "high": "error",
@@ -439,6 +452,40 @@ def to_dict(self) -> Dict[str, Any]:
         }
 
 
+class SarifFindingModel(BaseModel):
+    """Schema for validated SARIF findings with strict coercion rules."""
+
+    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
+
+    rule_id: StrictStr | None = Field(default=None)
+    message: StrictStr | None = Field(default=None)
+    level: Literal[_SARIF_LEVELS] | None = Field(default=None)
+    file: StrictStr | None = Field(default=None)
+    line: StrictInt | None = Field(default=None, ge=0)
+    raw: Mapping[str, Any]
+
+    @field_validator("rule_id")
+    @classmethod
+    def _validate_rule_id(cls, value: StrictStr | None) -> StrictStr | None:
+        if value is None:
+            return None
+        if not value:
+            raise ValueError("rule_id cannot be empty")
+        return value
+
+
+class NormalizedSARIFModel(BaseModel):
+    """Validated structure for normalised SARIF payloads."""
+
+    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
+
+    version: StrictStr
+    schema_uri: StrictStr | None = Field(default=None)
+    tool_names: List[StrictStr]
+    findings: List[SarifFindingModel]
+    metadata: Dict[str, Any]
+
+
 @dataclass
 class SarifFinding:
     """Summarised SARIF result."""
@@ -561,6 +608,9 @@ def _prepare_text(self, raw: Any) -> str:
         data = self._ensure_bytes(raw)
         data = self._maybe_decode_base64(data)
         data = self._maybe_decompress(data)
+        if len(data) > MAX_DOCUMENT_BYTES:
+            # TODO: consider streaming parsers to avoid loading oversized artefacts entirely in memory.
+            raise ValueError("Document exceeds maximum allowed size")
         return data.decode("utf-8", errors="ignore")
 
     def load_sbom(self, raw: Any) -> NormalizedSBOM:
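Reviewer note: the cap is applied after `_maybe_decompress`, so a small compressed upload that inflates past 32 MiB is still rejected before parsing.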
@@ -998,8 +1048,8 @@ def load_sarif(self, raw: Any) -> NormalizedSARIF:
properties=data.get("properties"),
)

findings: List[SarifFinding] = []
tool_names: List[str] = []
finding_models: List[SarifFindingModel] = []

for run in runs:
tool = (
@@ -1008,50 +1058,124 @@
                 else {}
             )
             tool_name = tool.get("name")
-            if tool_name:
-                tool_names.append(tool_name)
+            if isinstance(tool_name, str) and tool_name.strip():
+                tool_names.append(tool_name.strip())
 
             results = run.get("results") if isinstance(run, dict) else None
-            for result in results or []:
-                message = None
-                if "message" in result:
-                    if isinstance(result["message"], dict):
-                        message = result["message"].get("text")
-                    else:
-                        message = str(result["message"])
-
-                location = (result.get("locations") or [{}])[0]
-                physical = location.get("physicalLocation", {})
-                artifact = physical.get("artifactLocation", {})
-                region = physical.get("region", {})
+            if isinstance(results, Iterable) and not isinstance(
+                results, (str, bytes, bytearray)
+            ):
+                for result in results:
+                    if not isinstance(result, Mapping):
+                        continue
+                    message_value = result.get("message")
+                    message: Optional[str] = None
+                    if isinstance(message_value, Mapping):
+                        text_value = message_value.get("text")
+                        if isinstance(text_value, str):
+                            message = text_value
+                    elif isinstance(message_value, str):
+                        message = message_value
+
+                    locations = result.get("locations")
+                    location: Mapping[str, Any] | None = None
+                    if isinstance(locations, Iterable) and not isinstance(
+                        locations, (str, bytes, bytearray)
+                    ):
+                        for entry in locations:
+                            if isinstance(entry, Mapping):
+                                location = entry
+                                break
+                    physical = (
+                        location.get("physicalLocation")
+                        if isinstance(location, Mapping)
+                        else None
+                    )
+                    artifact = (
+                        physical.get("artifactLocation")
+                        if isinstance(physical, Mapping)
+                        else None
+                    )
+                    region = (
+                        physical.get("region")
+                        if isinstance(physical, Mapping)
+                        else None
+                    )
+
-                findings.append(
-                    SarifFinding(
-                        rule_id=result.get("ruleId"),
-                        message=message,
-                        level=result.get("level"),
-                        file=artifact.get("uri"),
-                        line=region.get("startLine"),
-                        raw=result,
+                    level_value = result.get("level")
+                    normalized_level = None
+                    if isinstance(level_value, str) and level_value.strip():
+                        normalized_level = level_value.strip().lower()
+
+                    rule_id_value = result.get("ruleId")
+                    rule_id = rule_id_value if isinstance(rule_id_value, str) else None
+
+                    file_uri = artifact.get("uri") if isinstance(artifact, Mapping) else None
+                    if file_uri is not None and not isinstance(file_uri, str):
+                        file_uri = None
+
+                    start_line = (
+                        region.get("startLine") if isinstance(region, Mapping) else None
                     )
-                )
+                    if not isinstance(start_line, int):
+                        start_line = None
+
+                    candidate = {
+                        "rule_id": rule_id,
+                        "message": message,
+                        "level": normalized_level,
+                        "file": file_uri,
+                        "line": start_line,
+                        "raw": result,
+                    }
+                    try:
+                        finding_models.append(SarifFindingModel.model_validate(candidate))
+                    except ValidationError as exc:
+                        raise ValueError("Invalid SARIF result") from exc
 
         metadata = {
             "run_count": len(runs),
-            "finding_count": len(findings),
+            "finding_count": len(finding_models),
        }
         schema_key = sarif_log.schema_uri
         if isinstance(schema_key, str):
             metadata["supported_schema"] = schema_key.lower() in SUPPORTED_SARIF_SCHEMAS
         if tool_names:
             metadata["tool_count"] = len(tool_names)
 
+        try:
+            normalized_model = NormalizedSARIFModel.model_validate(
+                {
+                    "version": str(sarif_log.version or "2.1.0"),
+                    "schema_uri": (
+                        str(sarif_log.schema_uri)
+                        if isinstance(sarif_log.schema_uri, str)
+                        else None
+                    ),
+                    "tool_names": tool_names,
+                    "findings": finding_models,
+                    "metadata": metadata,
+                }
+            )
+        except ValidationError as exc:
+            raise ValueError("Normalised SARIF payload failed validation") from exc
 
         normalized = NormalizedSARIF(
-            version=sarif_log.version,
-            schema_uri=sarif_log.schema_uri,
-            tool_names=tool_names,
-            findings=findings,
-            metadata=metadata,
+            version=normalized_model.version,
+            schema_uri=normalized_model.schema_uri,
+            tool_names=normalized_model.tool_names,
+            findings=[
+                SarifFinding(
+                    rule_id=model.rule_id,
+                    message=model.message,
+                    level=model.level,
+                    file=model.file,
+                    line=model.line,
+                    raw=dict(model.raw),
+                )
+                for model in normalized_model.findings
+            ],
+            metadata=normalized_model.metadata,
         )
         logger.debug("Normalised SARIF", extra={"metadata": metadata})
         return normalized
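Reviewer note: end to end, the loader now lowercases levels and validates every finding before the dataclass is built. A usage sketch with a minimal made-up SARIF document — this assumes `load_sarif` accepts a JSON string and reads the standard `tool.driver.name` path, which matches the code above but isn't fully shown in the diff:

```python
import json

normalizer = InputNormalizer()
sarif_doc = {
    "version": "2.1.0",
    "runs": [
        {
            "tool": {"driver": {"name": "example-scanner"}},
            "results": [
                {
                    "ruleId": "EX001",
                    "level": "Warning",  # mixed case; normalisation lowercases it
                    "message": {"text": "Example finding"},
                    "locations": [
                        {
                            "physicalLocation": {
                                "artifactLocation": {"uri": "src/app.py"},
                                "region": {"startLine": 7},
                            }
                        }
                    ],
                }
            ],
        }
    ],
}

normalized = normalizer.load_sarif(json.dumps(sarif_doc))
assert normalized.tool_names == ["example-scanner"]
assert normalized.findings[0].level == "warning"
assert normalized.findings[0].line == 7
```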
49 changes: 39 additions & 10 deletions core/stage_runner.py
@@ -16,6 +16,8 @@
 from pathlib import Path
 from typing import Any, Dict, Iterable, Mapping, Optional
 
+from fixops.utils.paths import resolve_within_root
+
 from apps.api.normalizers import InputNormalizer, NormalizedSARIF, NormalizedSBOM
 
 def _current_utc_timestamp() -> str:
@@ -666,6 +668,32 @@ def _analyse_posture(self, payload: Mapping[str, Any]) -> dict[str, Any]:
         else:
             resources = [payload]
 
+        def _extract_cidrs(source: Mapping[str, Any] | None, *, include_ipv6: bool = True) -> list[str]:
+            if not isinstance(source, Mapping):
+                return []
+            values: list[str] = []
+            for key in ("cidr_blocks", "cidrs", "cidr"):
+                entries = source.get(key)
+                if isinstance(entries, (str, bytes)):
+                    values.append(str(entries))
+                elif isinstance(entries, Iterable) and not isinstance(entries, (str, bytes, bytearray)):
+                    values.extend(str(item) for item in entries)
+            if include_ipv6:
+                for key in ("ipv6_cidr_blocks", "ipv6_cidrs"):
+                    entries = source.get(key)
+                    if isinstance(entries, (str, bytes)):
+                        values.append(str(entries))
+                    elif isinstance(entries, Iterable) and not isinstance(entries, (str, bytes, bytearray)):
+                        values.extend(str(item) for item in entries)
+            return values
+
+        def _contains_open_rule(cidrs: Iterable[str]) -> bool:
+            for value in cidrs:
+                candidate = str(value).strip()
+                if candidate in {"0.0.0.0/0", "::/0"}:
+                    return True
+            return False
+
         for resource in resources:
             if not isinstance(resource, Mapping):
                 continue
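Reviewer note: a quick behavioural illustration of the two helpers (they are nested inside `_analyse_posture`, so this is a doctest-style sketch rather than importable code; the rule fragments are sample data):

```python
# Sample security-group rule fragments — values are illustrative only.
rule = {"cidr_blocks": ["10.0.0.0/8"], "ipv6_cidr_blocks": ["::/0"]}
assert _contains_open_rule(_extract_cidrs(rule))  # ::/0 is world-open

scoped = {"cidrs": "192.168.1.0/24"}
assert not _contains_open_rule(_extract_cidrs(scoped))
```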
@@ -695,23 +723,24 @@ def _analyse_posture(self, payload: Mapping[str, Any]) -> dict[str, Any]:
                 if candidate_tls:
                     tls_policy = candidate_tls
 
-            if rtype in {"aws_security_group", "aws_security_group_rule"}:
+            if rtype == "aws_security_group":
                 ingress_rules = after.get("ingress") or resource.get("ingress") or []
                 if isinstance(ingress_rules, Mapping):
                     ingress_rules = [ingress_rules]
                 for rule in ingress_rules:
                     if not isinstance(rule, Mapping):
                         continue
-                    cidrs = rule.get("cidr_blocks") or rule.get("cidrs") or rule.get("cidr")
-                    if isinstance(cidrs, (str, bytes)):
-                        cidr_values = [cidrs]
-                    elif isinstance(cidrs, Iterable):
-                        cidr_values = [str(item) for item in cidrs]
-                    else:
-                        cidr_values = []
-                    if any(value == "0.0.0.0/0" for value in cidr_values):
+                    cidr_values = _extract_cidrs(rule)
+                    if _contains_open_rule(cidr_values):
Contributor cubic-dev-ai (bot) commented on Oct 13, 2025:
Limit aws_security_group_rule detection to ingress rules; otherwise egress-only rules (the AWS default) are now flagged as open security groups and inflate the deploy risk score.
                         open_security_groups.add(name)
 
+            if rtype == "aws_security_group_rule":
+                cidr_values = _extract_cidrs(after)
+                if not cidr_values:
+                    cidr_values = _extract_cidrs(resource)
+                if _contains_open_rule(cidr_values):
+                    open_security_groups.add(name)
+
             if rtype in {"aws_db_instance", "aws_rds_cluster"}:
                 encrypted = after.get("storage_encrypted")
                 if encrypted is False or encrypted is None:
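Reviewer note: to make the comment above concrete — `_extract_cidrs(after)` never inspects the rule's direction, so an egress-only rule (the AWS default) is flagged too. Sample plan fragment below; the exact field layout is an assumption:

```python
egress_only_after = {"type": "egress", "cidr_blocks": ["0.0.0.0/0"]}
# _extract_cidrs(egress_only_after) -> ["0.0.0.0/0"], so the check fires
# even though nothing ingress-facing is open.
assert _contains_open_rule(_extract_cidrs(egress_only_after))
```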
@@ -935,7 +964,7 @@ def _marketplace_recommendations(self, failing_controls: list[Any]) -> list[dict
         ]
 
     def _write_evidence_bundle(self, context, documents: Mapping[str, Mapping[str, Any]]) -> Path:
-        bundle_path = context.outputs_dir / "evidence_bundle.zip"
+        bundle_path = resolve_within_root(context.outputs_dir, "evidence_bundle.zip")
         with zipfile.ZipFile(bundle_path, "w") as archive:
             for key, filename in self._OUTPUT_FILENAMES.items():
                 document = documents.get(key)
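Reviewer note: `resolve_within_root` presumably guards against the bundle path escaping the outputs directory. The real helper lives in `fixops.utils.paths` and isn't shown in this diff; a rough sketch of the usual shape of such a guard:

```python
from pathlib import Path


def resolve_within_root(root: Path, *parts: str) -> Path:
    """Resolve parts under root, refusing any result that escapes it."""
    candidate = (Path(root) / Path(*parts)).resolve()
    if not candidate.is_relative_to(Path(root).resolve()):
        raise ValueError(f"{candidate} escapes {root}")
    return candidate
```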