diff --git a/docs/configuration.md b/docs/configuration.md index 78401e39..82ae97fa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -18,7 +18,7 @@ You can find a summary of the available variables and their defaults below. | `MDIO__EXPORT__CPU_COUNT` | `int` | Number of logical CPUs available | | `MDIO__GRID__SPARSITY_RATIO_WARN` | `float` | `2.0` | | `MDIO__GRID__SPARSITY_RATIO_LIMIT` | `float` | `10.0` | -| `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` | `bool` | `False` | +| `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` | `int` | `0` | | `MDIO__IMPORT__CLOUD_NATIVE` | `bool` | `False` | | `MDIO__IMPORT__RAW_HEADERS` | `bool` | `False` | | `MDIO_IGNORE_CHECKS` | `bool` | `False` | @@ -71,13 +71,29 @@ $ export MDIO__GRID__SPARSITY_RATIO_LIMIT=15.0 ### `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` -**Accepted values:** `true`, `false`, `1`, `0`, `yes`, `no`, `on`, `off` +**Accepted values:** `0`, `1`, `2`, `true`, `false`, `yes`, `no`, `on`, `off` + +Controls preservation of the original SEG-Y textual file header during import. +The textual file header must be 40 lines of 80 printable characters per the +SEG-Y standard; lossy EBCDIC decoding can produce headers that violate this +layout. The variable selects how MDIO reacts: + +| Value | Behavior | +| ------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `0` / `false` | Do not save SEG-Y file headers (default). | +| `1` / `true` | Save SEG-Y file headers and raise `ValueError` if the text header is not exactly 40x80 ASCII-printable characters (rejects e.g. `U+FFFD` from a lossy EBCDIC decode). | +| `2` | Save SEG-Y file headers; if the text header is malformed, log a warning and correct it (non-ASCII or non-printable characters become spaces, and rows are padded or truncated to the 40x80 layout). | -When enabled, preserves the original SEG-Y textual file header during import. 
-This is useful for maintaining full SEG-Y standard compliance and preserving survey metadata. +```{note} +On export, `mdio_to_segy` always defensively validates the stored text header +and, if it cannot be re-encoded as ASCII (for example because the store was +written by an older version of MDIO that accepted lossy EBCDIC decodes), +repairs it on the fly and emits a warning. Re-ingest the source SEG-Y with +mode `2` to store a corrected header and silence the warning permanently (mode `1` raises `ValueError` instead if the header is still malformed). +``` ```shell -$ export MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=true +$ export MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=1 $ mdio segy import input.segy output.mdio --header-locations 189,193 ``` diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index f0d34549..a2d15836 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -30,6 +30,9 @@ from mdio.converters.exceptions import GridTraceCountError from mdio.converters.exceptions import GridTraceSparsityError from mdio.converters.type_converter import to_structured_type +from mdio.core.config import SAVE_SEGY_FILE_HEADER_LENIENT +from mdio.core.config import SAVE_SEGY_FILE_HEADER_OFF +from mdio.core.config import SAVE_SEGY_FILE_HEADER_STRICT from mdio.core.config import MDIOSettings from mdio.core.grid import Grid from mdio.core.utils_write import MAX_COORDINATES_BYTES @@ -39,6 +42,8 @@ from mdio.segy.file import get_segy_file_info from mdio.segy.scalar import SCALE_COORDINATE_KEYS from mdio.segy.scalar import _apply_coordinate_scalar +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header import validate_text_header from mdio.segy.utilities import get_grid_plan if TYPE_CHECKING: @@ -537,28 +542,26 @@ def _populate_coordinates( def _add_segy_file_headers(xr_dataset: xr_Dataset, segy_file_info: SegyFileInfo) -> xr_Dataset: settings = MDIOSettings() + mode = settings.save_segy_file_header - if not settings.save_segy_file_header: + if mode == SAVE_SEGY_FILE_HEADER_OFF: return xr_dataset - expected_rows 
= 40 - expected_cols = 80 + text_header = segy_file_info.text_header - text_header_rows = segy_file_info.text_header.splitlines() - text_header_cols_bad = [len(row) != expected_cols for row in text_header_rows] - - if len(text_header_rows) != expected_rows: - err = f"Invalid text header count: expected {expected_rows}, got {len(segy_file_info.text_header)}" - raise ValueError(err) - - if any(text_header_cols_bad): - err = f"Invalid text header columns: expected {expected_cols} per line." - raise ValueError(err) + if mode == SAVE_SEGY_FILE_HEADER_LENIENT: + try: + validate_text_header(text_header) + except ValueError as exc: + logger.warning("Correcting malformed SEG-Y text header on import: %s", exc) + text_header = sanitize_text_header(text_header) + elif mode == SAVE_SEGY_FILE_HEADER_STRICT: + validate_text_header(text_header) xr_dataset["segy_file_header"] = ((), "") xr_dataset["segy_file_header"].attrs.update( { - "textHeader": segy_file_info.text_header, + "textHeader": text_header, "binaryHeader": segy_file_info.binary_header_dict, } ) diff --git a/src/mdio/core/config.py b/src/mdio/core/config.py index dce608f9..53edfb85 100644 --- a/src/mdio/core/config.py +++ b/src/mdio/core/config.py @@ -1,10 +1,26 @@ """Environment variable management for MDIO operations.""" +from typing import Literal + from psutil import cpu_count from pydantic import Field +from pydantic import field_validator from pydantic_settings import BaseSettings from pydantic_settings import SettingsConfigDict +SAVE_SEGY_FILE_HEADER_OFF = 0 +SAVE_SEGY_FILE_HEADER_STRICT = 1 +SAVE_SEGY_FILE_HEADER_LENIENT = 2 + +SaveSegyFileHeaderMode = Literal[ + SAVE_SEGY_FILE_HEADER_OFF, + SAVE_SEGY_FILE_HEADER_STRICT, + SAVE_SEGY_FILE_HEADER_LENIENT, +] + +_SAVE_HEADER_TRUE_STRINGS = frozenset({"true", "yes", "on"}) +_SAVE_HEADER_FALSE_STRINGS = frozenset({"false", "no", "off"}) + class MDIOSettings(BaseSettings): """MDIO environment configuration settings.""" @@ -34,9 +50,12 @@ class 
MDIOSettings(BaseSettings): ) # Import configuration - save_segy_file_header: bool = Field( - default=False, - description="Whether to save SEG-Y file headers", + save_segy_file_header: SaveSegyFileHeaderMode = Field( + default=0, + description=( + "How to save SEG-Y file headers: 0 (or False) skips, 1 (or True) saves " + "and raises on malformed text header, 2 saves and corrects malformed text header." + ), alias="MDIO__IMPORT__SAVE_SEGY_FILE_HEADER", ) raw_headers: bool = Field( @@ -58,3 +77,21 @@ class MDIOSettings(BaseSettings): ) model_config = SettingsConfigDict(case_sensitive=True) + + @field_validator("save_segy_file_header", mode="before") + @classmethod + def _coerce_save_segy_file_header(cls, value: object) -> object: + """Accept legacy bool values and case-insensitive string aliases.""" + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in _SAVE_HEADER_FALSE_STRINGS: + return SAVE_SEGY_FILE_HEADER_OFF + if normalized in _SAVE_HEADER_TRUE_STRINGS: + return SAVE_SEGY_FILE_HEADER_STRICT + try: + return int(value) + except ValueError: + pass + if isinstance(value, bool): + return int(value) + return value diff --git a/src/mdio/segy/creation.py b/src/mdio/segy/creation.py index 8b10ad48..2a588087 100644 --- a/src/mdio/segy/creation.py +++ b/src/mdio/segy/creation.py @@ -17,6 +17,8 @@ from mdio.api.io import open_mdio from mdio.exceptions import MDIOMissingVariableError from mdio.segy.compat import encode_segy_revision +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header import validate_text_header if TYPE_CHECKING: import xarray as xr @@ -28,6 +30,23 @@ logger = logging.getLogger(__name__) +def _ensure_exportable_text_header(text_header: str) -> str: + """Validate the stored text header; repair and warn if it cannot be ASCII-encoded. + + Args: + text_header: The ``textHeader`` attribute as stored on the MDIO dataset. + + Returns: + A text header string that satisfies :func:`validate_text_header`. 
+ """ + try: + validate_text_header(text_header) + except ValueError as exc: + logger.warning("Stored MDIO text header is not exportable as-is and will be repaired: %s", exc) + return sanitize_text_header(text_header) + return text_header + + def make_segy_factory(spec: SegySpec, binary_header: dict[str, int]) -> SegyFactory: """Generate SEG-Y factory from MDIO metadata.""" sample_interval = binary_header["sample_interval"] @@ -88,6 +107,7 @@ def mdio_spec_to_segy( factory = make_segy_factory(spec=segy_spec, binary_header=binary_header) + text_header = _ensure_exportable_text_header(text_header) text_header_bytes = factory.create_textual_header(text_header) # During MDIO SEGY import, TGSAI/segy always creates revision major/minor fields diff --git a/src/mdio/segy/text_header.py b/src/mdio/segy/text_header.py new file mode 100644 index 00000000..e3e5c889 --- /dev/null +++ b/src/mdio/segy/text_header.py @@ -0,0 +1,92 @@ +"""SEG-Y textual file header validation and sanitization helpers.""" + +from __future__ import annotations + +import re + +EXPECTED_ROWS = 40 +EXPECTED_COLS = 80 +ASCII_MAX_ORD = 127 + +_REPORT_LIMIT = 5 +_NEWLINE_RUN = re.compile(r"\n{2,}") + + +def _is_safe_char(char: str) -> bool: + """Return True if char is 7-bit ASCII and printable.""" + return ord(char) <= ASCII_MAX_ORD and char.isprintable() + + +def _summarize(mapping: dict[int, list[int]], limit: int = _REPORT_LIMIT) -> str: + """Format ``{row: [positions]}`` for an error message, capped for readability.""" + if not mapping: + return "{}" + + items = list(mapping.items()) + head = items[:limit] + body = ", ".join(f"row {row}: positions {positions[:limit]}" for row, positions in head) + + extra_rows = len(items) - len(head) + if extra_rows > 0: + body += f" (+{extra_rows} more rows)" + return body + + +def validate_text_header(text_header: str) -> None: + r"""Validate a SEG-Y textual file header is 40 rows of 80 ASCII-printable characters. 
+ + Args: + text_header: Decoded text header in wrapped form (40 rows of 80 chars joined by ``\n``). + + Raises: + ValueError: If row count, row width, or any character fails the SEG-Y ASCII contract. + """ + rows = text_header.split("\n") + + if len(rows) != EXPECTED_ROWS: + err = f"Invalid text header line count: expected {EXPECTED_ROWS}, got {len(rows)}" + raise ValueError(err) + + bad_widths = [(i, len(row)) for i, row in enumerate(rows) if len(row) != EXPECTED_COLS] + if bad_widths: + capped = bad_widths[:_REPORT_LIMIT] + suffix = f" (+{len(bad_widths) - len(capped)} more)" if len(bad_widths) > len(capped) else "" + err = f"Invalid text header line widths: expected {EXPECTED_COLS} columns; offending rows: {capped}{suffix}" + raise ValueError(err) + + bad_chars: dict[int, list[int]] = {} + for i, row in enumerate(rows): + positions = [j for j, c in enumerate(row) if not _is_safe_char(c)] + if positions: + bad_chars[i] = positions + + if bad_chars: + err = f"Invalid text header characters: non-ASCII or non-printable at {_summarize(bad_chars)}" + raise ValueError(err) + + +def sanitize_text_header(text_header: str) -> str: + r"""Coerce a SEG-Y textual file header into the 40x80 ASCII-printable card layout. + + Runs of two or more ``\n`` collapse to one (some writers terminate cards with ``\n\n``). + Each row gets unsafe characters replaced with spaces and is padded/truncated to 80 chars. + The result always has exactly 40 rows. + + Args: + text_header: Decoded textual file header string. + + Returns: + Sanitized header that satisfies :func:`validate_text_header`. 
+ """ + normalized = _NEWLINE_RUN.sub("\n", text_header) + rows = normalized.split("\n") + + sanitized: list[str] = [] + for row in rows[:EXPECTED_ROWS]: + cleaned = "".join(c if _is_safe_char(c) else " " for c in row) + sanitized.append(cleaned[:EXPECTED_COLS].ljust(EXPECTED_COLS)) + + while len(sanitized) < EXPECTED_ROWS: + sanitized.append(" " * EXPECTED_COLS) + + return "\n".join(sanitized) diff --git a/tests/unit/test_environment.py b/tests/unit/test_environment.py index 430d31dd..32e72b7d 100644 --- a/tests/unit/test_environment.py +++ b/tests/unit/test_environment.py @@ -4,6 +4,7 @@ from unittest.mock import patch import pytest +from pydantic import ValidationError from mdio.core.config import MDIOSettings @@ -32,7 +33,7 @@ def test_environment_isolation(self) -> None: original_values = { "cpus": MDIOSettings().export_cpus, "ratio": MDIOSettings().grid_sparsity_ratio_warn, - "bool": MDIOSettings().save_segy_file_header, + "save_header": MDIOSettings().save_segy_file_header, } with patch.dict( @@ -45,9 +46,51 @@ def test_environment_isolation(self) -> None: ): assert MDIOSettings().export_cpus == 99 assert MDIOSettings().grid_sparsity_ratio_warn == 99.9 - assert MDIOSettings().save_segy_file_header is True + assert MDIOSettings().save_segy_file_header == 1 # Values should be restored after context assert MDIOSettings().export_cpus == original_values["cpus"] assert MDIOSettings().grid_sparsity_ratio_warn == original_values["ratio"] - assert MDIOSettings().save_segy_file_header == original_values["bool"] + assert MDIOSettings().save_segy_file_header == original_values["save_header"] + + +class TestSaveSegyFileHeaderMode: + """Test coercion for ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``.""" + + @pytest.mark.parametrize( + ("env_value", "expected"), + [ + ("0", 0), + ("1", 1), + ("2", 2), + ("false", 0), + ("False", 0), + ("FALSE", 0), + ("no", 0), + ("off", 0), + ("true", 1), + ("True", 1), + ("TRUE", 1), + ("yes", 1), + ("on", 1), + ], + ) + def 
test_string_coercion(self, env_value: str, expected: int) -> None: + """Strings (including legacy bool aliases) coerce to 0, 1, or 2.""" + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": env_value}): + assert MDIOSettings().save_segy_file_header == expected + + @pytest.mark.parametrize("python_value", [False, True, 0, 1, 2]) + def test_native_python_values(self, python_value: bool | int) -> None: + """Bool/int passed directly are accepted for backwards compatibility.""" + settings = MDIOSettings(MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=python_value) + assert settings.save_segy_file_header == int(python_value) + + @pytest.mark.parametrize("bad_value", ["3", "-1", "maybe", "tru"]) + def test_rejects_invalid_strings(self, bad_value: str) -> None: + """Anything other than 0/1/2 or bool aliases is rejected.""" + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": bad_value}), + pytest.raises(ValidationError), + ): + MDIOSettings() diff --git a/tests/unit/test_segy_export_text_header.py b/tests/unit/test_segy_export_text_header.py new file mode 100644 index 00000000..b06ed0ed --- /dev/null +++ b/tests/unit/test_segy_export_text_header.py @@ -0,0 +1,86 @@ +"""Tests for export-side text header guarding in ``mdio.segy.creation``.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from segy.factory import SegyFactory +from segy.standards import get_segy_standard + +from mdio.segy.creation import _ensure_exportable_text_header + +if TYPE_CHECKING: + import pytest + + +def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" + return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) + + +def _replacement_char_header() -> str: + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[37] = "\ufffdC38" + " " * 76 + rows[38] = "\ufffdC39" + " " * 76 + rows[39] = 
"\ufffdC40 END EBCDIC" + " " * 65 + return "\n".join(rows) + + +class TestEnsureExportableTextHeader: + """The export guard repairs malformed headers and warns; otherwise no-op.""" + + def test_passthrough_when_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: + """Well-formed input is returned unchanged with no warning.""" + header = _well_formed_header() + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(header) + assert result == header + assert not any("repaired" in record.message for record in caplog.records) + + def test_repairs_replacement_char_and_warns(self, caplog: pytest.LogCaptureFixture) -> None: + """U+FFFD is repaired and a warning is logged.""" + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(_replacement_char_header()) + assert "\ufffd" not in result + result.replace("\n", "").encode("ascii") + assert any("repaired" in record.message for record in caplog.records) + + def test_repairs_short_layout(self, caplog: pytest.LogCaptureFixture) -> None: + """Header with fewer than 40 cards is padded out to 40 rows of 80 chars.""" + short = "\n".join(["C01".ljust(80)] * 5) + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(short) + rows = result.split("\n") + assert len(rows) == 40 + assert all(len(row) == 80 for row in rows) + assert any("repaired" in record.message for record in caplog.records) + + def test_repaired_header_is_accepted_by_segy_factory(self) -> None: + """Repair output round-trips through ``factory.create_textual_header`` to 3200 bytes.""" + spec = get_segy_standard(1.0) + factory = SegyFactory(spec=spec, sample_interval=2000, samples_per_trace=1) + + repaired = _ensure_exportable_text_header(_replacement_char_header()) + encoded = factory.create_textual_header(repaired) + + assert len(encoded) == 3200 + + def 
test_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: + r"""Cards terminated with ``\n\n`` keep all 40 ``Cnn`` prefixes after repair.""" + cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + wrapped = "\n\n".join(cards) + "\n" + + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + repaired = _ensure_exportable_text_header(wrapped) + + repaired_rows = repaired.split("\n") + assert len(repaired_rows) == 40 + for i, row in enumerate(repaired_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + assert any("repaired" in record.message for record in caplog.records) + + spec = get_segy_standard(1.0) + factory = SegyFactory(spec=spec, sample_interval=2000, samples_per_trace=1) + assert len(factory.create_textual_header(repaired)) == 3200 diff --git a/tests/unit/test_segy_file_header_modes.py b/tests/unit/test_segy_file_header_modes.py new file mode 100644 index 00000000..22925efa --- /dev/null +++ b/tests/unit/test_segy_file_header_modes.py @@ -0,0 +1,150 @@ +"""Tests for ``_add_segy_file_headers`` mode handling. + +Covers the three values of ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``: +0 skips, 1 raises on a malformed text header, 2 corrects a malformed text header. 
+""" + +from __future__ import annotations + +import logging +import os +from unittest.mock import patch + +import pytest +import xarray as xr + +from mdio.converters.segy import _add_segy_file_headers +from mdio.segy.file import SegyFileInfo + + +def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" + return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) + + +def _malformed_header() -> str: + """Header with a NUL byte injected into row 0; valid 80x40 layout otherwise.""" + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[0] = "C01\x00" + " " * 76 + return "\n".join(rows) + + +def _replacement_char_header() -> str: + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[37] = "\ufffdC38" + " " * 76 + rows[38] = "\ufffdC39" + " " * 76 + rows[39] = "\ufffdC40 END EBCDIC" + " " * 65 + return "\n".join(rows) + + +def _segy_info(text_header: str) -> SegyFileInfo: + """Minimal SegyFileInfo fixture with the given text header.""" + return SegyFileInfo( + num_traces=1, + sample_labels=None, + text_header=text_header, + binary_header_dict={"job_id": 1}, + raw_binary_headers=b"", + coordinate_scalar=1, + ) + + +class TestSaveSegyFileHeaderModes: + """Mode 0 skips, mode 1 strict, mode 2 lenient.""" + + def test_mode_zero_skips_header_save(self) -> None: + """Mode 0 leaves the dataset without a ``segy_file_header`` variable.""" + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "0"}): + result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + assert "segy_file_header" not in result + + def test_mode_one_accepts_well_formed(self) -> None: + """Mode 1 stores a well-formed header verbatim.""" + ds = xr.Dataset() + header = _well_formed_header() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): + result = _add_segy_file_headers(ds, 
_segy_info(header)) + + assert result["segy_file_header"].attrs["textHeader"] == header + + def test_mode_one_raises_on_malformed(self) -> None: + """Mode 1 raises on a NUL byte in the header.""" + ds = xr.Dataset() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}), + pytest.raises(ValueError, match="non-ASCII or non-printable"), + ): + _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + def test_mode_one_raises_on_replacement_char(self) -> None: + """Mode 1 raises on U+FFFD.""" + ds = xr.Dataset() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}), + pytest.raises(ValueError, match="non-ASCII or non-printable"), + ): + _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + + def test_mode_two_corrects_malformed(self, caplog: pytest.LogCaptureFixture) -> None: + """Mode 2 repairs a NUL byte and stores a 40x80 header.""" + ds = xr.Dataset() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + stored = result["segy_file_header"].attrs["textHeader"] + assert "\x00" not in stored + assert all(len(row) == 80 for row in stored.split("\n")) + assert len(stored.split("\n")) == 40 + assert any("Correcting malformed" in record.message for record in caplog.records) + + def test_mode_two_corrects_replacement_char(self, caplog: pytest.LogCaptureFixture) -> None: + """Mode 2 repairs U+FFFD and stores ASCII-encodable bytes.""" + ds = xr.Dataset() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + + stored = result["segy_file_header"].attrs["textHeader"] + assert "\ufffd" not in stored + stored.replace("\n", "").encode("ascii") + assert 
any("Correcting malformed" in record.message for record in caplog.records) + + def test_mode_two_passes_through_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: + """Mode 2 stays silent and bit-identical on well-formed input.""" + ds = xr.Dataset() + header = _well_formed_header() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(header)) + + assert result["segy_file_header"].attrs["textHeader"] == header + assert not any("Correcting" in record.message for record in caplog.records) + + def test_mode_two_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: + r"""Mode 2 keeps all 40 ``Cnn`` cards when the source uses ``\n\n`` between cards.""" + cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + wrapped = "\n\n".join(cards) + "\n" + + ds = xr.Dataset() + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(wrapped)) + + stored = result["segy_file_header"].attrs["textHeader"] + stored_rows = stored.split("\n") + assert len(stored_rows) == 40 + for i, row in enumerate(stored_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + assert any("Correcting malformed" in record.message for record in caplog.records) diff --git a/tests/unit/test_text_header.py b/tests/unit/test_text_header.py new file mode 100644 index 00000000..d839c98a --- /dev/null +++ b/tests/unit/test_text_header.py @@ -0,0 +1,197 @@ +"""Tests for SEG-Y textual file header validation and sanitization.""" + +from __future__ import annotations + +import pytest + +from mdio.segy.text_header import EXPECTED_COLS +from mdio.segy.text_header import EXPECTED_ROWS +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header 
import validate_text_header + + +def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + return "\n".join(rows) + + +def _replacement_char_header() -> str: + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + rows[37] = "\ufffdC38" + " " * (EXPECTED_COLS - 4) + rows[38] = "\ufffdC39" + " " * (EXPECTED_COLS - 4) + rows[39] = "\ufffdC40 END EBCDIC" + " " * (EXPECTED_COLS - 15) + return "\n".join(rows) + + +class TestValidateTextHeader: + """Validation accepts well-formed headers and rejects anything else.""" + + def test_accepts_well_formed(self) -> None: + """Well-formed 40x80 ASCII header passes.""" + validate_text_header(_well_formed_header()) + + def test_rejects_wrong_row_count(self) -> None: + """Wrong row count raises.""" + rows = [" " * EXPECTED_COLS] * (EXPECTED_ROWS - 1) + with pytest.raises(ValueError, match="line count"): + validate_text_header("\n".join(rows)) + + def test_rejects_wrong_column_width(self) -> None: + """Row that is not 80 chars wide raises.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[5] = "short" + with pytest.raises(ValueError, match="line widths"): + validate_text_header("\n".join(rows)) + + def test_rejects_non_printable_characters(self) -> None: + """Non-printable ASCII (NUL) is rejected.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[10] = "\x00" + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + @pytest.mark.parametrize( + "bad_char", + ["\ufffd", "\xa0", "\u00e9", "\u00c1"], + ids=["U+FFFD", "U+00A0", "U+00E9", "U+00C1"], + ) + def test_rejects_non_ascii_printable_characters(self, bad_char: str) -> None: + """Non-ASCII codepoints are rejected even when isprintable() is True.""" + 
rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = bad_char + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + def test_rejects_replacement_char_header(self) -> None: + """Header laced with U+FFFD is rejected.""" + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header(_replacement_char_header()) + + def test_does_not_split_on_unicode_line_separators(self) -> None: + r"""``\v``, ``\f``, ``\x85`` are content, not row breaks.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + def test_rejects_double_newline_wrapped(self) -> None: + r"""Strict validation does not collapse ``\n\n``; only sanitize does.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + with pytest.raises(ValueError, match="line count"): + validate_text_header("\n\n".join(rows)) + + def test_error_message_is_capped(self) -> None: + """Pathologically broken header produces a bounded error message.""" + rows = ["\ufffd" * EXPECTED_COLS for _ in range(EXPECTED_ROWS)] + with pytest.raises(ValueError, match="Invalid text header characters") as exc_info: + validate_text_header("\n".join(rows)) + message = str(exc_info.value) + assert "more rows" in message + assert len(message) < 1000 + + +class TestSanitizeTextHeader: + """Sanitization replaces non-printable chars and forces 40x80 layout.""" + + def test_passthrough_well_formed(self) -> None: + """Well-formed input round-trips unchanged.""" + header = _well_formed_header() + assert sanitize_text_header(header) == header + + def test_replaces_non_printable_with_space(self) -> None: + """NUL/BEL bytes are replaced with spaces.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = "C01\x00\x07" + " " * (EXPECTED_COLS - 5) + cleaned 
= sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[0].startswith("C01 ") + assert all(c.isprintable() for row in cleaned_rows for c in row) + + def test_replaces_replacement_char_with_space(self) -> None: + """U+FFFD is repaired to spaces; surrounding text survives.""" + cleaned = sanitize_text_header(_replacement_char_header()) + assert "\ufffd" not in cleaned + cleaned_rows = cleaned.split("\n") + assert cleaned_rows[37].startswith(" C38") + assert cleaned_rows[38].startswith(" C39") + assert cleaned_rows[39].startswith(" C40 END EBCDIC") + + def test_replaces_unicode_line_separator_with_space(self) -> None: + r"""``\v`` is replaced with a space, not split as a row break.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) + cleaned = sanitize_text_header("\n".join(rows)) + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[0] == " " * EXPECTED_COLS + + def test_pads_short_rows_to_eighty_columns(self) -> None: + """Short rows are right-padded with spaces.""" + rows = ["short"] * EXPECTED_ROWS + cleaned = sanitize_text_header("\n".join(rows)) + + for row in cleaned.split("\n"): + assert len(row) == EXPECTED_COLS + + def test_truncates_long_rows_to_eighty_columns(self) -> None: + """Rows longer than 80 chars are truncated.""" + long_row = "X" * (EXPECTED_COLS + 20) + cleaned = sanitize_text_header("\n".join([long_row] * EXPECTED_ROWS)) + + for row in cleaned.split("\n"): + assert len(row) == EXPECTED_COLS + assert row == "X" * EXPECTED_COLS + + def test_pads_missing_rows_with_blank_lines(self) -> None: + """Headers with fewer than 40 rows are padded with blank lines.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, 5)] + cleaned = sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert 
cleaned_rows[-1] == " " * EXPECTED_COLS + + def test_truncates_excess_rows(self) -> None: + """Headers with more than 40 rows are truncated to 40.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 5)] + cleaned = sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[-1].startswith("C40") + + def test_output_passes_validation(self) -> None: + """Sanitize output is always accepted by validate.""" + rows = [f"C{i:02d}\x00\x01\ufffd garbage" for i in range(1, EXPECTED_ROWS + 10)] + cleaned = sanitize_text_header("\n".join(rows)) + validate_text_header(cleaned) + + def test_sanitized_header_is_ascii_encodable(self) -> None: + """Sanitized output encodes as ASCII (the SEG-Y export requirement).""" + cleaned = sanitize_text_header(_replacement_char_header()) + cleaned.replace("\n", "").encode("ascii") + + def test_collapses_double_newline_separator(self) -> None: + r"""Headers with ``\n\n`` between cards keep all 40 cards.""" + cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + wrapped = "\n\n".join(cards) + "\n" + cleaned = sanitize_text_header(wrapped) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + for i, row in enumerate(cleaned_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + validate_text_header(cleaned) + + def test_collapses_runs_longer_than_two(self) -> None: + r"""Triple (or longer) newline runs collapse to a single ``\n``.""" + cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + cleaned = sanitize_text_header("\n\n\n".join(cards)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[-1].startswith("C40")