Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Bug Fixes

- mark trailing bytes after NumPy object-array pickle payloads inconclusive without escalating to security findings
- avoid CoreML nested parse failures on bounded-read truncation
- flag TensorFlow `LoadLibrary` and `LoadLibraryV2` graph ops as dangerous native-library loading
- detect split CNTK native-user-function and native-library references
Expand Down
36 changes: 32 additions & 4 deletions modelaudit/scanners/numpy_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import warnings
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar

from .base import BaseScanner, IssueSeverity, ScanResult
from .base import BaseScanner, Check, Issue, IssueSeverity, ScanResult
from .pickle_scanner import PickleScanner
from .pickle_support import _finish_with_inconclusive_contract, _mark_inconclusive_scan_result

# Import NumPy with compatibility handling
try:
Expand Down Expand Up @@ -103,6 +104,21 @@ def _scan_embedded_pickle_payload(
source=context_path,
)

@staticmethod
def _is_trailing_pickle_parse_noise(item: Check | Issue) -> bool:
    """Return True for parse diagnostics superseded by NumPy trailing-byte validation.

    Only diagnostics explicitly flagged as incomplete analysis are candidates;
    everything else is kept verbatim.
    """
    details = item.details or {}
    # Diagnostics without the incomplete-analysis flag are never noise.
    if details.get("analysis_incomplete") is not True:
        return False

    # Structured detail codes emitted by the pickle scanner for partial parses.
    if details.get("pickle_notice_code") == "parse_incomplete":
        return True
    if details.get("failure_reason") == "unknown_opcode_or_format_error":
        return True

    # Fall back to matching the known human-readable partial-parse messages.
    lowered_message = item.message.lower()
    noise_phrases = (
        "pickle parsing failed before full scan completion",
        "pickle parsing stopped before the stream was fully consumed",
    )
    return any(phrase in lowered_message for phrase in noise_phrases)

def _validate_dtype(self, dtype: Any) -> None:
"""Validate numpy dtype for security"""
# Check for problematic data types
Expand Down Expand Up @@ -321,12 +337,20 @@ def scan(self, path: str) -> ScanResult:
file_size - data_offset,
path,
)
result.issues.extend(embedded_result.issues)
result.checks.extend(embedded_result.checks)

pickle_end_offset = embedded_result.metadata.get("first_pickle_end_pos")
if isinstance(pickle_end_offset, int) and pickle_end_offset < file_size:
trailing_bytes = file_size - pickle_end_offset
result.issues.extend(
issue
for issue in embedded_result.issues
if not self._is_trailing_pickle_parse_noise(issue)
)
result.checks.extend(
check
for check in embedded_result.checks
if not self._is_trailing_pickle_parse_noise(check)
)
result.add_check(
name="File Integrity Check",
passed=False,
Expand All @@ -343,9 +367,13 @@ def scan(self, path: str) -> ScanResult:
"dtype": str(dtype),
},
)
result.finish(success=False)
_mark_inconclusive_scan_result(result, "numpy_object_pickle_trailing_bytes")
_finish_with_inconclusive_contract(result, default_success=True)
return result

result.issues.extend(embedded_result.issues)
result.checks.extend(embedded_result.checks)

# Object-dtype .npy payloads are stored as a pickle stream rather than
# fixed-width element data, so the numeric dtype/size validation path
# is not applicable after we recurse into the embedded pickle payload.
Expand Down
55 changes: 54 additions & 1 deletion tests/scanners/test_numpy_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np

from modelaudit.core import determine_exit_code, scan_model_directory_or_file
from modelaudit.scanners.base import Check, IssueSeverity, ScanResult
from modelaudit.scanners.base import INCONCLUSIVE_SCAN_OUTCOME, Check, IssueSeverity, ScanResult
from modelaudit.scanners.numpy_scanner import NumPyScanner


Expand Down Expand Up @@ -132,6 +132,23 @@ def _failed_checks(result: ScanResult) -> list[Check]:
return [c for c in result.checks if c.status.value == "failed"]


