Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ignore remote OCI `layers[].urls` entries during local layer discovery
- fail closed on unterminated OpenVINO DOCTYPE declarations
- avoid PMML `<Extension>` false positives for benign `subprocess` prose while preserving `subprocess.getoutput()`, `subprocess.getstatusoutput()`, and `importlib.import_module("subprocess")` detections
- mark incomplete ZIP, TAR, and 7z archive traversals as inconclusive in scan metadata
- route helper-level ZIP-backed `.ckpt`/`.pkl` checkpoints through archive scanners

## [0.2.31](https://github.com/promptfoo/modelaudit/compare/v0.2.30...v0.2.31) (2026-04-04)
Expand Down
26 changes: 26 additions & 0 deletions modelaudit/scanners/_archive_outcomes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Shared metadata helpers for archive scans that intentionally stop early."""

from __future__ import annotations

from .base import INCONCLUSIVE_SCAN_OUTCOME, ScanResult


def mark_archive_scan_incomplete(result: ScanResult, reason: str) -> None:
    """Flag *result* as an inconclusive archive scan and record why.

    Sets ``analysis_incomplete`` and ``scan_outcome`` in ``result.metadata`` and
    adds *reason* to ``scan_outcome_reasons`` unless it is already listed there.
    Only metadata is written; checks and issues on *result* are left alone.
    """
    metadata = result.metadata
    metadata["analysis_incomplete"] = True
    metadata["scan_outcome"] = INCONCLUSIVE_SCAN_OUTCOME

    # An existing list is extended in place; any other stored value is
    # replaced by a fresh list.
    recorded = metadata.get("scan_outcome_reasons")
    if not isinstance(recorded, list):
        recorded = []
    if reason not in recorded:
        recorded.append(reason)
    metadata["scan_outcome_reasons"] = recorded


def member_scan_incomplete(result: ScanResult) -> bool:
    """Tell whether a nested member scan ended before its analysis was complete.

    A member counts as incomplete when its metadata has ``analysis_incomplete``
    set to ``True``, when its ``scan_outcome`` equals
    ``INCONCLUSIVE_SCAN_OUTCOME``, or when it finished unsuccessfully without
    reporting any errors.
    """
    if result.metadata.get("analysis_incomplete") is True:
        return True
    if result.metadata.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME:
        return True
    # A failed result that does report errors is not counted as incomplete;
    # only "unsuccessful and error-free" falls through to True here.
    return not (result.success or result.has_errors)
26 changes: 22 additions & 4 deletions modelaudit/scanners/sevenzip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from ..utils import sanitize_archive_path
from ._archive_config import get_archive_depth
from ._archive_locations import rewrite_extracted_member_location
from ._archive_outcomes import mark_archive_scan_incomplete, member_scan_incomplete
from .archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY
from .base import BaseScanner, IssueSeverity, ScanResult

# Try to import py7zr with graceful fallback
Expand Down Expand Up @@ -278,6 +280,7 @@ def scan(self, path: str) -> ScanResult:
"install_command": "pip install py7zr",
},
)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -311,6 +314,7 @@ def scan(self, path: str) -> ScanResult:
location=path,
details={"error": str(e), "error_type": "invalid_format"},
)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand All @@ -323,6 +327,7 @@ def scan(self, path: str) -> ScanResult:
location=path,
details={"error": str(e), "error_type": "scan_failure"},
)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand All @@ -345,6 +350,7 @@ def _scan_7z_file(
scan_complete = True

if budget.should_stop():
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand All @@ -357,6 +363,7 @@ def _scan_7z_file(
location=path,
details={"depth": depth, "max_depth": self.max_depth},
)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -391,6 +398,7 @@ def _scan_7z_file(
result.metadata["scannable_files"] = 0
result.metadata["unsafe_entries"] = 0
result.metadata["file_size"] = os.path.getsize(path)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand All @@ -417,6 +425,7 @@ def _scan_7z_file(
result.metadata["scannable_files"] = 0
result.metadata["unsafe_entries"] = 0
result.metadata["file_size"] = os.path.getsize(path)
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -463,6 +472,8 @@ def _scan_7z_file(
result.metadata["unsafe_entries"] = len(file_names) - len(safe_file_names)
result.metadata["file_size"] = os.path.getsize(path)

if not scan_complete or budget.should_stop():
mark_archive_scan_incomplete(result, "sevenzip_analysis_incomplete")
result.finish(success=scan_complete and not budget.should_stop() and not result.has_errors)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return result

Expand Down Expand Up @@ -851,6 +862,15 @@ def _rewrite_nested_result_context(
preserve_non_delimited_suffix=True,
)

def _scan_nested_archive_entry(self, path: str, nested_config: dict[str, Any]) -> ScanResult:
    """Scan one nested archive member, preferring an injected callback.

    If ``self.config`` holds a callable under ``NESTED_SCAN_CALLBACK_CONFIG_KEY``
    it is called with ``(path, nested_config)`` and its result is returned.
    Otherwise the member is passed to ``core.scan_file``.
    """
    callback = self.config.get(NESTED_SCAN_CALLBACK_CONFIG_KEY)
    if not callable(callback):
        # Imported inside the method rather than at module level
        # (presumably to avoid an import cycle with core -- TODO confirm).
        from .. import core

        return core.scan_file(path, nested_config)
    return callback(path, nested_config)

def _scan_extracted_file(
self,
extracted_path: str,
Expand All @@ -869,15 +889,13 @@ def _scan_extracted_file(
budget=budget,
)
else:
from .. import core

nested_config = dict(self.config)
nested_config["_archive_depth"] = depth + 1
file_result = core.scan_file(extracted_path, nested_config)
file_result = self._scan_nested_archive_entry(extracted_path, nested_config)

self._rewrite_nested_result_context(file_result, extracted_path, archive_path, original_name)
result.merge(file_result)
return file_result.success and not file_result.has_errors
return not member_scan_incomplete(file_result)

except Exception as e:
result.add_check(
Expand Down
22 changes: 18 additions & 4 deletions modelaudit/scanners/tar_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import tempfile
from typing import Any, ClassVar

from .. import core
from ..utils import is_absolute_archive_path, is_critical_system_path, sanitize_archive_path
from ..utils.helpers.assets import asset_from_scan_result
from ._archive_locations import rewrite_extracted_member_location
from ._archive_outcomes import mark_archive_scan_incomplete, member_scan_incomplete
from .archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY, scan_nested_file
from .base import BaseScanner, IssueSeverity, ScanResult

CRITICAL_SYSTEM_PATHS = [
Expand Down Expand Up @@ -121,6 +122,7 @@ def scan(self, path: str) -> ScanResult:
details={"path": path},
rule_code="S902",
)
mark_archive_scan_incomplete(result, "tar_analysis_incomplete")
result.finish(success=False)
return result
except Exception as e:
Expand All @@ -132,6 +134,7 @@ def scan(self, path: str) -> ScanResult:
location=path,
details={"exception": str(e), "exception_type": type(e).__name__},
)
mark_archive_scan_incomplete(result, "tar_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -195,6 +198,13 @@ def _rewrite_nested_result_context(
else entry_name
)

def _scan_nested_archive_entry(self, path: str, nested_config: dict[str, Any]) -> ScanResult:
    """Scan one nested archive member, preferring an injected callback.

    A callable stored in ``self.config`` under ``NESTED_SCAN_CALLBACK_CONFIG_KEY``
    takes priority; when none is configured the member goes through
    ``scan_nested_file``. Either way the scanner receives ``(path, nested_config)``.
    """
    callback = self.config.get(NESTED_SCAN_CALLBACK_CONFIG_KEY)
    scan = callback if callable(callback) else scan_nested_file
    return scan(path, nested_config)

@staticmethod
def _rewrite_archive_location(location: str | None, tmp_path: str, archive_location: str) -> str:
return rewrite_extracted_member_location(
Expand Down Expand Up @@ -455,6 +465,7 @@ def _scan_tar_file(self, path: str, depth: int = 0) -> ScanResult:
location=path,
details={"depth": depth, "max_depth": self.max_depth},
)
mark_archive_scan_incomplete(result, "tar_analysis_incomplete")
result.finish(success=False)
return result
else:
Expand All @@ -470,6 +481,7 @@ def _scan_tar_file(self, path: str, depth: int = 0) -> ScanResult:
if not self._preflight_tar_archive(path, result):
result.metadata["contents"] = contents
result.metadata["file_size"] = os.path.getsize(path)
mark_archive_scan_incomplete(result, "tar_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -566,7 +578,7 @@ def _scan_tar_file(self, path: str, depth: int = 0) -> ScanResult:
try:
if is_tar_extension and tarfile.is_tarfile(tmp_path):
nested_result = self._scan_tar_file(tmp_path, depth + 1)
if not nested_result.success:
if member_scan_incomplete(nested_result):
scan_complete = False

self._rewrite_nested_result_context(nested_result, tmp_path, path, name)
Expand All @@ -575,8 +587,8 @@ def _scan_tar_file(self, path: str, depth: int = 0) -> ScanResult:
else:
nested_config = dict(self.config)
nested_config["_archive_depth"] = depth + 1
file_result = core.scan_file(tmp_path, nested_config)
if not file_result.success:
file_result = self._scan_nested_archive_entry(tmp_path, nested_config)
if member_scan_incomplete(file_result):
scan_complete = False

self._rewrite_nested_result_context(file_result, tmp_path, path, name)
Expand Down Expand Up @@ -608,5 +620,7 @@ def _scan_tar_file(self, path: str, depth: int = 0) -> ScanResult:

result.metadata["contents"] = contents
result.metadata["file_size"] = os.path.getsize(path)
if not scan_complete:
mark_archive_scan_incomplete(result, "tar_analysis_incomplete")
result.finish(success=scan_complete and not result.has_errors)
return result
9 changes: 8 additions & 1 deletion modelaudit/scanners/zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ..utils.helpers.assets import asset_from_scan_result
from ._archive_config import get_archive_depth
from ._archive_locations import rewrite_extracted_member_location
from ._archive_outcomes import mark_archive_scan_incomplete, member_scan_incomplete
from .archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY, scan_nested_file
from .base import BaseScanner, IssueSeverity, ScanResult

Expand Down Expand Up @@ -124,6 +125,7 @@ def scan(self, path: str) -> ScanResult:
location=path,
details={"path": path},
)
mark_archive_scan_incomplete(result, "zip_analysis_incomplete")
result.finish(success=False)
return result
except Exception as e:
Expand All @@ -136,6 +138,7 @@ def scan(self, path: str) -> ScanResult:
location=path,
details={"exception": str(e), "exception_type": type(e).__name__},
)
mark_archive_scan_incomplete(result, "zip_analysis_incomplete")
result.finish(success=False)
return result

Expand Down Expand Up @@ -215,6 +218,7 @@ def _scan_zip_file(self, path: str, depth: int = 0) -> ScanResult:
location=path,
details={"depth": depth, "max_depth": self.max_depth},
)
mark_archive_scan_incomplete(result, "zip_analysis_incomplete")
result.finish(success=False)
return result
else:
Expand Down Expand Up @@ -243,6 +247,7 @@ def _scan_zip_file(self, path: str, depth: int = 0) -> ScanResult:
"max_entries": self.max_entries,
},
)
mark_archive_scan_incomplete(result, "zip_analysis_incomplete")
result.finish(success=False)
return result
else:
Expand Down Expand Up @@ -429,7 +434,7 @@ def _scan_zip_file(self, path: str, depth: int = 0) -> ScanResult:
# so production scans preserve core routing while direct
# ZipScanner usage still falls back to registry routing.
file_result = self._scan_nested_archive_entry(tmp_path, nested_config)
if not file_result.success:
if member_scan_incomplete(file_result):
scan_complete = False

self._rewrite_nested_result_context(file_result, tmp_path, path, name)
Expand Down Expand Up @@ -463,6 +468,8 @@ def _scan_zip_file(self, path: str, depth: int = 0) -> ScanResult:

result.metadata["contents"] = contents
result.metadata["file_size"] = os.path.getsize(path)
if not scan_complete:
mark_archive_scan_incomplete(result, "zip_analysis_incomplete")
result.finish(success=scan_complete and not result.has_errors)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return result

Expand Down
50 changes: 48 additions & 2 deletions tests/scanners/test_sevenzip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import pytest

from modelaudit.scanners.base import CheckStatus, IssueSeverity, ScanResult
from modelaudit.scanners.sevenzip_scanner import HAS_PY7ZR, SevenZipScanner
from modelaudit.scanners.archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY
from modelaudit.scanners.base import INCONCLUSIVE_SCAN_OUTCOME, CheckStatus, IssueSeverity, ScanResult
from modelaudit.scanners.sevenzip_scanner import HAS_PY7ZR, SevenZipScanner, _RecursiveScanBudget

# Skip all tests if py7zr is not available for asset generation
pytest_plugins: list[str] = []
Expand Down Expand Up @@ -112,6 +113,11 @@ def test_scan_without_py7zr(self, scanner, temp_7z_file):
assert issue.severity == IssueSeverity.WARNING
assert "py7zr library not installed" in issue.message
assert "pip install py7zr" in issue.message
assert result.has_warnings is True
assert result.has_errors is False
assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
assert result.metadata["analysis_incomplete"] is True
assert "sevenzip_analysis_incomplete" in result.metadata["scan_outcome_reasons"]
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@patch("modelaudit.scanners.sevenzip_scanner.HAS_PY7ZR", False)
def test_scan_mocked_unavailable(self, scanner, temp_7z_file):
Expand All @@ -125,6 +131,11 @@ def test_scan_mocked_unavailable(self, scanner, temp_7z_file):
# Missing optional dependency is a WARNING, not CRITICAL
assert issue.severity == IssueSeverity.WARNING
assert "py7zr library not installed" in issue.message
assert result.has_warnings is True
assert result.has_errors is False
assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
assert result.metadata["analysis_incomplete"] is True
assert "sevenzip_analysis_incomplete" in result.metadata["scan_outcome_reasons"]

@pytest.mark.skipif(not HAS_PY7ZR, reason="py7zr not available")
def test_can_handle_valid_7z_magic_bytes(self, temp_7z_file):
Expand Down Expand Up @@ -756,6 +767,41 @@ def test_unsafe_entries_are_excluded_from_extraction_targets(
assert ["safe.pkl"] in extract_targets
assert all("../../../escape.pkl" not in targets for targets in extract_targets)

def test_nested_critical_scan_does_not_mark_7z_extraction_incomplete(self, tmp_path: Path) -> None:
    """A CRITICAL finding in a nested member is a finished analysis, not a partial traversal."""
    member_file = tmp_path / "model.pkl"
    member_file.write_bytes(b"payload")
    outer_archive = tmp_path / "model.7z"
    outer_result = ScanResult(scanner_name="sevenzip")

    def nested_scan(path: str, _config: dict[str, Any]) -> ScanResult:
        # Stand-in nested scanner: reports one CRITICAL failed check and
        # finishes unsuccessfully.
        member_result = ScanResult(scanner_name="test_nested")
        member_result.add_check(
            name="Nested Critical Finding",
            passed=False,
            message="Nested member is malicious",
            severity=IssueSeverity.CRITICAL,
            location=path,
        )
        member_result.finish(success=False)
        return member_result

    # Inject the stand-in through the nested-scan callback config key.
    scanner = SevenZipScanner(config={NESTED_SCAN_CALLBACK_CONFIG_KEY: nested_scan})
    scan_complete = scanner._scan_extracted_file(
        str(member_file),
        "model.pkl",
        str(outer_archive),
        outer_result,
        depth=0,
        budget=_RecursiveScanBudget(),
    )

    # The member was fully analysed, so the archive must not be flagged
    # inconclusive even though the merged result now carries errors.
    assert scan_complete is True
    assert outer_result.has_errors is True
    assert "scan_outcome" not in outer_result.metadata
    assert outer_result.metadata.get("analysis_incomplete") is not True
    merged_check_names = [check.name for check in outer_result.checks]
    assert "Nested Critical Finding" in merged_check_names

def test_oversized_entries_are_skipped_before_extraction(
self,
scanner: SevenZipScanner,
Expand Down
Loading
Loading