diff --git a/biofuse/formats.py b/biofuse/formats.py index 00ebbd8..0b71952 100644 --- a/biofuse/formats.py +++ b/biofuse/formats.py @@ -129,10 +129,13 @@ def _plink_encoder_factory(reader, opts): def _bgen_encoder_factory(reader, opts): + embed_header_samples = not opts.no_header_samples return vcztools.BgenEncoder( reader, - embed_header_samples=not opts.no_header_samples, + embed_header_samples=embed_header_samples, unphased=opts.unphased, + total_string_length=opts.total_string_length, + pad_byte=opts.pad_byte, ) diff --git a/tests/test_bgen_apps.py b/tests/test_bgen_apps.py index 7990880..b166456 100644 --- a/tests/test_bgen_apps.py +++ b/tests/test_bgen_apps.py @@ -78,9 +78,18 @@ async def fx_mounted_bgen(tmp_path, fx_medium_vcz): yield mnt, "medium", expected, log -def _encoder_bytes(vcz_path: pathlib.Path) -> bytes: +def _encoder_bytes( + vcz_path: pathlib.Path, + *, + total_string_length: int | None = None, + pad_byte: bytes | None = None, +) -> bytes: reader = _open_reader(vcz_path) - with vcztools.BgenEncoder(reader) as enc: + with vcztools.BgenEncoder( + reader, + total_string_length=total_string_length, + pad_byte=pad_byte, + ) as enc: return enc.read(0, enc.total_size) @@ -89,14 +98,16 @@ async def _arun(cmd) -> None: @contextlib.asynccontextmanager -async def _mount_bgen(tmp_path, vcz, opts=None): +async def _mount_bgen(tmp_path, vcz, opts=None, *, mnt_name="mnt"): """Mount ``vcz`` as a BGEN fileset; yield ``(mnt, basename)``. ``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the host runs under; defaults to a fresh ``ViewBgenOptions()`` (every field - at its dataclass default). + at its dataclass default). ``mnt_name`` lets a single test mount + twice under the same ``tmp_path`` without colliding on the + mountpoint directory. """ - mnt = tmp_path / "mnt" + mnt = tmp_path / mnt_name mnt.mkdir() basename = vcz.path.stem if opts is None: @@ -199,6 +210,41 @@ async def test_unphased_stable_across_opens(self, tmp_path, fx_small_vcz, unphas assert data == expected, f"cycle {cycle} differed from reference" +class TestBgenCustomStringPadding: + """End-to-end coverage that ``--total-string-length`` / + ``--pad-byte`` reach the FUSE-served bytes. + """ + + async def test_full_bgen_with_custom_budget(self, tmp_path, fx_small_vcz): + opts = vcztools.ViewBgenOptions(total_string_length=128, pad_byte=b"X") + expected = _encoder_bytes( + fx_small_vcz.path, total_string_length=128, pad_byte=b"X" + ) + async with _mount_bgen(tmp_path, fx_small_vcz, opts=opts) as (mnt, basename): + bgen_path = mnt / f"{basename}.bgen" + data = await trio.to_thread.run_sync(bgen_path.read_bytes) + assert data == expected + + async def test_bytes_differ_from_default(self, tmp_path, fx_small_vcz): + opts_x = vcztools.ViewBgenOptions(pad_byte=b"X") + async with _mount_bgen( + tmp_path, fx_small_vcz, opts=opts_x, mnt_name="mnt_x" + ) as (mnt, basename): + data_x = await trio.to_thread.run_sync( + (mnt / f"{basename}.bgen").read_bytes + ) + + opts_default = vcztools.ViewBgenOptions() + async with _mount_bgen( + tmp_path, fx_small_vcz, opts=opts_default, mnt_name="mnt_default" + ) as (mnt, basename): + data_default = await trio.to_thread.run_sync( + (mnt / f"{basename}.bgen").read_bytes + ) + + assert data_default != data_x + + def _pread_sync(path: pathlib.Path, off: int, size: int) -> bytes: with path.open("rb") as f: f.seek(off) diff --git a/tests/test_cli.py b/tests/test_cli.py index 666593c..f79ba3c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -54,6 +54,8 @@ def test_mount_bgen_help(self): "--no-sample-file", "--no-bgi", "--no-header-samples", + "--total-string-length", + "--pad-byte", "--log-level", ]: assert flag in result.output, f"missing {flag} in mount-bgen help" @@ -96,6 +98,47 @@ def test_bgen_nonexistent_mount_dir_fails(self, tmp_path): assert "mount directory does not exist" in result.output +class TestMountBgenStringOptions: + """Parse-time validation of ``--total-string-length`` / ``--pad-byte``. + + The CLI rejects malformed values before any FUSE mount is + attempted, so these tests stay in-process. + """ + + def test_pad_byte_rejects_multichar(self, tmp_path): + result = CliRunner().invoke( + cli.biofuse_main, + ["mount-bgen", "--pad-byte", "XX", "x.vcz", str(tmp_path)], + ) + assert result.exit_code != 0 + assert "single ASCII character" in result.output + + def test_pad_byte_rejects_non_ascii(self, tmp_path): + # Upstream's --pad-byte callback uses errors="strict", so a + # non-ASCII character surfaces as a UnicodeEncodeError rather + # than a click.BadParameter — only the non-zero exit is stable. + result = CliRunner().invoke( + cli.biofuse_main, + ["mount-bgen", "--pad-byte", "é", "x.vcz", str(tmp_path)], + ) + assert result.exit_code != 0 + + def test_pad_byte_rejects_empty(self, tmp_path): + result = CliRunner().invoke( + cli.biofuse_main, + ["mount-bgen", "--pad-byte", "", "x.vcz", str(tmp_path)], + ) + assert result.exit_code != 0 + assert "single ASCII character" in result.output + + def test_total_string_length_rejects_zero(self, tmp_path): + result = CliRunner().invoke( + cli.biofuse_main, + ["mount-bgen", "--total-string-length", "0", "x.vcz", str(tmp_path)], + ) + assert result.exit_code != 0 + + class TestEndToEndMount: """Spawn the CLI as a subprocess, wait for mount, read files, terminate.""" diff --git a/tests/test_formats.py b/tests/test_formats.py index 6beb876..1e762d7 100644 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -341,6 +341,71 @@ def test_unphased_default_matches_in_process(self, fx_reader, fx_small_vcz): assert data == ref_data +class TestBgenTotalStringLengthPlumbing: + """``--total-string-length`` flows through to ``BgenEncoder``. + + Compared against an in-process ``BgenEncoder`` built with the same + kwarg, the streamed bytes must be identical. + """ + + def test_default_matches_in_process(self, fx_reader, fx_small_vcz): + opts = vcztools.ViewBgenOptions() + assert opts.total_string_length is None + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as encoder: + data = encoder.read(0, encoder.total_size) + ref_reader = _open_reader(fx_small_vcz.path) + with vcztools.BgenEncoder(ref_reader) as ref: + ref_data = ref.read(0, ref.total_size) + assert data == ref_data + + def test_explicit_value_matches_in_process(self, fx_reader, fx_small_vcz): + opts = vcztools.ViewBgenOptions(total_string_length=128) + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as encoder: + data = encoder.read(0, encoder.total_size) + ref_reader = _open_reader(fx_small_vcz.path) + with vcztools.BgenEncoder(ref_reader, total_string_length=128) as ref: + ref_data = ref.read(0, ref.total_size) + assert data == ref_data + + def test_budget_too_small_raises(self, fx_reader): + opts = vcztools.ViewBgenOptions(total_string_length=1) + with pytest.raises(ValueError, match="total_string_length"): + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as encoder: + encoder.read(0, encoder.total_size) + + +class TestBgenPadBytePlumbing: + """``--pad-byte`` flows through to ``BgenEncoder(pad_byte=...)``.""" + + def test_default_matches_in_process(self, fx_reader, fx_small_vcz): + opts = vcztools.ViewBgenOptions() + assert opts.pad_byte is None + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as encoder: + data = encoder.read(0, encoder.total_size) + ref_reader = _open_reader(fx_small_vcz.path) + with vcztools.BgenEncoder(ref_reader) as ref: + ref_data = ref.read(0, ref.total_size) + assert data == ref_data + + def test_explicit_pad_byte_matches_in_process(self, fx_reader, fx_small_vcz): + opts = vcztools.ViewBgenOptions(pad_byte=b"X") + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as encoder: + data = encoder.read(0, encoder.total_size) + ref_reader = _open_reader(fx_small_vcz.path) + with vcztools.BgenEncoder(ref_reader, pad_byte=b"X") as ref: + ref_data = ref.read(0, ref.total_size) + assert data == ref_data + + def test_pad_byte_changes_bytes(self, fx_reader): + opts_default = vcztools.ViewBgenOptions() + opts_x = vcztools.ViewBgenOptions(pad_byte=b"X") + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts_default) as encoder: + data_default = encoder.read(0, encoder.total_size) + with formats.BGEN_SPEC.encoder_factory(fx_reader, opts_x) as encoder: + data_x = encoder.read(0, encoder.total_size) + assert data_default != data_x + + class TestSpecsRegistry: def test_specs_dict_has_both_entries(self): assert formats.SPECS == {"plink": formats.PLINK_SPEC, "bgen": formats.BGEN_SPEC} diff --git a/uv.lock b/uv.lock index 3b2d3c5..c82a2db 100644 --- a/uv.lock +++ b/uv.lock @@ -1737,8 +1737,8 @@ all = [ [[package]] name = "vcztools" -version = "0.1.3.dev356" -source = { git = "https://github.com/sgkit-dev/vcztools.git?rev=main#11ead2fe565c81dc7b8d024dae9ce373a45be4ba" } +version = "0.1.3.dev378" +source = { git = "https://github.com/sgkit-dev/vcztools.git?rev=main#a8ce91d1e4fedf4e0cdb5b5e0ca7ebda4fb65b46" } dependencies = [ { name = "click" }, { name = "humanfriendly" },