Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Bug Fixes

- traverse NeMo YAML list configs when checking suspicious targets
- mark malformed Keras ZIP configs inconclusive instead of clean
- mark malformed Keras H5 scans inconclusive instead of clean
- mark malformed TFLite scans inconclusive instead of clean
Expand Down
106 changes: 92 additions & 14 deletions modelaudit/scanners/nemo_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import tarfile
from typing import Any, ClassVar

from .base import BaseScanner, IssueSeverity, ScanResult
from .base import INCONCLUSIVE_SCAN_OUTCOME, BaseScanner, IssueSeverity, ScanResult

try:
import yaml
Expand Down Expand Up @@ -105,6 +105,9 @@
"Update to NeMo >= 2.3.2 which validates _target_ values. Do not load untrusted .nemo files."
)

_INCONCLUSIVE_METADATA_KEY = "scan_outcome"
_INCONCLUSIVE_REASONS_METADATA_KEY = "scan_outcome_reasons"


def _find_suspicious_target_pattern(target: str) -> str | None:
"""Return a suspicious identifier token if a target contains one."""
Expand All @@ -119,6 +122,10 @@ def _find_suspicious_target_pattern(target: str) -> str | None:
return None


def _scan_result_has_security_findings(result: ScanResult) -> bool:
return any(issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in result.issues)


