Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion biofuse/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ def _plink_encoder_factory(reader, opts):


def _bgen_encoder_factory(reader, opts):
    """Build the ``vcztools.BgenEncoder`` for ``reader`` from ``opts``.

    Args:
        reader: Passed through unchanged as the encoder's first
            positional argument.
        opts: Options object exposing ``no_header_samples``,
            ``unphased``, ``total_string_length`` and ``pad_byte``.

    Returns:
        The ``vcztools.BgenEncoder`` constructed from those options.
    """
    # The option is phrased negatively ("no header samples") while the
    # encoder keyword is positive, so invert it exactly once here.
    embed_header_samples = not opts.no_header_samples
    return vcztools.BgenEncoder(
        reader,
        embed_header_samples=embed_header_samples,
        unphased=opts.unphased,
        total_string_length=opts.total_string_length,
        pad_byte=opts.pad_byte,
    )


Expand Down
56 changes: 51 additions & 5 deletions tests/test_bgen_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,18 @@ async def fx_mounted_bgen(tmp_path, fx_medium_vcz):
yield mnt, "medium", expected, log


def _encoder_bytes(
    vcz_path: pathlib.Path,
    *,
    total_string_length: int | None = None,
    pad_byte: bytes | None = None,
) -> bytes:
    """Return the full byte stream of an in-process ``BgenEncoder``.

    Args:
        vcz_path: Path handed to ``_open_reader`` to obtain the reader.
        total_string_length: Forwarded to ``vcztools.BgenEncoder``;
            ``None`` by default.
        pad_byte: Forwarded to ``vcztools.BgenEncoder``; ``None`` by
            default.

    Returns:
        Everything the encoder yields from offset 0 through
        ``enc.total_size``.
    """
    reader = _open_reader(vcz_path)
    with vcztools.BgenEncoder(
        reader,
        total_string_length=total_string_length,
        pad_byte=pad_byte,
    ) as enc:
        # Read the whole stream in one call so callers can compare it
        # byte-for-byte against other output.
        return enc.read(0, enc.total_size)


Expand All @@ -89,14 +98,16 @@ async def _arun(cmd) -> None:


@contextlib.asynccontextmanager
async def _mount_bgen(tmp_path, vcz, opts=None):
async def _mount_bgen(tmp_path, vcz, opts=None, *, mnt_name="mnt"):
"""Mount ``vcz`` as a BGEN fileset; yield ``(mnt, basename)``.

``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the host
runs under; defaults to a fresh ``ViewBgenOptions()`` (every field
at its dataclass default).
at its dataclass default). ``mnt_name`` lets a single test mount
twice under the same ``tmp_path`` without colliding on the
mountpoint directory.
"""
mnt = tmp_path / "mnt"
mnt = tmp_path / mnt_name
mnt.mkdir()
basename = vcz.path.stem
if opts is None:
Expand Down Expand Up @@ -199,6 +210,41 @@ async def test_unphased_stable_across_opens(self, tmp_path, fx_small_vcz, unphas
assert data == expected, f"cycle {cycle} differed from reference"


class TestBgenCustomStringPadding:
    """End-to-end coverage that ``--total-string-length`` /
    ``--pad-byte`` reach the FUSE-served bytes.
    """

    async def test_full_bgen_with_custom_budget(self, tmp_path, fx_small_vcz):
        # Reference bytes come from an in-process encoder configured
        # with the same budget and pad byte as the mounted host.
        opts = vcztools.ViewBgenOptions(total_string_length=128, pad_byte=b"X")
        expected = _encoder_bytes(
            fx_small_vcz.path, total_string_length=128, pad_byte=b"X"
        )
        async with _mount_bgen(tmp_path, fx_small_vcz, opts=opts) as mounted:
            mnt, basename = mounted
            served_file = mnt / f"{basename}.bgen"
            data = await trio.to_thread.run_sync(served_file.read_bytes)
            assert data == expected

    async def test_bytes_differ_from_default(self, tmp_path, fx_small_vcz):
        # First mount: custom pad byte, under its own mountpoint name.
        opts_x = vcztools.ViewBgenOptions(pad_byte=b"X")
        async with _mount_bgen(
            tmp_path, fx_small_vcz, opts=opts_x, mnt_name="mnt_x"
        ) as (mnt, basename):
            padded_file = mnt / f"{basename}.bgen"
            data_x = await trio.to_thread.run_sync(padded_file.read_bytes)

        # Second mount: all-default options, under a distinct
        # mountpoint name so the two mounts do not collide.
        opts_default = vcztools.ViewBgenOptions()
        async with _mount_bgen(
            tmp_path, fx_small_vcz, opts=opts_default, mnt_name="mnt_default"
        ) as (mnt, basename):
            default_file = mnt / f"{basename}.bgen"
            data_default = await trio.to_thread.run_sync(default_file.read_bytes)

        assert data_default != data_x


def _pread_sync(path: pathlib.Path, off: int, size: int) -> bytes:
with path.open("rb") as f:
f.seek(off)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def test_mount_bgen_help(self):
"--no-sample-file",
"--no-bgi",
"--no-header-samples",
"--total-string-length",
"--pad-byte",
"--log-level",
]:
assert flag in result.output, f"missing {flag} in mount-bgen help"
Expand Down Expand Up @@ -96,6 +98,47 @@ def test_bgen_nonexistent_mount_dir_fails(self, tmp_path):
assert "mount directory does not exist" in result.output


