From dc7f7c5be873441c8cbcf97e6ef42cffba737d5d Mon Sep 17 00:00:00 2001
From: Teque5
Date: Tue, 23 Dec 2025 13:53:55 -0800
Subject: [PATCH 1/3] fix read_samples from SigMF archive

* When reading from a SigMF (.sigmf) archive, slicing and reading the entire file worked, but reading a specific sample count did not
* Add a test for archive read_samples and refactor related tests
---
 sigmf/__init__.py     |   2 +-
 sigmf/sigmffile.py    |  35 +++---
 tests/test_archive.py | 246 +++++++++++++++++++++++-------------
 3 files changed, 160 insertions(+), 123 deletions(-)

diff --git a/sigmf/__init__.py b/sigmf/__init__.py
index 63e6b39..b5bdcf3 100644
--- a/sigmf/__init__.py
+++ b/sigmf/__init__.py
@@ -5,7 +5,7 @@
 # SPDX-License-Identifier: LGPL-3.0-or-later

 # version of this python module
-__version__ = "1.2.13"
+__version__ = "1.2.14"

 # matching version of the SigMF specification
 __specification__ = "1.2.5"

diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py
index 81f6683..66aa0b9 100644
--- a/sigmf/sigmffile.py
+++ b/sigmf/sigmffile.py
@@ -177,6 +177,7 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu
         """
         super().__init__()
         self.data_file = None
+        self.data_buffer = None
         self.sample_count = 0
         self._memmap = None
         self.is_complex_data = False  # numpy.iscomplexobj(self._memmap) is not adequate for fixed-point complex case
@@ -490,23 +491,28 @@ def _count_samples(self):
         use 0.
         For complex data, a 'sample' includes both the real and imaginary part.
         """
-        if self.data_file is None:
+        if self.data_file is None and self.data_buffer is None:
             sample_count = self._get_sample_count_from_annotations()
         else:
             header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()])
-            file_size = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes
-            file_data_size = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes  # bytes
+            if self.data_file is not None:
+                file_bytes = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes
+            elif self.data_buffer is not None:
+                file_bytes = len(self.data_buffer.getbuffer()) if self.data_size_bytes is None else self.data_size_bytes
+            else:
+                file_bytes = 0
+            sample_bytes = file_bytes - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes
             sample_size = self.get_sample_size()  # size of a sample in bytes
             num_channels = self.get_num_channels()
-            sample_count = file_data_size // sample_size // num_channels
-            if file_data_size % (sample_size * num_channels) != 0:
+            sample_count = sample_bytes // sample_size // num_channels
+            if sample_bytes % (sample_size * num_channels) != 0:
                 warnings.warn(
-                    f"File `{self.data_file}` does not contain an integer number of samples across channels. "
+                    f"Data source does not contain an integer number of samples across channels. "
                     "It may be invalid data."
                 )
             if self._get_sample_count_from_annotations() > sample_count:
                 warnings.warn(
-                    f"File `{self.data_file}` ends before the final annotation in the corresponding SigMF metadata."
+                    f"Data source ends before the final annotation in the corresponding SigMF metadata."
                )
         self.sample_count = sample_count
         return sample_count
@@ -735,7 +741,9 @@ def _read_datafile(self, first_byte, nitems, autoscale, raw_components):
             fp.seek(first_byte, 0)
             data = np.fromfile(fp, dtype=data_type_in, count=nitems)
         elif self.data_buffer is not None:
-            data = np.frombuffer(self.data_buffer.getbuffer(), dtype=data_type_in, count=nitems)
+            # handle offset for data_buffer like we do for data_file
+            buffer_data = self.data_buffer.getbuffer()[first_byte:]
+            data = np.frombuffer(buffer_data, dtype=data_type_in, count=nitems)
         else:
             data = self._memmap

@@ -1065,10 +1073,13 @@ def fromarchive(archive_path, dir=None, skip_checksum=False):

 def fromfile(filename, skip_checksum=False):
     """
-    Creates and returns a SigMFFile or SigMFCollection instance with metadata
-    loaded from the specified file. The filename may be that of either a
-    sigmf-meta file, a sigmf-data file, a sigmf-collection file, or a sigmf
-    archive.
+    Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded from the specified file.
+
+    The file can be one of:
+    * A SigMF Metadata file (.sigmf-meta)
+    * A SigMF Dataset file (.sigmf-data)
+    * A SigMF Collection file (.sigmf-collection)
+    * A SigMF Archive file (.sigmf)

     Parameters
     ----------
diff --git a/tests/test_archive.py b/tests/test_archive.py
index 1db92e0..c9d6e70 100644
--- a/tests/test_archive.py
+++ b/tests/test_archive.py
@@ -7,127 +7,132 @@
 """Tests for SigMFArchive"""

 import codecs
+import copy
 import json
+import shutil
 import tarfile
 import tempfile
+import unittest
 from pathlib import Path

 import jsonschema
 import numpy as np
-import pytest

-from sigmf import error
+from sigmf import SigMFFile, __specification__, error, fromfile
 from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT

 from .testdata import TEST_FLOAT32_DATA, TEST_METADATA


-def create_test_archive(test_sigmffile, tmpfile):
-    sigmf_archive = test_sigmffile.archive(fileobj=tmpfile)
-    sigmf_tarfile = tarfile.open(sigmf_archive, mode="r", format=tarfile.PAX_FORMAT)
-    return sigmf_tarfile
-
-
-def test_without_data_file_throws_fileerror(test_sigmffile):
-    test_sigmffile.data_file = None
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(name=temp.name)
-
-
-def test_invalid_md_throws_validationerror(test_sigmffile):
-    del test_sigmffile._metadata["global"]["core:datatype"]  # required field
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(jsonschema.exceptions.ValidationError):
-            test_sigmffile.archive(name=temp.name)
-
-
-def test_name_wrong_extension_throws_fileerror(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(name=temp.name + ".zip")
-
-
-def test_fileobj_extension_ignored(test_sigmffile):
-    with tempfile.NamedTemporaryFile(suffix=".tar") as temp:
-        test_sigmffile.archive(fileobj=temp)
-
-
-def test_name_used_in_fileobj(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_archive = test_sigmffile.archive(name="testarchive", fileobj=temp)
-        sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
-        assert basedir.name == "testarchive"
-
-        def filename(tarinfo):
-            return Path(tarinfo.name).stem
-
-        assert filename(file1) == "testarchive"
-        assert filename(file2) == "testarchive"
-
-
-def test_fileobj_not_closed(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        test_sigmffile.archive(fileobj=temp)
-        assert not temp.file.closed
-
-
-def test_unwritable_fileobj_throws_fileerror(test_sigmffile):
-    with tempfile.NamedTemporaryFile(mode="rb") as temp:
-        with pytest.raises(error.SigMFFileError):
-            test_sigmffile.archive(fileobj=temp)
-
-
-def test_unwritable_name_throws_fileerror(test_sigmffile):
-    # Cannot assume /root/ is unwritable (e.g. Docker environment)
-    # so use invalid filename
-    unwritable_file = "/bad_name/"
-    with pytest.raises(error.SigMFFileError):
-        test_sigmffile.archive(name=unwritable_file)
-
-
-def test_tarfile_layout(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
-        assert tarfile.TarInfo.isdir(basedir)
-        assert tarfile.TarInfo.isfile(file1)
-        assert tarfile.TarInfo.isfile(file2)
-
-
-def test_tarfile_names_and_extensions(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        basedir, file1, file2 = sigmf_tarfile.getmembers()
+class TestSigMFArchive(unittest.TestCase):
+    """Tests for SigMF Archive functionality"""
+
+    def setUp(self):
+        """Create temporary directory and test SigMFFile"""
+        self.temp_dir = Path(tempfile.mkdtemp())
+        self.temp_path_data = self.temp_dir / "trash.sigmf-data"
+        self.temp_path_meta = self.temp_dir / "trash.sigmf-meta"
+        self.temp_path_archive = self.temp_dir / "test.sigmf"
+        TEST_FLOAT32_DATA.tofile(self.temp_path_data)
+        self.sigmf_object = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=self.temp_path_data)
+        self.sigmf_object.tofile(self.temp_path_meta)
+        self.sigmf_object.tofile(self.temp_path_archive, toarchive=True)
+        self.sigmf_tarfile = tarfile.open(self.temp_path_archive, mode="r", format=tarfile.PAX_FORMAT)
+
+    def tearDown(self):
+        """Clean up temporary directory"""
+        shutil.rmtree(self.temp_dir)
+
+    def test_archive_creation_requires_data_file(self):
+        """Test that archiving without data file raises error"""
+        self.sigmf_object.data_file = None
+        with self.assertRaises(error.SigMFFileError):
+            self.sigmf_object.archive(name=self.temp_path_archive)
+
+    def test_archive_creation_validates_metadata(self):
+        """Test that invalid metadata raises error"""
+        del self.sigmf_object._metadata["global"]["core:datatype"]  # required field
+        with self.assertRaises(jsonschema.exceptions.ValidationError):
+            self.sigmf_object.archive(name=self.temp_path_archive)
+
+    def test_archive_creation_validates_extension(self):
+        """Test that wrong extension raises error"""
+        wrong_name = self.temp_dir / "temp_archive.zip"
+        with self.assertRaises(error.SigMFFileError):
+            self.sigmf_object.archive(name=wrong_name)
+
+    def test_fileobj_ignores_extension(self):
+        """Test that file object extension is ignored"""
+        temp_archive_tar = self.temp_dir / "test.sigmf.tar"
+        with open(temp_archive_tar, "wb") as temp:
+            self.sigmf_object.archive(fileobj=temp)
+
+    def test_custom_name_overrides_fileobj_name(self):
+        """Test that an explicit name overrides the file object name"""
+        with open(self.temp_path_archive, "wb") as temp:
+            sigmf_archive = self.sigmf_object.archive(name="testarchive", fileobj=temp)
+            sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
+            basedir, file1, file2 = sigmf_tarfile.getmembers()
+            self.assertEqual(basedir.name, "testarchive")
+            self.assertEqual(Path(file1.name).stem, "testarchive")
+            self.assertEqual(Path(file2.name).stem, "testarchive")
+
+    def test_fileobj_remains_open_after_archive(self):
+        """Test that file object is not closed after archiving"""
+        with 
open(self.temp_path_archive, "wb") as temp: + self.sigmf_object.archive(fileobj=temp) + self.assertFalse(temp.closed) + + def test_readonly_fileobj_raises_error(self): + """Test that unwritable file object raises error""" + temp_path = self.temp_dir / "temp_archive.sigmf" + temp_path.touch() + with open(temp_path, "rb") as temp: + with self.assertRaises(error.SigMFFileError): + self.sigmf_object.archive(fileobj=temp) + + def test_invalid_path_raises_error(self): + """Test that unwritable name raises error""" + # Cannot assume /root/ is unwritable (e.g. Docker environment) + # so use invalid filename + unwritable_file = "/bad_name/" + with self.assertRaises(error.SigMFFileError): + self.sigmf_object.archive(name=unwritable_file) + + def test_archive_contains_directory_and_files(self): + """Test archive layout structure""" + basedir, file1, file2 = self.sigmf_tarfile.getmembers() + self.assertTrue(tarfile.TarInfo.isdir(basedir)) + self.assertTrue(tarfile.TarInfo.isfile(file1)) + self.assertTrue(tarfile.TarInfo.isfile(file2)) + + def test_archive_files_have_correct_names_and_extensions(self): + """Test tarfile names and extensions""" + basedir, file1, file2 = self.sigmf_tarfile.getmembers() archive_name = basedir.name - assert archive_name == Path(temp.name).name + self.assertEqual(archive_name, Path(self.temp_path_archive).stem) file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT} file1_name, file1_ext = Path(file1.name).stem, Path(file1.name).suffix - assert file1_name == archive_name - assert file1_ext in file_extensions + self.assertEqual(file1_name, archive_name) + self.assertIn(file1_ext, file_extensions) file_extensions.remove(file1_ext) file2_name, file2_ext = Path(file2.name).stem, Path(file2.name).suffix - assert file2_name == archive_name - assert file2_ext in file_extensions - - -def test_tarfile_persmissions(test_sigmffile): - with tempfile.NamedTemporaryFile() as temp: - sigmf_tarfile = create_test_archive(test_sigmffile, temp) - basedir, file1, file2 = sigmf_tarfile.getmembers() - assert basedir.mode == 0o755 - assert file1.mode == 0o644 - assert file2.mode == 0o644 - - -def test_contents(test_sigmffile): - with tempfile.NamedTemporaryFile() as temp: - sigmf_tarfile = create_test_archive(test_sigmffile, temp) - basedir, file1, file2 = sigmf_tarfile.getmembers() + self.assertEqual(file2_name, archive_name) + self.assertIn(file2_ext, file_extensions) + + def test_archive_files_have_correct_permissions(self): + """Test tarfile permissions""" + basedir, file1, file2 = self.sigmf_tarfile.getmembers() + self.assertEqual(basedir.mode, 0o755) + self.assertEqual(file1.mode, 0o644) + self.assertEqual(file2.mode, 0o644) + + def test_archive_contents_match_original_data(self): + """Test archive contents""" + _, file1, file2 = self.sigmf_tarfile.getmembers() if file1.name.endswith(SIGMF_METADATA_EXT): mdfile = file1 datfile = file2 @@ -136,18 +141,39 @@ def test_contents(test_sigmffile): datfile = file1 bytestream_reader = codecs.getreader("utf-8") # bytes -> str - mdfile_reader = bytestream_reader(sigmf_tarfile.extractfile(mdfile)) - assert json.load(mdfile_reader) == TEST_METADATA + mdfile_reader = bytestream_reader(self.sigmf_tarfile.extractfile(mdfile)) + self.assertEqual(json.load(mdfile_reader), TEST_METADATA) - datfile_reader = sigmf_tarfile.extractfile(datfile) + datfile_reader = self.sigmf_tarfile.extractfile(datfile) # calling `fileno` on `tarfile.ExFileObject` throws error (?), but # np.fromfile requires it, so we need this extra step data = 
np.frombuffer(datfile_reader.read(), dtype=np.float32)
-        assert np.array_equal(data, TEST_FLOAT32_DATA)
-
-
-def test_tarfile_type(test_sigmffile):
-    with tempfile.NamedTemporaryFile() as temp:
-        sigmf_tarfile = create_test_archive(test_sigmffile, temp)
-        assert sigmf_tarfile.format == tarfile.PAX_FORMAT
+        np.testing.assert_array_equal(data, TEST_FLOAT32_DATA)
+
+    def test_tarfile_format(self):
+        """Tar file format is PAX"""
+        self.assertEqual(self.sigmf_tarfile.format, tarfile.PAX_FORMAT)
+
+    def test_archive_read_samples(self):
+        """Test that read_samples works correctly with archived data"""
+        # load from archive
+        archive_mdfile = fromfile(self.temp_path_archive)
+
+        # verify sample count matches
+        expected_sample_count = len(self.sigmf_object)
+        self.assertEqual(archive_mdfile.sample_count, expected_sample_count)
+
+        # verify read_samples returns same as slice
+        samples_orig = TEST_FLOAT32_DATA[3:13]
+        samples_read = archive_mdfile.read_samples(start_index=3, count=10)
+        samples_sliced = archive_mdfile[3:13]
+        np.testing.assert_array_equal(samples_orig, samples_sliced)
+        np.testing.assert_array_equal(samples_orig, samples_read)
+
+    def test_archive_read_samples_beyond_end(self):
+        """Test that read_samples beyond the end of data raises an error"""
+        meta = fromfile(self.temp_path_archive)
+        # FIXME: Should this raise a SigMFFileError instead?
+        with self.assertRaises(OSError):
+            meta.read_samples(start_index=meta.sample_count + 10, count=5)

From 8c4cece25267a6130e398c9ce1de2eef97dad783 Mon Sep 17 00:00:00 2001
From: Teque5
Date: Wed, 24 Dec 2025 12:11:53 -0800
Subject: [PATCH 2/3] unify sample access via constructor autoscale parameter

* Make slicing equivalent to reading for integer types, e.g. meta[0:10] == meta.read_samples(count=10)
* Breaking API Change: autoscale and raw_components parameters removed from read methods
* Move autoscale configuration from method parameters to the SigMFFile constructor
* Remove the already-deprecated raw_components parameter from all methods
* Update read_samples() and read_samples_in_capture() to use the instance autoscale setting
* Add autoscale support to fromfile(), fromarchive(), and SigMFArchiveReader
* Simplify __getitem__ with unified scaling behavior for consistency
* Increment minor version
---
 docs/source/quickstart.rst  |  21 +++++++
 sigmf/__init__.py           |   2 +-
 sigmf/archivereader.py      |   8 ++-
 sigmf/sigmffile.py          | 111 +++++++++++++++++++++++++-----------
 tests/test_archivereader.py |   2 +-
 tests/test_sigmffile.py     |  64 ++++++++++-----------
 6 files changed, 136 insertions(+), 72 deletions(-)

diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst
index 92c023b..cb50db2 100644
--- a/docs/source/quickstart.rst
+++ b/docs/source/quickstart.rst
@@ -80,3 +80,24 @@ Save a Numpy array as a SigMF Recording

     # check for mistakes & write to disk
     meta.tofile('example_cf32.sigmf-meta')  # extension is optional
+
+--------------------------------
+Control Fixed-Point Data Scaling
+--------------------------------
+
+For fixed-point datasets, you can control whether samples are automatically scaled to floating-point values:
+
+.. 
code-block:: python + + import sigmf + + # Default behavior: autoscale fixed-point data to [-1.0, 1.0] range + handle = sigmf.fromfile("fixed_point_data.sigmf") + samples = handle.read_samples() # Returns float32/complex64 + + # Disable autoscaling to access raw integer values + handle_raw = sigmf.fromfile("fixed_point_data.sigmf", autoscale=False) + raw_samples = handle_raw.read_samples() # Returns original integer types + + # Both slicing and read_samples() respect the autoscale setting + assert handle[0:10].dtype == handle.read_samples(count=10).dtype diff --git a/sigmf/__init__.py b/sigmf/__init__.py index b5bdcf3..b4667ac 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.2.14" +__version__ = "1.3.0" # matching version of the SigMF specification __specification__ = "1.2.5" diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 7f4c1d3..25bac69 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -29,7 +29,9 @@ class SigMFArchiveReader: map_readonly : bool, optional Indicate whether assignments on the numpy.memmap are allowed. archive_buffer : buffer, optional - + Alternative buffer to read archive from. + autoscale : bool, optional + If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0). Raises ------ @@ -41,7 +43,7 @@ class SigMFArchiveReader: If metadata is invalid. """ - def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None): + def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None, autoscale=True): if name is not None: path = Path(name) if path.suffix != SIGMF_ARCHIVE_EXT: @@ -90,7 +92,7 @@ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_bu if data_offset is None: raise SigMFFileError("No .sigmf-data file found in archive!") - self.sigmffile = SigMFFile(metadata=json_contents) + self.sigmffile = SigMFFile(metadata=json_contents, autoscale=autoscale) self.sigmffile.validate() self.sigmffile.set_data_file( diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 66aa0b9..aaeee2d 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -158,7 +158,7 @@ class SigMFFile(SigMFMetafile): ] VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS} - def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True): + def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True): """ API for SigMF I/O @@ -174,6 +174,9 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu When True will skip calculating hash on data_file (if present) to check against metadata. map_readonly: bool, default True Indicates whether assignments on the numpy.memmap are allowed. + autoscale: bool, default True + If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0) + for all sample reading operations including slicing. 
""" super().__init__() self.data_file = None @@ -181,6 +184,7 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu self.sample_count = 0 self._memmap = None self.is_complex_data = False # numpy.iscomplexobj(self._memmap) is not adequate for fixed-point complex case + self.autoscale = autoscale self.set_metadata(metadata) if global_info is not None: @@ -217,10 +221,39 @@ def __next__(self): def __getitem__(self, sli): mem = self._memmap[sli] # matches behavior of numpy.ndarray.__getitem__() + # original behavior: always apply _return_type conversion if set if self._return_type is None: - return mem - - # is_fixed_point and is_complex + # no special conversion needed + if not self.autoscale: + return mem + else: + # apply autoscaling for fixed-point data when autoscale=True + dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY)) + is_fixedpoint_data = dtype["is_fixedpoint"] + + if is_fixedpoint_data: + # apply scaling for fixed-point data + is_unsigned_data = dtype["is_unsigned"] + component_size = dtype["component_size"] + data_type_out = np.dtype("f4") if not self.is_complex_data else np.dtype("f4, f4") + + data = mem.astype(data_type_out) + data = data.view(np.dtype("f4")) + if is_unsigned_data: + data -= 2 ** (component_size * 8 - 1) + data *= 2 ** -(component_size * 8 - 1) + data = data.view(data_type_out) + if self.is_complex_data: + data = data.view(np.complex64) + # for single-channel complex data, flatten the last dimension + if data.ndim > 1 and self.get_num_channels() == 1: + data = data.flatten() + return data[0] if isinstance(sli, int) else data + else: + # floating-point data, no scaling needed + return mem + + # handle complex data type conversion (original behavior) if self._memmap.ndim == 2: # num_channels == 1 ray = mem[:, 0].astype(self._return_type) + 1.0j * mem[:, 1].astype(self._return_type) @@ -511,9 +544,7 @@ def _count_samples(self): "It may be invalid data." ) if self._get_sample_count_from_annotations() > sample_count: - warnings.warn( - f"Data source ends before the final annotation in the corresponding SigMF metadata." - ) + warnings.warn(f"Data source ends before the final annotation in the corresponding SigMF metadata.") self.sample_count = sample_count return sample_count @@ -659,7 +690,7 @@ def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False): self.dump(fp, pretty=pretty) fp.write("\n") # text files should end in carriage return - def read_samples_in_capture(self, index=0, autoscale=True): + def read_samples_in_capture(self, index=0): """ Reads samples from the specified captures segment in its entirety. @@ -682,9 +713,9 @@ def read_samples_in_capture(self, index=0, autoscale=True): "an integer number of samples across channels. It may be invalid." ) - return self._read_datafile(cb[0], (cb[1] - cb[0]) // self.get_sample_size(), autoscale, False) + return self._read_datafile(cb[0], (cb[1] - cb[0]) // self.get_sample_size()) - def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=False): + def read_samples(self, start_index=0, count=-1): """ Reads the specified number of samples starting at the specified index from the associated data file. @@ -694,16 +725,12 @@ def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=F Starting sample index from which to read. count : int, default -1 Number of samples to read. -1 will read whole file. 
- autoscale : bool, default True - If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0) - raw_components : bool, default False - If True read and return the sample components (individual I & Q for complex, samples for real) - with no conversions or interleaved channels. Returns ------- data : ndarray Samples are returned as an array of float or complex, with number of dimensions equal to NUM_CHANNELS_KEY. + Scaling behavior depends on the autoscale parameter set during construction. """ if count == 0: raise IOError("Number of samples must be greater than zero, or -1 for all samples.") @@ -719,9 +746,9 @@ def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=F if not self._is_conforming_dataset(): warnings.warn(f"Recording dataset appears non-compliant, resulting data may be erroneous") - return self._read_datafile(first_byte, count * self.get_num_channels(), autoscale, False) + return self._read_datafile(first_byte, count * self.get_num_channels()) - def _read_datafile(self, first_byte, nitems, autoscale, raw_components): + def _read_datafile(self, first_byte, nitems): """ internal function for reading samples from datafile """ @@ -751,18 +778,15 @@ def _read_datafile(self, first_byte, nitems, autoscale, raw_components): # return reshaped view for num_channels # first dimension will be double size if `is_complex_data` data = data.reshape(data.shape[0] // num_channels, num_channels) - if not raw_components: - data = data.astype(data_type_out) - if autoscale and is_fixedpoint_data: - data = data.view(np.dtype("f4")) - if is_unsigned_data: - data -= 2 ** (component_size * 8 - 1) - data *= 2 ** -(component_size * 8 - 1) - data = data.view(data_type_out) - if self.is_complex_data: - data = data.view(np.complex64) - else: - data = data.view(component_type_in) + data = data.astype(data_type_out) + if self.autoscale and is_fixedpoint_data: + data = data.view(np.dtype("f4")) + if is_unsigned_data: + data -= 2 ** (component_size * 8 - 1) + data *= 2 ** -(component_size * 8 - 1) + data = data.view(data_type_out) + if self.is_complex_data: + data = data.view(np.complex64) if self.data_file is not None: fp.close() @@ -1061,17 +1085,34 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): return None -def fromarchive(archive_path, dir=None, skip_checksum=False): +def fromarchive(archive_path, dir=None, skip_checksum=False, autoscale=True): """Extract an archive and return a SigMFFile. The `dir` parameter is no longer used as this function has been changed to access SigMF archives without extracting them. + + Parameters + ---------- + archive_path: str + Path to `sigmf-archive` tarball. + dir: str, optional + No longer used. Kept for compatibility. + skip_checksum: bool, default False + Skip dataset checksum calculation. + autoscale: bool, default True + If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0). + + Returns + ------- + SigMFFile + Instance created from archive. """ from .archivereader import SigMFArchiveReader - return SigMFArchiveReader(archive_path, skip_checksum=skip_checksum).sigmffile + + return SigMFArchiveReader(archive_path, skip_checksum=skip_checksum, autoscale=autoscale).sigmffile -def fromfile(filename, skip_checksum=False): +def fromfile(filename, skip_checksum=False, autoscale=True): """ Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded from the specified file. 
@@ -1087,6 +1128,8 @@ def fromfile(filename, skip_checksum=False): Path for SigMF Metadata, Dataset, Archive or Collection (with or without extension). skip_checksum: bool, default False When True will not read entire dataset to calculate hash. + autoscale: bool, default True + If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0). Returns ------- @@ -1103,7 +1146,7 @@ def fromfile(filename, skip_checksum=False): ext = file_path.suffix if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): - return fromarchive(archive_fn, skip_checksum=skip_checksum) + return fromarchive(archive_fn, skip_checksum=skip_checksum, autoscale=autoscale) if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): collection_fp = open(collection_fn, "rb") @@ -1123,7 +1166,7 @@ def fromfile(filename, skip_checksum=False): meta_fp.close() data_fn = get_dataset_filename_from_metadata(meta_fn, metadata) - return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum) + return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum, autoscale=autoscale) def get_sigmf_filenames(filename): diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index 621f37a..e93e24d 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -60,7 +60,7 @@ def test_access_data_without_untar(self): if complex_prefix == "c": # complex data will be half as long target_count //= 2 - self.assertTrue(np.all(np.iscomplex(readback_samples))) + self.assertTrue(np.iscomplexobj(readback_samples)) if num_channels != 1: # check expected # of channels self.assertEqual( diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index 4c3668c..f2171ae 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -197,9 +197,10 @@ def test_multichannel_seek(self): SigMFFile.DATATYPE_KEY: "cu16_le", SigMFFile.NUM_CHANNELS_KEY: 3, }, + autoscale=False, ) # read after the first sample - temp_samples = temp_signal.read_samples(start_index=1, autoscale=False) + temp_samples = temp_signal.read_samples(start_index=1) # ensure samples are in the order we expect self.assertTrue(np.all(temp_samples[:, 0] == np.array([6 + 7j, 12 + 13j]))) @@ -240,74 +241,70 @@ def tearDown(self) -> None: """remove temporary dir""" shutil.rmtree(self.temp_dir) - def prepare(self, data: list, meta: dict, dtype: type) -> SigMFFile: + def prepare(self, data: list, meta: dict, dtype: type, autoscale: bool = True) -> SigMFFile: """write some data and metadata to temporary paths""" np.array(data, dtype=dtype).tofile(self.temp_path_data) with open(self.temp_path_meta, "w") as handle: json.dump(meta, handle) - meta = sigmf.fromfile(self.temp_path_meta, skip_checksum=True) + meta = sigmf.fromfile(self.temp_path_meta, skip_checksum=True, autoscale=autoscale) return meta def test_000(self) -> None: """compliant two-capture recording""" - meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8) + meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False) self.assertEqual(256, meta._count_samples()) self.assertTrue(meta._is_conforming_dataset()) self.assertTrue((0, 0), meta.get_capture_byte_boundarys(0)) self.assertTrue((0, 256), meta.get_capture_byte_boundarys(1)) - self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples(autoscale=False))) + self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples())) 
self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0))) - self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1, autoscale=False))) + self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1))) def test_001(self) -> None: """two capture recording with header_bytes and trailing_bytes set""" - meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8) + meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0)) self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1)) - self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False))) - self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False))) + self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1))) def test_002(self) -> None: """two capture recording with multiple header_bytes set""" - meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8) + meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0)) - self.assertTrue((176, 240), meta.get_capture_byte_boundarys(1)) - self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False))) - self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False))) + self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1)) + self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1))) def test_003(self) -> None: """three capture recording with multiple header_bytes set""" - meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8) + meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) self.assertTrue((32, 64), meta.get_capture_byte_boundarys(0)) self.assertTrue((64, 160), meta.get_capture_byte_boundarys(1)) - self.assertTrue((192, 256), meta.get_capture_byte_boundarys(2)) - self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0, autoscale=False))) - self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1, autoscale=False))) - self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2, autoscale=False))) + self.assertTrue((160, 224), meta.get_capture_byte_boundarys(2)) + self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0))) + self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2))) def test_004(self) -> None: """two channel version of 000""" - meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8) + meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) self.assertEqual(96, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) - self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0)) - self.assertTrue((160, 224), 
meta.get_capture_byte_boundarys(1))
-        self.assertTrue(
-            np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0, autoscale=False))
-        )
-        self.assertTrue(
-            np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1, autoscale=False))
-        )
+        self.assertTrue((32, 96), meta.get_capture_byte_boundarys(0))
+        self.assertTrue((96, 160), meta.get_capture_byte_boundarys(1))
+        self.assertTrue(np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0)))
+        self.assertTrue(np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1)))

     def test_slicing_ru8(self) -> None:
         """slice real uint8"""
-        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8)
+        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
         self.assertTrue(np.array_equal(meta[:], TEST_U8_DATA0))
         self.assertTrue(np.array_equal(meta[6], TEST_U8_DATA0[6]))
         self.assertTrue(np.array_equal(meta[1:-1], TEST_U8_DATA0[1:-1]))
@@ -320,12 +317,12 @@ def test_slicing_rf32(self) -> None:

     def test_slicing_multiple_channels(self) -> None:
         """slice multiple channels"""
-        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8)
+        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)  # use raw data for this test
         channelized = np.array(TEST_U8_DATA4).reshape((-1, 2))
-        self.assertTrue(np.array_equal(meta[:][:], channelized))
-        self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples(autoscale=False)[10:20, 0]))
-        self.assertTrue(np.array_equal(meta[0], channelized[0]))
-        self.assertTrue(np.array_equal(meta[1, :], channelized[1]))
+        self.assertTrue(np.array_equal(meta[:][:], channelized))
+        self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples()[10:20, 0]))
+        self.assertTrue(np.array_equal(meta[0], channelized[0]))
+        self.assertTrue(np.array_equal(meta[1, :], channelized[1]))


 def simulate_capture(sigmf_md, n, capture_len):

From eb0555a68837c6e551540284a809544c1305534d Mon Sep 17 00:00:00 2001
From: Teque5
Date: Wed, 24 Dec 2025 12:17:30 -0800
Subject: [PATCH 3/3] formatting
---
 docs/source/quickstart.rst | 8 ++++----
 sigmf/sigmffile.py         | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst
index cb50db2..71c81f1 100644
--- a/docs/source/quickstart.rst
+++ b/docs/source/quickstart.rst
@@ -90,14 +90,14 @@ For fixed-point datasets, you can control whether samples are automatically scal
 .. 
code-block:: python import sigmf - + # Default behavior: autoscale fixed-point data to [-1.0, 1.0] range handle = sigmf.fromfile("fixed_point_data.sigmf") samples = handle.read_samples() # Returns float32/complex64 - + # Disable autoscaling to access raw integer values - handle_raw = sigmf.fromfile("fixed_point_data.sigmf", autoscale=False) + handle_raw = sigmf.fromfile("fixed_point_data.sigmf", autoscale=False) raw_samples = handle_raw.read_samples() # Returns original integer types - + # Both slicing and read_samples() respect the autoscale setting assert handle[0:10].dtype == handle.read_samples(count=10).dtype diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index aaeee2d..d6a6d58 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -221,7 +221,7 @@ def __next__(self): def __getitem__(self, sli): mem = self._memmap[sli] # matches behavior of numpy.ndarray.__getitem__() - # original behavior: always apply _return_type conversion if set + # apply _return_type conversion if set if self._return_type is None: # no special conversion needed if not self.autoscale: @@ -253,7 +253,7 @@ def __getitem__(self, sli): # floating-point data, no scaling needed return mem - # handle complex data type conversion (original behavior) + # handle complex data type conversion if self._memmap.ndim == 2: # num_channels == 1 ray = mem[:, 0].astype(self._return_type) + 1.0j * mem[:, 1].astype(self._return_type)
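
The buffer fix in patch 1 is easy to sanity-check end-to-end. A minimal sketch, assuming a real float32 recording was already archived to a hypothetical `example.sigmf` (as `TestSigMFArchive.setUp` does via `tofile(..., toarchive=True)`):

.. code-block:: python

    import numpy as np
    import sigmf

    # open the .sigmf tarball directly; no extraction to disk
    handle = sigmf.fromfile("example.sigmf", skip_checksum=True)

    # all three access paths should agree; before this patch the buffer
    # read passed the whole buffer to np.frombuffer without applying
    # first_byte, so a bounded read from a nonzero start returned the
    # wrong samples
    by_slice = handle[3:13]
    by_count = handle.read_samples(start_index=3, count=10)
    entire = handle.read_samples()

    np.testing.assert_array_equal(by_slice, by_count)
    np.testing.assert_array_equal(entire[3:13], by_count)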
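
Patch 2's breaking change moves scaling control from call sites to the constructor, so downstream code migrates the flag rather than dropping it. For a `ru8` dataset the scaling is the one visible in `_read_datafile` and `__getitem__`: subtract 2**7 = 128 (unsigned only), then multiply by 2**-7, mapping 0..255 onto [-1.0, 1.0). A before/after sketch, assuming a hypothetical real-valued fixed-point recording `capture.sigmf-meta`:

.. code-block:: python

    import numpy as np
    import sigmf

    # sigmf <= 1.2.x chose scaling per call (parameter removed in 1.3.0):
    # samples = handle.read_samples(autoscale=False)

    # sigmf >= 1.3.0 chooses scaling once, at construction
    raw = sigmf.fromfile("capture.sigmf-meta", autoscale=False)
    scaled = sigmf.fromfile("capture.sigmf-meta")  # default autoscale=True

    # the instance-wide setting now applies to reads and slices alike
    np.testing.assert_array_equal(raw.read_samples(count=10), raw[0:10])
    np.testing.assert_array_equal(scaled.read_samples(count=10), scaled[0:10])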