class NemoScanner(BaseScanner):
"""Scanner for NVIDIA NeMo model files.

Expand Down Expand Up @@ -176,11 +183,50 @@ def scan(self, path: str) -> ScanResult:
result.success = False

result.bytes_scanned = file_size
self._finish_scan_result(result)
return result

def _mark_inconclusive_scan_result(
self,
result: ScanResult,
*,
reason: str,
check_name: str,
message: str,
location: str,
details: dict[str, Any] | None = None,
) -> None:
reasons = result.metadata.get(_INCONCLUSIVE_REASONS_METADATA_KEY)
if not isinstance(reasons, list):
reasons = []
if reason not in reasons:
reasons.append(reason)

result.metadata[_INCONCLUSIVE_METADATA_KEY] = INCONCLUSIVE_SCAN_OUTCOME
result.metadata[_INCONCLUSIVE_REASONS_METADATA_KEY] = reasons
result.add_check(
name=check_name,
passed=False,
message=message,
severity=IssueSeverity.INFO,
location=location,
details={"scan_outcome_reason": reason, **(details or {})},
)

@staticmethod
def _finish_scan_result(result: ScanResult) -> None:
if result.metadata.get(
_INCONCLUSIVE_METADATA_KEY
) == INCONCLUSIVE_SCAN_OUTCOME and not _scan_result_has_security_findings(result):
result.finish(success=False)
return

result.finish(success=result.success and not result.has_errors)

def _scan_nemo_archive(self, path: str, result: ScanResult) -> None:
"""Extract and scan YAML configs from a NeMo tar archive."""
yaml_configs_found = 0
yaml_config_files_found = 0

with tarfile.open(path, "r:*") as tar:
for member in tar:
Expand All @@ -204,6 +250,7 @@ def _scan_nemo_archive(self, path: str, result: ScanResult) -> None:

# Parse YAML config files
if name_lower.endswith((".yaml", ".yml")):
yaml_config_files_found += 1
if member.size > self.MAX_CONFIG_SIZE:
result.add_check(
name="NeMo Config Size Check",
Expand All @@ -230,17 +277,46 @@ def _scan_nemo_archive(self, path: str, result: ScanResult) -> None:
)
continue
config = yaml.safe_load(raw)
if isinstance(config, dict):
if isinstance(config, dict | list):
yaml_configs_found += 1
self._check_hydra_targets(config, member.name, path, result)
else:
self._mark_inconclusive_scan_result(
result,
reason="nemo_config_invalid_structure",
check_name="NeMo Config Structure",
message=(
f"YAML config {member.name} has unsupported "
f"top-level type: {type(config).__name__}"
),
location=f"{path}:{member.name}",
details={
"config_file": member.name,
"expected_type": "dict_or_list",
"actual_type": type(config).__name__,
},
)
except yaml.YAMLError:
logger.debug("Failed to parse YAML config %s in %s", member.name, path)
self._mark_inconclusive_scan_result(
result,
reason="nemo_config_yaml_parse_failed",
check_name="NeMo Config YAML Parsing",
message=f"Failed to parse YAML config {member.name}",
location=f"{path}:{member.name}",
details={"config_file": member.name},
)

if yaml_configs_found == 0:
message = (
"No YAML configuration found in NeMo archive"
if yaml_config_files_found == 0
else "No analyzable YAML configuration found in NeMo archive"
)
result.add_check(
name="NeMo Config Presence",
passed=False,
message="No YAML configuration found in NeMo archive",
message=message,
severity=IssueSeverity.INFO,
location=path,
)
Expand All @@ -261,6 +337,18 @@ def _check_hydra_targets(
path_prefix: str = "",
) -> None:
"""Recursively check _target_ values in Hydra config."""
if isinstance(config, list):
for index, item in enumerate(config):
if isinstance(item, dict | list):
self._check_hydra_targets(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guard recursive YAML aliases during list traversal

Prevent unbounded recursion when descending YAML lists. PyYAML can materialize self-referential lists via anchors (e.g., &a [*a]), and this recursive call will repeatedly revisit the same object until RecursionError. Because only yaml.YAMLError is handled upstream, a crafted .nemo can abort scanning instead of producing a normal result.

Useful? React with 👍 / 👎.

item,
config_name,
archive_path,
result,
f"{path_prefix}[{index}]" if path_prefix else f"[{index}]",
)
return

if not isinstance(config, dict):
return

Expand All @@ -269,18 +357,8 @@ def _check_hydra_targets(

if key == "_target_" and isinstance(value, str):
self._evaluate_target(value, current_path, config_name, archive_path, result)
elif isinstance(value, dict):
elif isinstance(value, dict | list):
self._check_hydra_targets(value, config_name, archive_path, result, current_path)
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, dict):
self._check_hydra_targets(
item,
config_name,
archive_path,
result,
f"{current_path}[{i}]",
)

def _evaluate_target(
self,
Expand Down
99 changes: 97 additions & 2 deletions tests/scanners/test_nemo_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
except ImportError:
HAS_YAML = False

from modelaudit.core import scan_file
from modelaudit.scanners.base import CheckStatus, IssueSeverity
from modelaudit.core import determine_exit_code, scan_file, scan_model_directory_or_file
from modelaudit.scanners.base import INCONCLUSIVE_SCAN_OUTCOME, CheckStatus, IssueSeverity
from modelaudit.scanners.nemo_scanner import NemoScanner


Expand Down Expand Up @@ -159,6 +159,101 @@ def test_dangerous_subprocess_detected(self, tmp_path):
assert len(cve_checks) > 0, "Should detect subprocess.Popen"
assert cve_checks[0].details.get("target") == "subprocess.Popen"

def test_top_level_list_target_detected(self, tmp_path: Path) -> None:
"""Top-level YAML lists must be traversed, not mistaken for absent config."""
path = _create_nemo_file_from_bytes(
tmp_path,
b"- _target_: os.system\n command: id\n",
)

result = scan_file(str(path), config={"cache_scan_results": False})

assert result.scanner_name == "nemo"
cve_checks = [c for c in result.checks if c.name == "CVE-2025-23304: Dangerous Hydra _target_"]
assert len(cve_checks) == 1
assert cve_checks[0].status == CheckStatus.FAILED
assert cve_checks[0].severity == IssueSeverity.CRITICAL
assert cve_checks[0].details["target"] == "os.system"
assert cve_checks[0].details["config_path"] == "[0]._target_"

def test_nested_list_target_detected(self, tmp_path: Path) -> None:
"""Nested list-of-list structures should remain part of Hydra target traversal."""
path = _create_nemo_file_from_bytes(
tmp_path,
b"trainer:\n callbacks:\n - - _target_: subprocess.Popen\n args:\n - whoami\n",
)

result = NemoScanner().scan(str(path))

cve_checks = [c for c in result.checks if c.name == "CVE-2025-23304: Dangerous Hydra _target_"]
assert len(cve_checks) == 1
assert cve_checks[0].details["target"] == "subprocess.Popen"
assert cve_checks[0].details["config_path"] == "trainer.callbacks[0][0]._target_"

@pytest.mark.parametrize(
("payload", "expected_reason", "expected_check"),
[
(b"model: [unterminated\n", "nemo_config_yaml_parse_failed", "NeMo Config YAML Parsing"),
(b"null\n", "nemo_config_invalid_structure", "NeMo Config Structure"),
(b"just text\n", "nemo_config_invalid_structure", "NeMo Config Structure"),
],
)
def test_unanalyzable_yaml_config_returns_inconclusive_exit2(
self,
tmp_path: Path,
payload: bytes,
expected_reason: str,
expected_check: str,
) -> None:
"""Present but unanalyzable configs should fail closed as inconclusive coverage."""
path = _create_nemo_file_from_bytes(tmp_path, payload)

direct_result = NemoScanner().scan(str(path))
aggregate_result = scan_model_directory_or_file(
str(path),
config={"cache_scan_results": False},
)

assert direct_result.success is False
assert direct_result.has_errors is False
assert direct_result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
assert expected_reason in direct_result.metadata["scan_outcome_reasons"]
checks = [c for c in direct_result.checks if c.name == expected_check]
assert len(checks) == 1
assert checks[0].status == CheckStatus.FAILED
assert checks[0].severity == IssueSeverity.INFO

metadata = aggregate_result.file_metadata[str(path)]
assert metadata.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME
assert expected_reason in metadata.get("scan_outcome_reasons")
assert aggregate_result.success is False
assert aggregate_result.has_errors is False
assert determine_exit_code(aggregate_result) == 2

def test_inconclusive_config_preserves_security_exit1(self, tmp_path: Path) -> None:
"""Security findings should keep priority over inconclusive config coverage."""
nemo_path = tmp_path / "model.nemo"
malformed_config = b"model: [unterminated\n"
script = b"#!/bin/sh\nid\n"
with tarfile.open(nemo_path, "w") as tar:
info = tarfile.TarInfo(name="model_config.yaml")
info.size = len(malformed_config)
tar.addfile(info, io.BytesIO(malformed_config))
info = tarfile.TarInfo(name="payload.sh")
info.size = len(script)
tar.addfile(info, io.BytesIO(script))

result = scan_model_directory_or_file(
str(nemo_path),
config={"cache_scan_results": False},
)

metadata = result.file_metadata[str(nemo_path)]
assert metadata.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME
assert "nemo_config_yaml_parse_failed" in metadata.get("scan_outcome_reasons")
assert any(issue.severity == IssueSeverity.WARNING for issue in result.issues)
assert determine_exit_code(result) == 1

def test_dangerous_eval_detected(self, tmp_path):
"""builtins.eval _target_ should trigger CVE-2025-23304."""
config = {"_target_": "builtins.eval", "expression": "__import__('os').system('id')"}
Expand Down
Loading