def _assert_no_trailing_pickle_parse_noise(result: ScanResult) -> None:
    """Assert that superseded pickle partial-parse diagnostics are absent from *result*.

    Checks the check texts, issue messages, and the scan_outcome_reasons metadata
    for the known partial-parse markers.
    """
    noise_markers = (
        "parse_incomplete",
        "pickle parsing failed before full scan completion",
        "pickle parsing stopped before the stream was fully consumed",
        "stream was fully consumed",
    )
    checks_blob = " ".join(f"{c.name} {c.message}" for c in result.checks).lower()
    issues_blob = " ".join(i.message for i in result.issues).lower()
    outcome_reasons = result.metadata.get("scan_outcome_reasons", [])

    for marker in noise_markers:
        assert marker not in checks_blob
        assert marker not in issues_blob
    assert isinstance(outcome_reasons, list)
    assert "parse_incomplete" not in outcome_reasons


def _inject_comment_token_into_npy_payload(path: Path) -> None:
with path.open("rb") as handle:
major, minor = np.lib.format.read_magic(handle)
Expand Down Expand Up @@ -346,6 +363,12 @@ def test_object_dtype_numpy_trailing_bytes_fail_integrity(tmp_path: Path) -> Non
result = scanner.scan(str(path))

assert result.success is False
assert result.has_errors is False
assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
assert result.metadata["analysis_incomplete"] is True
assert "numpy_object_pickle_trailing_bytes" in result.metadata["scan_outcome_reasons"]
assert not any(issue.severity == IssueSeverity.CRITICAL for issue in result.issues)
_assert_no_trailing_pickle_parse_noise(result)
assert any(
check.name == "File Integrity Check"
and check.status.value == "failed"
Expand All @@ -354,6 +377,36 @@ def test_object_dtype_numpy_trailing_bytes_fail_integrity(tmp_path: Path) -> Non
), f"Expected trailing-byte integrity failure, got: {[c.message for c in result.checks]}"


def test_object_dtype_numpy_trailing_bytes_exit2_not_security_finding(tmp_path: Path) -> None:
    """Trailing junk after a benign object-array pickle yields an inconclusive scan
    (exit code 2), not a critical security finding."""
    benign_payload = np.array([{"k": "v"}], dtype=object)
    npy_path = tmp_path / "trailing.npy"
    np.save(npy_path, benign_payload, allow_pickle=True)
    # Corrupt the file by appending bytes past the embedded pickle stream.
    npy_path.write_bytes(npy_path.read_bytes() + b"TRAILINGJUNK")

    result = scan_model_directory_or_file(str(npy_path))
    file_meta = next(iter(result.file_metadata.values()))

    assert result.success is False
    assert file_meta.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME
    assert "numpy_object_pickle_trailing_bytes" in file_meta.get("scan_outcome_reasons", [])
    assert determine_exit_code(result) == 2
    # Inconclusive must not escalate to a critical finding.
    assert all(issue.severity != IssueSeverity.CRITICAL for issue in result.issues)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def test_object_dtype_numpy_trailing_bytes_malicious_exit1(tmp_path: Path) -> None:
    """A malicious embedded pickle still produces a critical finding (exit code 1)
    even when trailing junk makes the overall scan inconclusive."""
    malicious_payload = np.array([_ExecPayload()], dtype=object)
    npy_path = tmp_path / "malicious_trailing.npy"
    np.save(npy_path, malicious_payload, allow_pickle=True)
    # Append junk past the pickle stream so the trailing-byte path also triggers.
    npy_path.write_bytes(npy_path.read_bytes() + b"TRAILINGJUNK")

    result = scan_model_directory_or_file(str(npy_path))
    file_meta = next(iter(result.file_metadata.values()))

    assert file_meta.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME
    assert determine_exit_code(result) == 1
    has_critical = any(issue.severity == IssueSeverity.CRITICAL for issue in result.issues)
    assert has_critical


def test_corrupted_npz_fails_safely(tmp_path: Path) -> None:
npz_path = tmp_path / "corrupt.npz"
npz_path.write_bytes(b"not-a-zip")
Expand Down
Loading