From b2f6c1b02c1a15779467cfb97b62337b63e5b8ee Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 23 Apr 2026 14:43:35 +0000 Subject: [PATCH 1/3] Bump version for release --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 70d8792c..547d7ec7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "multidimio" -version = "1.1.2" +version = "1.1.3" description = "Cloud-native, scalable, and user-friendly multi dimensional energy data!" authors = [{ name = "Altay Sansal", email = "altay.sansal@tgs.com" }] requires-python = ">=3.11,<3.14" @@ -183,7 +183,7 @@ init_typed = true warn_required_dynamic_aliases = true [tool.bumpversion] -current_version = "1.1.2" +current_version = "1.1.3" allow_dirty = true commit = false tag = false From 5bac3c2020aa6a806e1796d6dd1c835104155e4f Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Thu, 7 May 2026 16:34:29 +0000 Subject: [PATCH 2/3] Implement better text header parsing and in-flow correction logic --- docs/configuration.md | 26 ++- src/mdio/converters/segy.py | 31 ++-- src/mdio/core/config.py | 52 +++++- src/mdio/segy/creation.py | 34 ++++ src/mdio/segy/text_header.py | 158 ++++++++++++++++ tests/unit/test_environment.py | 49 ++++- tests/unit/test_segy_export_text_header.py | 97 ++++++++++ tests/unit/test_segy_file_header_modes.py | 137 ++++++++++++++ tests/unit/test_text_header.py | 204 +++++++++++++++++++++ 9 files changed, 763 insertions(+), 25 deletions(-) create mode 100644 src/mdio/segy/text_header.py create mode 100644 tests/unit/test_segy_export_text_header.py create mode 100644 tests/unit/test_segy_file_header_modes.py create mode 100644 tests/unit/test_text_header.py diff --git a/docs/configuration.md b/docs/configuration.md index 78401e39..82ae97fa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -18,7 +18,7 @@ You can find a summary of the available variables and their defaults below. 
| `MDIO__EXPORT__CPU_COUNT` | `int` | Number of logical CPUs available | | `MDIO__GRID__SPARSITY_RATIO_WARN` | `float` | `2.0` | | `MDIO__GRID__SPARSITY_RATIO_LIMIT` | `float` | `10.0` | -| `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` | `bool` | `False` | +| `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` | `int` | `0` | | `MDIO__IMPORT__CLOUD_NATIVE` | `bool` | `False` | | `MDIO__IMPORT__RAW_HEADERS` | `bool` | `False` | | `MDIO_IGNORE_CHECKS` | `bool` | `False` | @@ -71,13 +71,29 @@ $ export MDIO__GRID__SPARSITY_RATIO_LIMIT=15.0 ### `MDIO__IMPORT__SAVE_SEGY_FILE_HEADER` -**Accepted values:** `true`, `false`, `1`, `0`, `yes`, `no`, `on`, `off` +**Accepted values:** `0`, `1`, `2`, `true`, `false`, `yes`, `no`, `on`, `off` + +Controls preservation of the original SEG-Y textual file header during import. +The textual file header must be 40 lines of 80 printable characters per the +SEG-Y standard; lossy EBCDIC decoding can produce headers that violate this +layout. The variable selects how MDIO reacts: + +| Value | Behavior | +| ------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `0` / `false` | Do not save SEG-Y file headers (default). | +| `1` / `true` | Save SEG-Y file headers and raise `ValueError` if the text header is not exactly 40x80 ASCII-printable characters (rejects e.g. `U+FFFD` from a lossy EBCDIC decode). | +| `2` | Save SEG-Y file headers; if the text header is malformed, log a warning and correct it (non-ASCII or non-printable characters become spaces and rows pad to 80x40). | -When enabled, preserves the original SEG-Y textual file header during import. -This is useful for maintaining full SEG-Y standard compliance and preserving survey metadata. 
+```{note} +On export, `mdio_to_segy` always defensively validates the stored text header +and, if it cannot be re-encoded as ASCII (for example because the store was +written by an older version of MDIO that accepted lossy EBCDIC decodes), +repairs it on the fly and emits a warning. Re-ingest the source SEG-Y with +mode `2` (mode `1` rejects a malformed header instead of repairing it) to silence the warning permanently. +``` ```shell -$ export MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=true +$ export MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=1 $ mdio segy import input.segy output.mdio --header-locations 189,193 ``` diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py index f0d34549..a2d15836 100644 --- a/src/mdio/converters/segy.py +++ b/src/mdio/converters/segy.py @@ -30,6 +30,9 @@ from mdio.converters.exceptions import GridTraceCountError from mdio.converters.exceptions import GridTraceSparsityError from mdio.converters.type_converter import to_structured_type +from mdio.core.config import SAVE_SEGY_FILE_HEADER_LENIENT +from mdio.core.config import SAVE_SEGY_FILE_HEADER_OFF +from mdio.core.config import SAVE_SEGY_FILE_HEADER_STRICT from mdio.core.config import MDIOSettings from mdio.core.grid import Grid from mdio.core.utils_write import MAX_COORDINATES_BYTES @@ -39,6 +42,8 @@ from mdio.segy.file import get_segy_file_info from mdio.segy.scalar import SCALE_COORDINATE_KEYS from mdio.segy.scalar import _apply_coordinate_scalar +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header import validate_text_header from mdio.segy.utilities import get_grid_plan if TYPE_CHECKING: @@ -537,28 +542,26 @@ def _populate_coordinates( def _add_segy_file_headers(xr_dataset: xr_Dataset, segy_file_info: SegyFileInfo) -> xr_Dataset: settings = MDIOSettings() + mode = settings.save_segy_file_header - if not settings.save_segy_file_header: + if mode == SAVE_SEGY_FILE_HEADER_OFF: return xr_dataset - expected_rows = 40 - expected_cols = 80 + text_header = segy_file_info.text_header - text_header_rows = 
segy_file_info.text_header.splitlines() - text_header_cols_bad = [len(row) != expected_cols for row in text_header_rows] - - if len(text_header_rows) != expected_rows: - err = f"Invalid text header count: expected {expected_rows}, got {len(segy_file_info.text_header)}" - raise ValueError(err) - - if any(text_header_cols_bad): - err = f"Invalid text header columns: expected {expected_cols} per line." - raise ValueError(err) + if mode == SAVE_SEGY_FILE_HEADER_LENIENT: + try: + validate_text_header(text_header) + except ValueError as exc: + logger.warning("Correcting malformed SEG-Y text header on import: %s", exc) + text_header = sanitize_text_header(text_header) + elif mode == SAVE_SEGY_FILE_HEADER_STRICT: + validate_text_header(text_header) xr_dataset["segy_file_header"] = ((), "") xr_dataset["segy_file_header"].attrs.update( { - "textHeader": segy_file_info.text_header, + "textHeader": text_header, "binaryHeader": segy_file_info.binary_header_dict, } ) diff --git a/src/mdio/core/config.py b/src/mdio/core/config.py index dce608f9..e82e6f2b 100644 --- a/src/mdio/core/config.py +++ b/src/mdio/core/config.py @@ -1,10 +1,35 @@ """Environment variable management for MDIO operations.""" +from typing import Literal + from psutil import cpu_count from pydantic import Field +from pydantic import field_validator from pydantic_settings import BaseSettings from pydantic_settings import SettingsConfigDict +SAVE_SEGY_FILE_HEADER_OFF = 0 +SAVE_SEGY_FILE_HEADER_STRICT = 1 +SAVE_SEGY_FILE_HEADER_LENIENT = 2 + +SaveSegyFileHeaderMode = Literal[ + SAVE_SEGY_FILE_HEADER_OFF, + SAVE_SEGY_FILE_HEADER_STRICT, + SAVE_SEGY_FILE_HEADER_LENIENT, +] +"""Mode for ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``. + +* ``0`` (also accepts ``False`` / ``"false"``): do not save SEG-Y file headers. +* ``1`` (also accepts ``True`` / ``"true"``): save SEG-Y file headers and raise + on a malformed text header. 
+* ``2``: save SEG-Y file headers and, on a malformed text header, log a + warning and correct it (non-ASCII or non-printable characters become spaces + and the header is padded to 80x40). +""" + +_SAVE_HEADER_TRUE_STRINGS = frozenset({"true", "yes", "on"}) +_SAVE_HEADER_FALSE_STRINGS = frozenset({"false", "no", "off"}) + class MDIOSettings(BaseSettings): """MDIO environment configuration settings.""" @@ -34,9 +59,12 @@ class MDIOSettings(BaseSettings): ) # Import configuration - save_segy_file_header: bool = Field( - default=False, - description="Whether to save SEG-Y file headers", + save_segy_file_header: SaveSegyFileHeaderMode = Field( + default=0, + description=( + "How to save SEG-Y file headers: 0 (or False) skips, 1 (or True) saves " + "and raises on malformed text header, 2 saves and corrects malformed text header." + ), alias="MDIO__IMPORT__SAVE_SEGY_FILE_HEADER", ) raw_headers: bool = Field( @@ -58,3 +86,21 @@ class MDIOSettings(BaseSettings): ) model_config = SettingsConfigDict(case_sensitive=True) + + @field_validator("save_segy_file_header", mode="before") + @classmethod + def _coerce_save_segy_file_header(cls, value: object) -> object: + """Accept legacy bool values and case-insensitive string aliases.""" + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in _SAVE_HEADER_FALSE_STRINGS: + return SAVE_SEGY_FILE_HEADER_OFF + if normalized in _SAVE_HEADER_TRUE_STRINGS: + return SAVE_SEGY_FILE_HEADER_STRICT + try: + return int(value) + except ValueError: + pass + if isinstance(value, bool): + return int(value) + return value diff --git a/src/mdio/segy/creation.py b/src/mdio/segy/creation.py index 8b10ad48..c250f8bb 100644 --- a/src/mdio/segy/creation.py +++ b/src/mdio/segy/creation.py @@ -17,6 +17,8 @@ from mdio.api.io import open_mdio from mdio.exceptions import MDIOMissingVariableError from mdio.segy.compat import encode_segy_revision +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header 
import validate_text_header if TYPE_CHECKING: import xarray as xr @@ -28,6 +30,37 @@ logger = logging.getLogger(__name__) +def _ensure_exportable_text_header(text_header: str) -> str: + """Validate the stored text header and repair it if it cannot be encoded. + + MDIO stores the text header as a wrapped 40x80 string. Stores written by + older versions of MDIO may contain non-ASCII characters (typically + ``U+FFFD`` from a lossy EBCDIC import) that cannot be re-encoded to ASCII + by the SEG-Y factory. To keep export usable for those stores this helper + runs the validator and, on failure, sanitizes the header in place and logs + a warning rather than aborting the export. + + Args: + text_header: The ``textHeader`` attribute as stored on the MDIO dataset. + + Returns: + A text header string that satisfies + :func:`mdio.segy.text_header.validate_text_header` and is therefore + guaranteed to round-trip through ``factory.create_textual_header``. + """ + try: + validate_text_header(text_header) + except ValueError as exc: + logger.warning( + "Stored MDIO text header is not exportable as-is and will be repaired: %s. " + "The repair replaces non-ASCII or non-printable characters with spaces and " + "forces the 80x40 card layout. 
Re-ingest the source SEG-Y to remove this warning.", + exc, + ) + return sanitize_text_header(text_header) + return text_header + + def make_segy_factory(spec: SegySpec, binary_header: dict[str, int]) -> SegyFactory: """Generate SEG-Y factory from MDIO metadata.""" sample_interval = binary_header["sample_interval"] @@ -88,6 +121,7 @@ def mdio_spec_to_segy( factory = make_segy_factory(spec=segy_spec, binary_header=binary_header) + text_header = _ensure_exportable_text_header(text_header) text_header_bytes = factory.create_textual_header(text_header) # During MDIO SEGY import, TGSAI/segy always creates revision major/minor fields diff --git a/src/mdio/segy/text_header.py b/src/mdio/segy/text_header.py new file mode 100644 index 00000000..d3c1acd3 --- /dev/null +++ b/src/mdio/segy/text_header.py @@ -0,0 +1,158 @@ +"""SEG-Y textual file header validation and sanitization helpers. + +The SEG-Y standard defines the textual file header as a 3200-byte block +organized as 40 cards of 80 characters each, encoded as either ASCII or +EBCDIC. Both encodings used by ``TGSAI/segy`` ultimately require the +in-memory string to be 7-bit ASCII (``ord(c) <= 127``) before bytes can be +written. The MDIO on-disk representation is the wrapped form: 40 lines of +exactly 80 characters joined by ``"\\n"``. + +When the source bytes were ingested through a lossy EBCDIC decode, MDIO +typically receives ``U+FFFD`` (``"\uFFFD"``) replacement characters and other +non-ASCII codepoints. Those characters round-trip through MDIO storage but +fail when ``segy.factory.create_textual_header`` tries to re-encode the +header to ASCII for SEG-Y export. The helpers in this module exist to detect +that situation up-front and, when requested, repair it deterministically. + +Repairs are conservative: any character that is either non-ASCII +(``ord(c) > 127``) or non-printable per :py:meth:`str.isprintable` is replaced +with an ASCII space, and the card grid is forced to exactly 40 rows of 80 +columns. 
Newlines (``"\\n"``) are treated only as row separators; other +Unicode line-break characters (``"\\v"``, ``"\\f"``, ``"\\x85"``, ``"\u2028"``, +``"\u2029"``) are treated as content and replaced rather than re-splitting +the layout. Sanitization additionally collapses runs of two or more +``"\\n"`` to one so headers that were written with ``"\\n\\n"`` between +cards are not silently truncated to half their length. +""" + +from __future__ import annotations + +import re + +EXPECTED_ROWS = 40 +EXPECTED_COLS = 80 +EXPECTED_LENGTH = EXPECTED_ROWS * EXPECTED_COLS +ASCII_MAX_ORD = 127 + +_REPORT_LIMIT = 5 +_NEWLINE_RUN = re.compile(r"\n{2,}") + + +def _is_safe_char(char: str) -> bool: + """Return True if a char is safe to round-trip through SEG-Y ASCII/EBCDIC. + + A char is "safe" when it is both 7-bit ASCII (``ord <= 127``) and printable + per :py:meth:`str.isprintable`. ASCII space passes; ``U+FFFD``, accented + Latin characters, control characters and tabs do not. + """ + return ord(char) <= ASCII_MAX_ORD and char.isprintable() + + +def _split_rows(text_header: str) -> list[str]: + """Split a wrapped text header into rows on ``"\\n"`` only. + + Other Unicode line-break characters (``"\\v"``, ``"\\f"``, ``"\u0085"``, etc.) + are intentionally left in place so that lossy decodes do not silently + re-shape the card grid. They will surface as unsafe characters during + validation and be replaced during sanitization. 
+ """ + return text_header.split("\n") + + +def _find_unsafe(row: str) -> list[int]: + """Return positions of characters that are not :func:`_is_safe_char`.""" + return [i for i, c in enumerate(row) if not _is_safe_char(c)] + + +def _summarize(mapping: dict[int, list[int]], limit: int = _REPORT_LIMIT) -> str: + """Format ``{row: [positions]}`` for an error message, capped for readability.""" + if not mapping: + return "{}" + + items = list(mapping.items()) + head = items[:limit] + body = ", ".join(f"row {row}: positions {positions[:limit]}" for row, positions in head) + + extra_rows = len(items) - len(head) + if extra_rows > 0: + body += f" (+{extra_rows} more rows)" + return body + + +def validate_text_header(text_header: str) -> None: + """Validate a SEG-Y textual file header is 40 rows of 80 ASCII-printable characters. + + Args: + text_header: Decoded textual file header string in the wrapped form + (40 rows of 80 characters joined by ``"\\n"``). + + Raises: + ValueError: If the header does not split into exactly 40 rows on + ``"\\n"``, any row is not 80 characters wide, or any character is + not safe to encode as 7-bit ASCII (see :func:`_is_safe_char`). 
+ """ + rows = _split_rows(text_header) + + if len(rows) != EXPECTED_ROWS: + err = f"Invalid text header line count: expected {EXPECTED_ROWS}, got {len(rows)}" + raise ValueError(err) + + bad_widths = [(i, len(row)) for i, row in enumerate(rows) if len(row) != EXPECTED_COLS] + if bad_widths: + capped = bad_widths[:_REPORT_LIMIT] + suffix = f" (+{len(bad_widths) - len(capped)} more)" if len(bad_widths) > len(capped) else "" + err = f"Invalid text header line widths: expected {EXPECTED_COLS} columns; offending rows: {capped}{suffix}" + raise ValueError(err) + + bad_chars: dict[int, list[int]] = {} + for i, row in enumerate(rows): + positions = _find_unsafe(row) + if positions: + bad_chars[i] = positions + + if bad_chars: + err = ( + "Invalid text header characters: non-ASCII or non-printable at " + f"{_summarize(bad_chars)}" + ) + raise ValueError(err) + + +def sanitize_text_header(text_header: str) -> str: + """Coerce a SEG-Y textual file header into the 40x80 ASCII-printable card layout. + + Pre-processing collapses runs of two or more ``"\\n"`` into one. Some SEG-Y + writers terminate each card with ``"\\n\\n"``, which yields 80 rows on a + naive ``split("\\n")`` and would silently drop cards 21-40 when the row + list is sliced to 40. Collapsing runs of newlines recovers the intended + card layout for that common case while leaving properly-wrapped headers + untouched. + + The normalized input is then split on ``"\\n"`` and each row is independently: + + 1. Stripped of unsafe characters (any non-ASCII or non-printable codepoint + is replaced with a single ASCII space). + 2. Right-padded with spaces or truncated to exactly 80 characters. + + Rows beyond 40 are dropped. Missing rows are appended as 80-space blanks + so the result always contains exactly 40 lines. + + Args: + text_header: Decoded textual file header string. + + Returns: + Sanitized header string with rows joined by ``"\\n"``. The output is + guaranteed to satisfy :func:`validate_text_header`. 
+ """ + normalized = _NEWLINE_RUN.sub("\n", text_header) + rows = _split_rows(normalized) + + sanitized: list[str] = [] + for row in rows[:EXPECTED_ROWS]: + cleaned = "".join(c if _is_safe_char(c) else " " for c in row) + sanitized.append(cleaned[:EXPECTED_COLS].ljust(EXPECTED_COLS)) + + while len(sanitized) < EXPECTED_ROWS: + sanitized.append(" " * EXPECTED_COLS) + + return "\n".join(sanitized) diff --git a/tests/unit/test_environment.py b/tests/unit/test_environment.py index 430d31dd..32e72b7d 100644 --- a/tests/unit/test_environment.py +++ b/tests/unit/test_environment.py @@ -4,6 +4,7 @@ from unittest.mock import patch import pytest +from pydantic import ValidationError from mdio.core.config import MDIOSettings @@ -32,7 +33,7 @@ def test_environment_isolation(self) -> None: original_values = { "cpus": MDIOSettings().export_cpus, "ratio": MDIOSettings().grid_sparsity_ratio_warn, - "bool": MDIOSettings().save_segy_file_header, + "save_header": MDIOSettings().save_segy_file_header, } with patch.dict( @@ -45,9 +46,51 @@ def test_environment_isolation(self) -> None: ): assert MDIOSettings().export_cpus == 99 assert MDIOSettings().grid_sparsity_ratio_warn == 99.9 - assert MDIOSettings().save_segy_file_header is True + assert MDIOSettings().save_segy_file_header == 1 # Values should be restored after context assert MDIOSettings().export_cpus == original_values["cpus"] assert MDIOSettings().grid_sparsity_ratio_warn == original_values["ratio"] - assert MDIOSettings().save_segy_file_header == original_values["bool"] + assert MDIOSettings().save_segy_file_header == original_values["save_header"] + + +class TestSaveSegyFileHeaderMode: + """Test coercion for ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``.""" + + @pytest.mark.parametrize( + ("env_value", "expected"), + [ + ("0", 0), + ("1", 1), + ("2", 2), + ("false", 0), + ("False", 0), + ("FALSE", 0), + ("no", 0), + ("off", 0), + ("true", 1), + ("True", 1), + ("TRUE", 1), + ("yes", 1), + ("on", 1), + ], + ) + def 
test_string_coercion(self, env_value: str, expected: int) -> None: + """Strings (including legacy bool aliases) coerce to 0, 1, or 2.""" + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": env_value}): + assert MDIOSettings().save_segy_file_header == expected + + @pytest.mark.parametrize("python_value", [False, True, 0, 1, 2]) + def test_native_python_values(self, python_value: bool | int) -> None: + """Bool/int passed directly are accepted for backwards compatibility.""" + settings = MDIOSettings(MDIO__IMPORT__SAVE_SEGY_FILE_HEADER=python_value) + assert settings.save_segy_file_header == int(python_value) + + @pytest.mark.parametrize("bad_value", ["3", "-1", "maybe", "tru"]) + def test_rejects_invalid_strings(self, bad_value: str) -> None: + """Anything other than 0/1/2 or bool aliases is rejected.""" + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": bad_value}), + pytest.raises(ValidationError), + ): + MDIOSettings() diff --git a/tests/unit/test_segy_export_text_header.py b/tests/unit/test_segy_export_text_header.py new file mode 100644 index 00000000..b45fb46f --- /dev/null +++ b/tests/unit/test_segy_export_text_header.py @@ -0,0 +1,97 @@ +"""Tests for export-side text-header guarding in ``mdio.segy.creation``. + +These cover the second half of issue #814: existing MDIO stores written by an +older version of MDIO may carry a malformed text header (typically scattered +``U+FFFD`` characters from a lossy EBCDIC import). The export path must not +crash on those stores; it should repair the header and warn instead. 
+""" + +from __future__ import annotations + +import logging + +import pytest +from segy.factory import SegyFactory +from segy.standards import get_segy_standard + +from mdio.segy.creation import _ensure_exportable_text_header + + +def _well_formed_header() -> str: + return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) + + +def _replacement_char_header() -> str: + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[37] = "\ufffdC38" + " " * 76 + rows[38] = "\ufffdC39" + " " * 76 + rows[39] = "\ufffdC40 END EBCDIC" + " " * 65 + return "\n".join(rows) + + +class TestEnsureExportableTextHeader: + """The export guard repairs malformed headers and warns; otherwise no-op.""" + + def test_passthrough_when_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: + header = _well_formed_header() + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(header) + assert result == header + assert not any("repaired" in record.message for record in caplog.records) + + def test_repairs_replacement_char_and_warns(self, caplog: pytest.LogCaptureFixture) -> None: + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(_replacement_char_header()) + assert "\ufffd" not in result + result.replace("\n", "").encode("ascii") # raises if any non-ASCII char survived + assert any("repaired" in record.message for record in caplog.records) + + def test_repairs_short_layout(self, caplog: pytest.LogCaptureFixture) -> None: + """A header with fewer than 40 cards is padded out so export can proceed.""" + short = "\n".join(["C01".ljust(80)] * 5) + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + result = _ensure_exportable_text_header(short) + rows = result.split("\n") + assert len(rows) == 40 + assert all(len(row) == 80 for row in rows) + assert any("repaired" in record.message for record in caplog.records) + + def 
test_repaired_header_is_accepted_by_segy_factory(self) -> None: + """End-to-end proof that repair output is round-trippable via the SEG-Y factory. + + Regression guard for issue #814: a malformed header that previously + crashed ``factory.create_textual_header`` must produce a 3200-byte + textual block after going through the export guard. + """ + spec = get_segy_standard(1.0) + factory = SegyFactory(spec=spec, sample_interval=2000, samples_per_trace=1) + + repaired = _ensure_exportable_text_header(_replacement_char_header()) + encoded = factory.create_textual_header(repaired) + + assert len(encoded) == 3200 + + def test_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: + """Legacy stores wrapped with ``\\n\\n`` per card must export with all 40 cards intact. + + This is the second real-world malformed sample seen in the wild + (file ``260418_A4_…``): each card is terminated with ``\\n\\n``, which + previously caused naive splitting to lose cards 21-40 silently. The + export guard must collapse the double newlines and emit a 3200-byte + textual block whose 40 cards all carry their original ``Cnn`` prefix. 
+ """ + cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + wrapped = "\n\n".join(cards) + "\n" + + with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): + repaired = _ensure_exportable_text_header(wrapped) + + repaired_rows = repaired.split("\n") + assert len(repaired_rows) == 40 + for i, row in enumerate(repaired_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + assert any("repaired" in record.message for record in caplog.records) + + spec = get_segy_standard(1.0) + factory = SegyFactory(spec=spec, sample_interval=2000, samples_per_trace=1) + assert len(factory.create_textual_header(repaired)) == 3200 diff --git a/tests/unit/test_segy_file_header_modes.py b/tests/unit/test_segy_file_header_modes.py new file mode 100644 index 00000000..93d46f65 --- /dev/null +++ b/tests/unit/test_segy_file_header_modes.py @@ -0,0 +1,137 @@ +"""Tests for ``_add_segy_file_headers`` mode handling. + +Covers the three values of ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``: 0 skips, +1 raises on a malformed text header, 2 corrects a malformed text header. +""" + +from __future__ import annotations + +import logging +import os +from unittest.mock import patch + +import pytest +import xarray as xr + +from mdio.converters.segy import _add_segy_file_headers +from mdio.segy.file import SegyFileInfo + + +def _well_formed_header() -> str: + return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) + + +def _malformed_header() -> str: + """Header with a NUL byte injected into row 0; valid 80x40 layout otherwise.""" + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[0] = "C01\x00" + " " * 76 + return "\n".join(rows) + + +def _replacement_char_header() -> str: + """Header that mirrors the example from issue #814. + + ``U+FFFD`` is reported as printable by Python so naive ``str.isprintable`` + checks would let it through and break SEG-Y export, which requires + 7-bit ASCII bytes. Mode 1 must reject it; mode 2 must repair it. 
+ """ + rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + rows[37] = "\ufffdC38" + " " * 76 + rows[38] = "\ufffdC39" + " " * 76 + rows[39] = "\ufffdC40 END EBCDIC" + " " * 65 + return "\n".join(rows) + + +def _segy_info(text_header: str) -> SegyFileInfo: + return SegyFileInfo( + num_traces=1, + sample_labels=None, + text_header=text_header, + binary_header_dict={"job_id": 1}, + raw_binary_headers=b"", + coordinate_scalar=1, + ) + + +class TestSaveSegyFileHeaderModes: + """Mode 0 skips, mode 1 strict, mode 2 lenient.""" + + def test_mode_zero_skips_header_save(self) -> None: + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "0"}): + result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + assert "segy_file_header" not in result + + def test_mode_one_accepts_well_formed(self) -> None: + ds = xr.Dataset() + header = _well_formed_header() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): + result = _add_segy_file_headers(ds, _segy_info(header)) + + assert result["segy_file_header"].attrs["textHeader"] == header + + def test_mode_one_raises_on_malformed(self) -> None: + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + def test_mode_one_raises_on_replacement_char(self) -> None: + """The bug from issue #814: U+FFFD must be rejected in strict mode.""" + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + + def test_mode_two_corrects_malformed(self, caplog: pytest.LogCaptureFixture) -> None: + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): + with caplog.at_level(logging.WARNING, 
logger="mdio.converters.segy"): + result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) + + stored = result["segy_file_header"].attrs["textHeader"] + assert "\x00" not in stored + assert all(len(row) == 80 for row in stored.split("\n")) + assert len(stored.split("\n")) == 40 + assert any("Correcting malformed" in record.message for record in caplog.records) + + def test_mode_two_corrects_replacement_char(self, caplog: pytest.LogCaptureFixture) -> None: + """The bug from issue #814: U+FFFD must be repaired in lenient mode and stored ASCII-clean.""" + ds = xr.Dataset() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): + with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): + result = _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + + stored = result["segy_file_header"].attrs["textHeader"] + assert "\ufffd" not in stored + stored.replace("\n", "").encode("ascii") # would raise if any non-ASCII char survived + assert any("Correcting malformed" in record.message for record in caplog.records) + + def test_mode_two_passes_through_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: + """Mode 2 always sanitizes, but stays silent and bit-identical on well-formed input.""" + ds = xr.Dataset() + header = _well_formed_header() + with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): + with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): + result = _add_segy_file_headers(ds, _segy_info(header)) + + assert result["segy_file_header"].attrs["textHeader"] == header + assert not any("Correcting" in record.message for record in caplog.records) + + def test_mode_two_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: + """Source SEG-Y wrapped with ``\\n\\n`` between cards keeps all 40 Cnn cards in mode 2.""" + cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] + wrapped = "\n\n".join(cards) + "\n" + + ds = xr.Dataset() + with 
patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): + with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): + result = _add_segy_file_headers(ds, _segy_info(wrapped)) + + stored = result["segy_file_header"].attrs["textHeader"] + stored_rows = stored.split("\n") + assert len(stored_rows) == 40 + for i, row in enumerate(stored_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + assert any("Correcting malformed" in record.message for record in caplog.records) diff --git a/tests/unit/test_text_header.py b/tests/unit/test_text_header.py new file mode 100644 index 00000000..dc2305f1 --- /dev/null +++ b/tests/unit/test_text_header.py @@ -0,0 +1,204 @@ +"""Tests for SEG-Y textual file header validation and sanitization.""" + +from __future__ import annotations + +import pytest + +from mdio.segy.text_header import EXPECTED_COLS +from mdio.segy.text_header import EXPECTED_ROWS +from mdio.segy.text_header import sanitize_text_header +from mdio.segy.text_header import validate_text_header + + +def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + return "\n".join(rows) + + +def _replacement_char_header() -> str: + """Build a 40x80 header that mirrors the example in issue #814. + + Three replacement characters (``U+FFFD``) are scattered through the last + three cards. ``U+FFFD`` is reported as printable by Python but cannot be + encoded as ASCII, which is exactly the failure mode the issue describes. 
+ """ + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + rows[37] = ("\ufffdC38" + " " * (EXPECTED_COLS - 4)) + rows[38] = ("\ufffdC39" + " " * (EXPECTED_COLS - 4)) + rows[39] = ("\ufffdC40 END EBCDIC" + " " * (EXPECTED_COLS - 15)) + return "\n".join(rows) + + +class TestValidateTextHeader: + """Validation should accept well-formed and reject anything else.""" + + def test_accepts_well_formed(self) -> None: + validate_text_header(_well_formed_header()) + + def test_rejects_wrong_row_count(self) -> None: + rows = [" " * EXPECTED_COLS] * (EXPECTED_ROWS - 1) + with pytest.raises(ValueError, match="line count"): + validate_text_header("\n".join(rows)) + + def test_rejects_wrong_column_width(self) -> None: + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[5] = "short" + with pytest.raises(ValueError, match="line widths"): + validate_text_header("\n".join(rows)) + + def test_rejects_non_printable_characters(self) -> None: + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[10] = "\x00" + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + @pytest.mark.parametrize( + "bad_char", + [ + "\ufffd", # replacement char from a lossy EBCDIC decode (issue #814) + "\xa0", # non-breaking space + "\u00e9", # 'é' - encodable as latin-1 but not ascii + "\u00c1", # 'Á' + ], + ids=["U+FFFD", "U+00A0", "U+00E9", "U+00C1"], + ) + def test_rejects_non_ascii_printable_characters(self, bad_char: str) -> None: + """Non-ASCII codepoints must be rejected even when isprintable() is True.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = bad_char + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + def test_rejects_issue_814_example(self) -> None: + """The header from the issue body must be flagged as malformed.""" + with pytest.raises(ValueError, match="non-ASCII or 
non-printable"): + validate_text_header(_replacement_char_header()) + + def test_does_not_split_on_unicode_line_separators(self) -> None: + """``\\v`` / ``\\f`` / ``\\x85`` must be treated as content, not row breaks.""" + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) + with pytest.raises(ValueError, match="non-ASCII or non-printable"): + validate_text_header("\n".join(rows)) + + def test_rejects_double_newline_wrapped(self) -> None: + """Strict validation must not collapse ``\\n\\n``; only sanitize does that.""" + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + with pytest.raises(ValueError, match="line count"): + validate_text_header("\n\n".join(rows)) + + def test_error_message_is_capped(self) -> None: + """A pathologically broken header must not produce an unbounded error message.""" + rows = ["\ufffd" * EXPECTED_COLS for _ in range(EXPECTED_ROWS)] + with pytest.raises(ValueError) as exc_info: + validate_text_header("\n".join(rows)) + message = str(exc_info.value) + assert "more rows" in message + # 40 rows × 80 positions × ~4 chars per position would be ~12k chars; cap keeps it tiny. 
+ assert len(message) < 1000 + + +class TestSanitizeTextHeader: + """Sanitization replaces non-printable chars and forces 40x80 layout.""" + + def test_passthrough_well_formed(self) -> None: + header = _well_formed_header() + assert sanitize_text_header(header) == header + + def test_replaces_non_printable_with_space(self) -> None: + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = ("C01\x00\x07" + " " * (EXPECTED_COLS - 5)) + cleaned = sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[0].startswith("C01 ") + assert all(c.isprintable() for row in cleaned_rows for c in row) + + def test_replaces_replacement_char_with_space(self) -> None: + """``U+FFFD`` (the issue #814 case) must be repaired to spaces.""" + cleaned = sanitize_text_header(_replacement_char_header()) + assert "\ufffd" not in cleaned + # The leading replacement char of card 38 becomes a space; the literal text survives. + cleaned_rows = cleaned.split("\n") + assert cleaned_rows[37].startswith(" C38") + assert cleaned_rows[38].startswith(" C39") + assert cleaned_rows[39].startswith(" C40 END EBCDIC") + + def test_replaces_unicode_line_separator_with_space(self) -> None: + rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS + rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) + cleaned = sanitize_text_header("\n".join(rows)) + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[0] == " " * EXPECTED_COLS + + def test_pads_short_rows_to_eighty_columns(self) -> None: + rows = ["short"] * EXPECTED_ROWS + cleaned = sanitize_text_header("\n".join(rows)) + + for row in cleaned.split("\n"): + assert len(row) == EXPECTED_COLS + + def test_truncates_long_rows_to_eighty_columns(self) -> None: + long_row = "X" * (EXPECTED_COLS + 20) + cleaned = sanitize_text_header("\n".join([long_row] * EXPECTED_ROWS)) + + for row in cleaned.split("\n"): + assert len(row) == EXPECTED_COLS + assert 
row == "X" * EXPECTED_COLS + + def test_pads_missing_rows_with_blank_lines(self) -> None: + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, 5)] + cleaned = sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[-1] == " " * EXPECTED_COLS + + def test_truncates_excess_rows(self) -> None: + rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 5)] + cleaned = sanitize_text_header("\n".join(rows)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[-1].startswith("C40") + + def test_output_passes_validation(self) -> None: + """The output of sanitize is always accepted by validate.""" + rows = [f"C{i:02d}\x00\x01\ufffd garbage" for i in range(1, EXPECTED_ROWS + 10)] + cleaned = sanitize_text_header("\n".join(rows)) + validate_text_header(cleaned) + + def test_sanitized_header_is_ascii_encodable(self) -> None: + """Sanitized output must be encodable as ASCII (the SEG-Y export requirement).""" + cleaned = sanitize_text_header(_replacement_char_header()) + cleaned.replace("\n", "").encode("ascii") + + def test_collapses_double_newline_separator(self) -> None: + """Headers terminated with ``\\n\\n`` between cards must keep all 40 cards. + + Some SEG-Y writers double the newline after each card. A naive + ``split("\\n")`` followed by ``rows[:40]`` would silently drop cards + 21-40. ``sanitize_text_header`` collapses runs of ``\\n`` so the card + layout survives. 
+ """ + cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + wrapped = "\n\n".join(cards) + "\n" + cleaned = sanitize_text_header(wrapped) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + for i, row in enumerate(cleaned_rows, start=1): + assert row.startswith(f"C{i:02d}"), f"card {i} lost; got {row!r}" + validate_text_header(cleaned) + + def test_collapses_runs_longer_than_two(self) -> None: + """Triple (or longer) newline runs collapse to a single ``\\n``.""" + cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] + cleaned = sanitize_text_header("\n\n\n".join(cards)) + + cleaned_rows = cleaned.split("\n") + assert len(cleaned_rows) == EXPECTED_ROWS + assert cleaned_rows[-1].startswith("C40") From 7c1b89e1e2f438effba892db1635069823a3a426 Mon Sep 17 00:00:00 2001 From: BrianMichell Date: Fri, 8 May 2026 13:34:32 +0000 Subject: [PATCH 3/3] Lint and tighten up code --- src/mdio/core/config.py | 9 --- src/mdio/segy/creation.py | 20 +---- src/mdio/segy/text_header.py | 94 ++++------------------ tests/unit/test_segy_export_text_header.py | 37 +++------ tests/unit/test_segy_file_header_modes.py | 75 ++++++++++------- tests/unit/test_text_header.py | 67 +++++++-------- 6 files changed, 104 insertions(+), 198 deletions(-) diff --git a/src/mdio/core/config.py b/src/mdio/core/config.py index e82e6f2b..53edfb85 100644 --- a/src/mdio/core/config.py +++ b/src/mdio/core/config.py @@ -17,15 +17,6 @@ SAVE_SEGY_FILE_HEADER_STRICT, SAVE_SEGY_FILE_HEADER_LENIENT, ] -"""Mode for ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``. - -* ``0`` (also accepts ``False`` / ``"false"``): do not save SEG-Y file headers. -* ``1`` (also accepts ``True`` / ``"true"``): save SEG-Y file headers and raise - on a malformed text header. -* ``2``: save SEG-Y file headers and, on a malformed text header, log a - warning and correct it (non-ASCII or non-printable characters become spaces - and the header is padded to 80x40). 
-""" _SAVE_HEADER_TRUE_STRINGS = frozenset({"true", "yes", "on"}) _SAVE_HEADER_FALSE_STRINGS = frozenset({"false", "no", "off"}) diff --git a/src/mdio/segy/creation.py b/src/mdio/segy/creation.py index c250f8bb..2a588087 100644 --- a/src/mdio/segy/creation.py +++ b/src/mdio/segy/creation.py @@ -31,32 +31,18 @@ def _ensure_exportable_text_header(text_header: str) -> str: - """Validate the stored text header and repair it if it cannot be encoded. - - MDIO stores the text header as a wrapped 40x80 string. Stores written by - older versions of MDIO may contain non-ASCII characters (typically - ``U+FFFD`` from a lossy EBCDIC import) that cannot be re-encoded to ASCII - by the SEG-Y factory. To keep export usable for those stores this helper - runs the validator and, on failure, sanitizes the header in place and logs - a warning rather than aborting the export. + """Validate the stored text header; repair and warn if it cannot be ASCII-encoded. Args: text_header: The ``textHeader`` attribute as stored on the MDIO dataset. Returns: - A text header string that satisfies - :func:`mdio.segy.text_header.validate_text_header` and is therefore - guaranteed to round-trip through ``factory.create_textual_header``. + A text header string that satisfies :func:`validate_text_header`. """ try: validate_text_header(text_header) except ValueError as exc: - logger.warning( - "Stored MDIO text header is not exportable as-is and will be repaired: %s. " - "The repair replaces non-ASCII or non-printable characters with spaces and " - "forces the 80x40 card layout. 
Re-ingest the source SEG-Y to remove this warning.", - exc, - ) + logger.warning("Stored MDIO text header is not exportable as-is and will be repaired: %s", exc) return sanitize_text_header(text_header) return text_header diff --git a/src/mdio/segy/text_header.py b/src/mdio/segy/text_header.py index d3c1acd3..e3e5c889 100644 --- a/src/mdio/segy/text_header.py +++ b/src/mdio/segy/text_header.py @@ -1,29 +1,4 @@ -"""SEG-Y textual file header validation and sanitization helpers. - -The SEG-Y standard defines the textual file header as a 3200-byte block -organized as 40 cards of 80 characters each, encoded as either ASCII or -EBCDIC. Both encodings used by ``TGSAI/segy`` ultimately require the -in-memory string to be 7-bit ASCII (``ord(c) <= 127``) before bytes can be -written. The MDIO on-disk representation is the wrapped form: 40 lines of -exactly 80 characters joined by ``"\\n"``. - -When the source bytes were ingested through a lossy EBCDIC decode, MDIO -typically receives ``U+FFFD`` (``"\uFFFD"``) replacement characters and other -non-ASCII codepoints. Those characters round-trip through MDIO storage but -fail when ``segy.factory.create_textual_header`` tries to re-encode the -header to ASCII for SEG-Y export. The helpers in this module exist to detect -that situation up-front and, when requested, repair it deterministically. - -Repairs are conservative: any character that is either non-ASCII -(``ord(c) > 127``) or non-printable per :py:meth:`str.isprintable` is replaced -with an ASCII space, and the card grid is forced to exactly 40 rows of 80 -columns. Newlines (``"\\n"``) are treated only as row separators; other -Unicode line-break characters (``"\\v"``, ``"\\f"``, ``"\\x85"``, ``"\u2028"``, -``"\u2029"``) are treated as content and replaced rather than re-splitting -the layout. 
Sanitization additionally collapses runs of two or more -``"\\n"`` to one so headers that were written with ``"\\n\\n"`` between -cards are not silently truncated to half their length. -""" +"""SEG-Y textual file header validation and sanitization helpers.""" from __future__ import annotations @@ -31,7 +6,6 @@ EXPECTED_ROWS = 40 EXPECTED_COLS = 80 -EXPECTED_LENGTH = EXPECTED_ROWS * EXPECTED_COLS ASCII_MAX_ORD = 127 _REPORT_LIMIT = 5 @@ -39,31 +13,10 @@ def _is_safe_char(char: str) -> bool: - """Return True if a char is safe to round-trip through SEG-Y ASCII/EBCDIC. - - A char is "safe" when it is both 7-bit ASCII (``ord <= 127``) and printable - per :py:meth:`str.isprintable`. ASCII space passes; ``U+FFFD``, accented - Latin characters, control characters and tabs do not. - """ + """Return True if char is 7-bit ASCII and printable.""" return ord(char) <= ASCII_MAX_ORD and char.isprintable() -def _split_rows(text_header: str) -> list[str]: - """Split a wrapped text header into rows on ``"\\n"`` only. - - Other Unicode line-break characters (``"\\v"``, ``"\\f"``, ``"\u0085"``, etc.) - are intentionally left in place so that lossy decodes do not silently - re-shape the card grid. They will surface as unsafe characters during - validation and be replaced during sanitization. - """ - return text_header.split("\n") - - -def _find_unsafe(row: str) -> list[int]: - """Return positions of characters that are not :func:`_is_safe_char`.""" - return [i for i, c in enumerate(row) if not _is_safe_char(c)] - - def _summarize(mapping: dict[int, list[int]], limit: int = _REPORT_LIMIT) -> str: """Format ``{row: [positions]}`` for an error message, capped for readability.""" if not mapping: @@ -80,18 +33,15 @@ def _summarize(mapping: dict[int, list[int]], limit: int = _REPORT_LIMIT) -> str def validate_text_header(text_header: str) -> None: - """Validate a SEG-Y textual file header is 40 rows of 80 ASCII-printable characters. 
+ r"""Validate a SEG-Y textual file header is 40 rows of 80 ASCII-printable characters. Args: - text_header: Decoded textual file header string in the wrapped form - (40 rows of 80 characters joined by ``"\\n"``). + text_header: Decoded text header in wrapped form (40 rows of 80 chars joined by ``\n``). Raises: - ValueError: If the header does not split into exactly 40 rows on - ``"\\n"``, any row is not 80 characters wide, or any character is - not safe to encode as 7-bit ASCII (see :func:`_is_safe_char`). + ValueError: If row count, row width, or any character fails the SEG-Y ASCII contract. """ - rows = _split_rows(text_header) + rows = text_header.split("\n") if len(rows) != EXPECTED_ROWS: err = f"Invalid text header line count: expected {EXPECTED_ROWS}, got {len(rows)}" @@ -106,46 +56,30 @@ def validate_text_header(text_header: str) -> None: bad_chars: dict[int, list[int]] = {} for i, row in enumerate(rows): - positions = _find_unsafe(row) + positions = [j for j, c in enumerate(row) if not _is_safe_char(c)] if positions: bad_chars[i] = positions if bad_chars: - err = ( - "Invalid text header characters: non-ASCII or non-printable at " - f"{_summarize(bad_chars)}" - ) + err = f"Invalid text header characters: non-ASCII or non-printable at {_summarize(bad_chars)}" raise ValueError(err) def sanitize_text_header(text_header: str) -> str: - """Coerce a SEG-Y textual file header into the 40x80 ASCII-printable card layout. - - Pre-processing collapses runs of two or more ``"\\n"`` into one. Some SEG-Y - writers terminate each card with ``"\\n\\n"``, which yields 80 rows on a - naive ``split("\\n")`` and would silently drop cards 21-40 when the row - list is sliced to 40. Collapsing runs of newlines recovers the intended - card layout for that common case while leaving properly-wrapped headers - untouched. - - The normalized input is then split on ``"\\n"`` and each row is independently: - - 1. 
Stripped of unsafe characters (any non-ASCII or non-printable codepoint - is replaced with a single ASCII space). - 2. Right-padded with spaces or truncated to exactly 80 characters. + r"""Coerce a SEG-Y textual file header into the 40x80 ASCII-printable card layout. - Rows beyond 40 are dropped. Missing rows are appended as 80-space blanks - so the result always contains exactly 40 lines. + Runs of two or more ``\n`` collapse to one (some writers terminate cards with ``\n\n``). + Each row gets unsafe characters replaced with spaces and is padded/truncated to 80 chars. + The result always has exactly 40 rows. Args: text_header: Decoded textual file header string. Returns: - Sanitized header string with rows joined by ``"\\n"``. The output is - guaranteed to satisfy :func:`validate_text_header`. + Sanitized header that satisfies :func:`validate_text_header`. """ normalized = _NEWLINE_RUN.sub("\n", text_header) - rows = _split_rows(normalized) + rows = normalized.split("\n") sanitized: list[str] = [] for row in rows[:EXPECTED_ROWS]: diff --git a/tests/unit/test_segy_export_text_header.py b/tests/unit/test_segy_export_text_header.py index b45fb46f..b06ed0ed 100644 --- a/tests/unit/test_segy_export_text_header.py +++ b/tests/unit/test_segy_export_text_header.py @@ -1,27 +1,26 @@ -"""Tests for export-side text-header guarding in ``mdio.segy.creation``. - -These cover the second half of issue #814: existing MDIO stores written by an -older version of MDIO may carry a malformed text header (typically scattered -``U+FFFD`` characters from a lossy EBCDIC import). The export path must not -crash on those stores; it should repair the header and warn instead. 
-""" +"""Tests for export-side text header guarding in ``mdio.segy.creation``.""" from __future__ import annotations import logging +from typing import TYPE_CHECKING -import pytest from segy.factory import SegyFactory from segy.standards import get_segy_standard from mdio.segy.creation import _ensure_exportable_text_header +if TYPE_CHECKING: + import pytest + def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) def _replacement_char_header() -> str: + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] rows[37] = "\ufffdC38" + " " * 76 rows[38] = "\ufffdC39" + " " * 76 @@ -33,6 +32,7 @@ class TestEnsureExportableTextHeader: """The export guard repairs malformed headers and warns; otherwise no-op.""" def test_passthrough_when_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: + """Well-formed input is returned unchanged with no warning.""" header = _well_formed_header() with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): result = _ensure_exportable_text_header(header) @@ -40,14 +40,15 @@ def test_passthrough_when_well_formed(self, caplog: pytest.LogCaptureFixture) -> assert not any("repaired" in record.message for record in caplog.records) def test_repairs_replacement_char_and_warns(self, caplog: pytest.LogCaptureFixture) -> None: + """U+FFFD is repaired and a warning is logged.""" with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): result = _ensure_exportable_text_header(_replacement_char_header()) assert "\ufffd" not in result - result.replace("\n", "").encode("ascii") # raises if any non-ASCII char survived + result.replace("\n", "").encode("ascii") assert any("repaired" in record.message for record in caplog.records) def test_repairs_short_layout(self, caplog: pytest.LogCaptureFixture) -> None: - """A header with fewer than 40 
cards is padded out so export can proceed.""" + """Header with fewer than 40 cards is padded out to 40 rows of 80 chars.""" short = "\n".join(["C01".ljust(80)] * 5) with caplog.at_level(logging.WARNING, logger="mdio.segy.creation"): result = _ensure_exportable_text_header(short) @@ -57,12 +58,7 @@ def test_repairs_short_layout(self, caplog: pytest.LogCaptureFixture) -> None: assert any("repaired" in record.message for record in caplog.records) def test_repaired_header_is_accepted_by_segy_factory(self) -> None: - """End-to-end proof that repair output is round-trippable via the SEG-Y factory. - - Regression guard for issue #814: a malformed header that previously - crashed ``factory.create_textual_header`` must produce a 3200-byte - textual block after going through the export guard. - """ + """Repair output round-trips through ``factory.create_textual_header`` to 3200 bytes.""" spec = get_segy_standard(1.0) factory = SegyFactory(spec=spec, sample_interval=2000, samples_per_trace=1) @@ -72,14 +68,7 @@ def test_repaired_header_is_accepted_by_segy_factory(self) -> None: assert len(encoded) == 3200 def test_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: - """Legacy stores wrapped with ``\\n\\n`` per card must export with all 40 cards intact. - - This is the second real-world malformed sample seen in the wild - (file ``260418_A4_…``): each card is terminated with ``\\n\\n``, which - previously caused naive splitting to lose cards 21-40 silently. The - export guard must collapse the double newlines and emit a 3200-byte - textual block whose 40 cards all carry their original ``Cnn`` prefix. 
- """ + r"""Cards terminated with ``\n\n`` keep all 40 ``Cnn`` prefixes after repair.""" cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] wrapped = "\n\n".join(cards) + "\n" diff --git a/tests/unit/test_segy_file_header_modes.py b/tests/unit/test_segy_file_header_modes.py index 93d46f65..22925efa 100644 --- a/tests/unit/test_segy_file_header_modes.py +++ b/tests/unit/test_segy_file_header_modes.py @@ -1,7 +1,7 @@ """Tests for ``_add_segy_file_headers`` mode handling. -Covers the three values of ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``: 0 skips, -1 raises on a malformed text header, 2 corrects a malformed text header. +Covers the three values of ``MDIO__IMPORT__SAVE_SEGY_FILE_HEADER``: +0 skips, 1 raises on a malformed text header, 2 corrects a malformed text header. """ from __future__ import annotations @@ -18,6 +18,7 @@ def _well_formed_header() -> str: + """Build a 40x80 header where each row reads ``Cnn ...spaces``.""" return "\n".join([f"C{i:02d}".ljust(80) for i in range(1, 41)]) @@ -29,12 +30,7 @@ def _malformed_header() -> str: def _replacement_char_header() -> str: - """Header that mirrors the example from issue #814. - - ``U+FFFD`` is reported as printable by Python so naive ``str.isprintable`` - checks would let it through and break SEG-Y export, which requires - 7-bit ASCII bytes. Mode 1 must reject it; mode 2 must repair it. 
- """ + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" rows = [f"C{i:02d}".ljust(80) for i in range(1, 41)] rows[37] = "\ufffdC38" + " " * 76 rows[38] = "\ufffdC39" + " " * 76 @@ -43,6 +39,7 @@ def _replacement_char_header() -> str: def _segy_info(text_header: str) -> SegyFileInfo: + """Minimal SegyFileInfo fixture with the given text header.""" return SegyFileInfo( num_traces=1, sample_labels=None, @@ -57,6 +54,7 @@ class TestSaveSegyFileHeaderModes: """Mode 0 skips, mode 1 strict, mode 2 lenient.""" def test_mode_zero_skips_header_save(self) -> None: + """Mode 0 leaves the dataset without a ``segy_file_header`` variable.""" ds = xr.Dataset() with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "0"}): result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) @@ -64,6 +62,7 @@ def test_mode_zero_skips_header_save(self) -> None: assert "segy_file_header" not in result def test_mode_one_accepts_well_formed(self) -> None: + """Mode 1 stores a well-formed header verbatim.""" ds = xr.Dataset() header = _well_formed_header() with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): @@ -72,23 +71,31 @@ def test_mode_one_accepts_well_formed(self) -> None: assert result["segy_file_header"].attrs["textHeader"] == header def test_mode_one_raises_on_malformed(self) -> None: + """Mode 1 raises on a NUL byte in the header.""" ds = xr.Dataset() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): - with pytest.raises(ValueError, match="non-ASCII or non-printable"): - _add_segy_file_headers(ds, _segy_info(_malformed_header())) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}), + pytest.raises(ValueError, match="non-ASCII or non-printable"), + ): + _add_segy_file_headers(ds, _segy_info(_malformed_header())) def test_mode_one_raises_on_replacement_char(self) -> None: - """The bug from issue #814: U+FFFD must be rejected in strict mode.""" + """Mode 1 
raises on U+FFFD.""" ds = xr.Dataset() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}): - with pytest.raises(ValueError, match="non-ASCII or non-printable"): - _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "1"}), + pytest.raises(ValueError, match="non-ASCII or non-printable"), + ): + _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) def test_mode_two_corrects_malformed(self, caplog: pytest.LogCaptureFixture) -> None: + """Mode 2 repairs a NUL byte and stores a 40x80 header.""" ds = xr.Dataset() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): - with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): - result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(_malformed_header())) stored = result["segy_file_header"].attrs["textHeader"] assert "\x00" not in stored @@ -97,37 +104,43 @@ def test_mode_two_corrects_malformed(self, caplog: pytest.LogCaptureFixture) -> assert any("Correcting malformed" in record.message for record in caplog.records) def test_mode_two_corrects_replacement_char(self, caplog: pytest.LogCaptureFixture) -> None: - """The bug from issue #814: U+FFFD must be repaired in lenient mode and stored ASCII-clean.""" + """Mode 2 repairs U+FFFD and stores ASCII-encodable bytes.""" ds = xr.Dataset() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): - with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): - result = _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + 
): + result = _add_segy_file_headers(ds, _segy_info(_replacement_char_header())) stored = result["segy_file_header"].attrs["textHeader"] assert "\ufffd" not in stored - stored.replace("\n", "").encode("ascii") # would raise if any non-ASCII char survived + stored.replace("\n", "").encode("ascii") assert any("Correcting malformed" in record.message for record in caplog.records) def test_mode_two_passes_through_well_formed(self, caplog: pytest.LogCaptureFixture) -> None: - """Mode 2 always sanitizes, but stays silent and bit-identical on well-formed input.""" + """Mode 2 stays silent and bit-identical on well-formed input.""" ds = xr.Dataset() header = _well_formed_header() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): - with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): - result = _add_segy_file_headers(ds, _segy_info(header)) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = _add_segy_file_headers(ds, _segy_info(header)) assert result["segy_file_header"].attrs["textHeader"] == header assert not any("Correcting" in record.message for record in caplog.records) def test_mode_two_repairs_double_newline_wrapped(self, caplog: pytest.LogCaptureFixture) -> None: - """Source SEG-Y wrapped with ``\\n\\n`` between cards keeps all 40 Cnn cards in mode 2.""" + r"""Mode 2 keeps all 40 ``Cnn`` cards when the source uses ``\n\n`` between cards.""" cards = [f"C{i:02d}".ljust(80) for i in range(1, 41)] wrapped = "\n\n".join(cards) + "\n" ds = xr.Dataset() - with patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}): - with caplog.at_level(logging.WARNING, logger="mdio.converters.segy"): - result = _add_segy_file_headers(ds, _segy_info(wrapped)) + with ( + patch.dict(os.environ, {"MDIO__IMPORT__SAVE_SEGY_FILE_HEADER": "2"}), + caplog.at_level(logging.WARNING, logger="mdio.converters.segy"), + ): + result = 
_add_segy_file_headers(ds, _segy_info(wrapped)) stored = result["segy_file_header"].attrs["textHeader"] stored_rows = stored.split("\n") diff --git a/tests/unit/test_text_header.py b/tests/unit/test_text_header.py index dc2305f1..d839c98a 100644 --- a/tests/unit/test_text_header.py +++ b/tests/unit/test_text_header.py @@ -17,37 +17,36 @@ def _well_formed_header() -> str: def _replacement_char_header() -> str: - """Build a 40x80 header that mirrors the example in issue #814. - - Three replacement characters (``U+FFFD``) are scattered through the last - three cards. ``U+FFFD`` is reported as printable by Python but cannot be - encoded as ASCII, which is exactly the failure mode the issue describes. - """ + """Build a 40x80 header with U+FFFD scattered through the last three cards.""" rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] - rows[37] = ("\ufffdC38" + " " * (EXPECTED_COLS - 4)) - rows[38] = ("\ufffdC39" + " " * (EXPECTED_COLS - 4)) - rows[39] = ("\ufffdC40 END EBCDIC" + " " * (EXPECTED_COLS - 15)) + rows[37] = "\ufffdC38" + " " * (EXPECTED_COLS - 4) + rows[38] = "\ufffdC39" + " " * (EXPECTED_COLS - 4) + rows[39] = "\ufffdC40 END EBCDIC" + " " * (EXPECTED_COLS - 15) return "\n".join(rows) class TestValidateTextHeader: - """Validation should accept well-formed and reject anything else.""" + """Validation accepts well-formed headers and rejects anything else.""" def test_accepts_well_formed(self) -> None: + """Well-formed 40x80 ASCII header passes.""" validate_text_header(_well_formed_header()) def test_rejects_wrong_row_count(self) -> None: + """Wrong row count raises.""" rows = [" " * EXPECTED_COLS] * (EXPECTED_ROWS - 1) with pytest.raises(ValueError, match="line count"): validate_text_header("\n".join(rows)) def test_rejects_wrong_column_width(self) -> None: + """Row that is not 80 chars wide raises.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS rows[5] = "short" with pytest.raises(ValueError, match="line widths"): 
validate_text_header("\n".join(rows)) def test_rejects_non_printable_characters(self) -> None: + """Non-printable ASCII (NUL) is rejected.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS rows[10] = "\x00" + " " * (EXPECTED_COLS - 1) with pytest.raises(ValueError, match="non-ASCII or non-printable"): @@ -55,47 +54,41 @@ def test_rejects_non_printable_characters(self) -> None: @pytest.mark.parametrize( "bad_char", - [ - "\ufffd", # replacement char from a lossy EBCDIC decode (issue #814) - "\xa0", # non-breaking space - "\u00e9", # 'é' - encodable as latin-1 but not ascii - "\u00c1", # 'Á' - ], + ["\ufffd", "\xa0", "\u00e9", "\u00c1"], ids=["U+FFFD", "U+00A0", "U+00E9", "U+00C1"], ) def test_rejects_non_ascii_printable_characters(self, bad_char: str) -> None: - """Non-ASCII codepoints must be rejected even when isprintable() is True.""" + """Non-ASCII codepoints are rejected even when isprintable() is True.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS rows[0] = bad_char + " " * (EXPECTED_COLS - 1) with pytest.raises(ValueError, match="non-ASCII or non-printable"): validate_text_header("\n".join(rows)) - def test_rejects_issue_814_example(self) -> None: - """The header from the issue body must be flagged as malformed.""" + def test_rejects_replacement_char_header(self) -> None: + """Header laced with U+FFFD is rejected.""" with pytest.raises(ValueError, match="non-ASCII or non-printable"): validate_text_header(_replacement_char_header()) def test_does_not_split_on_unicode_line_separators(self) -> None: - """``\\v`` / ``\\f`` / ``\\x85`` must be treated as content, not row breaks.""" + r"""``\v``, ``\f``, ``\x85`` are content, not row breaks.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) with pytest.raises(ValueError, match="non-ASCII or non-printable"): validate_text_header("\n".join(rows)) def test_rejects_double_newline_wrapped(self) -> None: - """Strict validation must not collapse ``\\n\\n``; only sanitize does 
that.""" + r"""Strict validation does not collapse ``\n\n``; only sanitize does.""" rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] with pytest.raises(ValueError, match="line count"): validate_text_header("\n\n".join(rows)) def test_error_message_is_capped(self) -> None: - """A pathologically broken header must not produce an unbounded error message.""" + """Pathologically broken header produces a bounded error message.""" rows = ["\ufffd" * EXPECTED_COLS for _ in range(EXPECTED_ROWS)] - with pytest.raises(ValueError) as exc_info: + with pytest.raises(ValueError, match="Invalid text header characters") as exc_info: validate_text_header("\n".join(rows)) message = str(exc_info.value) assert "more rows" in message - # 40 rows × 80 positions × ~4 chars per position would be ~12k chars; cap keeps it tiny. assert len(message) < 1000 @@ -103,12 +96,14 @@ class TestSanitizeTextHeader: """Sanitization replaces non-printable chars and forces 40x80 layout.""" def test_passthrough_well_formed(self) -> None: + """Well-formed input round-trips unchanged.""" header = _well_formed_header() assert sanitize_text_header(header) == header def test_replaces_non_printable_with_space(self) -> None: + """NUL/BEL bytes are replaced with spaces.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS - rows[0] = ("C01\x00\x07" + " " * (EXPECTED_COLS - 5)) + rows[0] = "C01\x00\x07" + " " * (EXPECTED_COLS - 5) cleaned = sanitize_text_header("\n".join(rows)) cleaned_rows = cleaned.split("\n") @@ -117,16 +112,16 @@ def test_replaces_non_printable_with_space(self) -> None: assert all(c.isprintable() for row in cleaned_rows for c in row) def test_replaces_replacement_char_with_space(self) -> None: - """``U+FFFD`` (the issue #814 case) must be repaired to spaces.""" + """U+FFFD is repaired to spaces; surrounding text survives.""" cleaned = sanitize_text_header(_replacement_char_header()) assert "\ufffd" not in cleaned - # The leading replacement char of card 38 becomes a 
space; the literal text survives. cleaned_rows = cleaned.split("\n") assert cleaned_rows[37].startswith(" C38") assert cleaned_rows[38].startswith(" C39") assert cleaned_rows[39].startswith(" C40 END EBCDIC") def test_replaces_unicode_line_separator_with_space(self) -> None: + r"""``\v`` is replaced with a space, not split as a row break.""" rows = [" " * EXPECTED_COLS] * EXPECTED_ROWS rows[0] = "\x0b" + " " * (EXPECTED_COLS - 1) cleaned = sanitize_text_header("\n".join(rows)) @@ -135,6 +130,7 @@ def test_replaces_unicode_line_separator_with_space(self) -> None: assert cleaned_rows[0] == " " * EXPECTED_COLS def test_pads_short_rows_to_eighty_columns(self) -> None: + """Short rows are right-padded with spaces.""" rows = ["short"] * EXPECTED_ROWS cleaned = sanitize_text_header("\n".join(rows)) @@ -142,6 +138,7 @@ def test_pads_short_rows_to_eighty_columns(self) -> None: assert len(row) == EXPECTED_COLS def test_truncates_long_rows_to_eighty_columns(self) -> None: + """Rows longer than 80 chars are truncated.""" long_row = "X" * (EXPECTED_COLS + 20) cleaned = sanitize_text_header("\n".join([long_row] * EXPECTED_ROWS)) @@ -150,6 +147,7 @@ def test_truncates_long_rows_to_eighty_columns(self) -> None: assert row == "X" * EXPECTED_COLS def test_pads_missing_rows_with_blank_lines(self) -> None: + """Headers with fewer than 40 rows are padded with blank lines.""" rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, 5)] cleaned = sanitize_text_header("\n".join(rows)) @@ -158,6 +156,7 @@ def test_pads_missing_rows_with_blank_lines(self) -> None: assert cleaned_rows[-1] == " " * EXPECTED_COLS def test_truncates_excess_rows(self) -> None: + """Headers with more than 40 rows are truncated to 40.""" rows = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 5)] cleaned = sanitize_text_header("\n".join(rows)) @@ -166,24 +165,18 @@ def test_truncates_excess_rows(self) -> None: assert cleaned_rows[-1].startswith("C40") def test_output_passes_validation(self) -> 
None: - """The output of sanitize is always accepted by validate.""" + """Sanitize output is always accepted by validate.""" rows = [f"C{i:02d}\x00\x01\ufffd garbage" for i in range(1, EXPECTED_ROWS + 10)] cleaned = sanitize_text_header("\n".join(rows)) validate_text_header(cleaned) def test_sanitized_header_is_ascii_encodable(self) -> None: - """Sanitized output must be encodable as ASCII (the SEG-Y export requirement).""" + """Sanitized output encodes as ASCII (the SEG-Y export requirement).""" cleaned = sanitize_text_header(_replacement_char_header()) cleaned.replace("\n", "").encode("ascii") def test_collapses_double_newline_separator(self) -> None: - """Headers terminated with ``\\n\\n`` between cards must keep all 40 cards. - - Some SEG-Y writers double the newline after each card. A naive - ``split("\\n")`` followed by ``rows[:40]`` would silently drop cards - 21-40. ``sanitize_text_header`` collapses runs of ``\\n`` so the card - layout survives. - """ + r"""Headers with ``\n\n`` between cards keep all 40 cards.""" cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] wrapped = "\n\n".join(cards) + "\n" cleaned = sanitize_text_header(wrapped) @@ -195,7 +188,7 @@ def test_collapses_double_newline_separator(self) -> None: validate_text_header(cleaned) def test_collapses_runs_longer_than_two(self) -> None: - """Triple (or longer) newline runs collapse to a single ``\\n``.""" + r"""Triple (or longer) newline runs collapse to a single ``\n``.""" cards = [f"C{i:02d}".ljust(EXPECTED_COLS) for i in range(1, EXPECTED_ROWS + 1)] cleaned = sanitize_text_header("\n\n\n".join(cards))