class TestMountBgenStringOptions:
    """Parse-time validation of ``--total-string-length`` / ``--pad-byte``.

    The CLI rejects malformed values before any FUSE mount is
    attempted, so these tests stay in-process.
    """

    @staticmethod
    def _run_mount_bgen(tmp_path, *option_args):
        """Invoke ``mount-bgen`` in-process with ``option_args`` placed
        before the fixed ``x.vcz`` / ``tmp_path`` positionals.
        """
        argv = ["mount-bgen", *option_args, "x.vcz", str(tmp_path)]
        return CliRunner().invoke(cli.biofuse_main, argv)

    def test_pad_byte_rejects_multichar(self, tmp_path):
        outcome = self._run_mount_bgen(tmp_path, "--pad-byte", "XX")
        assert outcome.exit_code != 0
        assert "single ASCII character" in outcome.output

    def test_pad_byte_rejects_non_ascii(self, tmp_path):
        # Upstream's --pad-byte callback uses errors="strict", so a
        # non-ASCII character surfaces as a UnicodeEncodeError rather
        # than a click.BadParameter — only the non-zero exit is stable.
        outcome = self._run_mount_bgen(tmp_path, "--pad-byte", "é")
        assert outcome.exit_code != 0

    def test_pad_byte_rejects_empty(self, tmp_path):
        outcome = self._run_mount_bgen(tmp_path, "--pad-byte", "")
        assert outcome.exit_code != 0
        assert "single ASCII character" in outcome.output

    def test_total_string_length_rejects_zero(self, tmp_path):
        outcome = self._run_mount_bgen(tmp_path, "--total-string-length", "0")
        assert outcome.exit_code != 0


class TestEndToEndMount:
"""Spawn the CLI as a subprocess, wait for mount, read files, terminate."""

Expand Down
65 changes: 65 additions & 0 deletions tests/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,71 @@ def test_unphased_default_matches_in_process(self, fx_reader, fx_small_vcz):
assert data == ref_data


class TestBgenTotalStringLengthPlumbing:
    """``--total-string-length`` flows through to ``BgenEncoder``.

    Compared against an in-process ``BgenEncoder`` built with the same
    kwarg, the streamed bytes must be identical.
    """

    def test_default_matches_in_process(self, fx_reader, fx_small_vcz):
        opts = vcztools.ViewBgenOptions()
        assert opts.total_string_length is None

        # Bytes produced through the registered spec's factory.
        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as via_spec:
            data = via_spec.read(0, via_spec.total_size)

        # Bytes produced by a directly constructed, all-default encoder.
        ref_reader = _open_reader(fx_small_vcz.path)
        with vcztools.BgenEncoder(ref_reader) as direct:
            ref_data = direct.read(0, direct.total_size)

        assert data == ref_data

    def test_explicit_value_matches_in_process(self, fx_reader, fx_small_vcz):
        opts = vcztools.ViewBgenOptions(total_string_length=128)

        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as via_spec:
            data = via_spec.read(0, via_spec.total_size)

        ref_reader = _open_reader(fx_small_vcz.path)
        with vcztools.BgenEncoder(ref_reader, total_string_length=128) as direct:
            ref_data = direct.read(0, direct.total_size)

        assert data == ref_data

    def test_budget_too_small_raises(self, fx_reader):
        opts = vcztools.ViewBgenOptions(total_string_length=1)
        # Both construction and the read stay inside the raises block,
        # so the error may come from either step.
        expect_error = pytest.raises(ValueError, match="total_string_length")
        with expect_error:
            with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as via_spec:
                via_spec.read(0, via_spec.total_size)


class TestBgenPadBytePlumbing:
    """``--pad-byte`` flows through to ``BgenEncoder(pad_byte=...)``."""

    def test_default_matches_in_process(self, fx_reader, fx_small_vcz):
        opts = vcztools.ViewBgenOptions()
        assert opts.pad_byte is None

        # Bytes produced through the registered spec's factory.
        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as via_spec:
            data = via_spec.read(0, via_spec.total_size)

        # Bytes produced by a directly constructed, all-default encoder.
        ref_reader = _open_reader(fx_small_vcz.path)
        with vcztools.BgenEncoder(ref_reader) as direct:
            ref_data = direct.read(0, direct.total_size)

        assert data == ref_data

    def test_explicit_pad_byte_matches_in_process(self, fx_reader, fx_small_vcz):
        opts = vcztools.ViewBgenOptions(pad_byte=b"X")

        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts) as via_spec:
            data = via_spec.read(0, via_spec.total_size)

        ref_reader = _open_reader(fx_small_vcz.path)
        with vcztools.BgenEncoder(ref_reader, pad_byte=b"X") as direct:
            ref_data = direct.read(0, direct.total_size)

        assert data == ref_data

    def test_pad_byte_changes_bytes(self, fx_reader):
        opts_default = vcztools.ViewBgenOptions()
        opts_x = vcztools.ViewBgenOptions(pad_byte=b"X")

        # Encode the same reader twice, default options first, then
        # with the custom pad byte.
        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts_default) as plain:
            data_default = plain.read(0, plain.total_size)
        with formats.BGEN_SPEC.encoder_factory(fx_reader, opts_x) as padded:
            data_x = padded.read(0, padded.total_size)

        assert data_default != data_x


class TestSpecsRegistry:
def test_specs_dict_has_both_entries(self):
assert formats.SPECS == {"plink": formats.PLINK_SPEC, "bgen": formats.BGEN_SPEC}
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading