diff --git a/ncore/impl/data/v4/compat_test.py b/ncore/impl/data/v4/compat_test.py index 967797d..e06a48c 100644 --- a/ncore/impl/data/v4/compat_test.py +++ b/ncore/impl/data/v4/compat_test.py @@ -16,7 +16,7 @@ import tempfile import unittest -from typing import Literal, Tuple +from typing import List, Literal, Tuple, Union import numpy as np @@ -27,9 +27,20 @@ from ncore.impl.common.transformations import HalfClosedInterval from ncore.impl.common.util import unpack_optional -from ncore.impl.data.types import PointCloud, RowOffsetStructuredSpinningLidarModelParameters +from ncore.impl.data.compat import SensorProtocol +from ncore.impl.data.types import ( + CameraLabelDescriptor, + LabelCategory, + LabelEncoding, + LabelSchema, + LabelSource, + LabelType, + PointCloud, + RowOffsetStructuredSpinningLidarModelParameters, +) from ncore.impl.data.v4.compat import SequenceLoaderV4 from ncore.impl.data.v4.components import ( + CameraLabelsComponent, IntrinsicsComponent, LidarSensorComponent, PointCloudsComponent, @@ -40,27 +51,29 @@ _RUNFILES = Runfiles.Create() +assert _RUNFILES is not None + +_TEST_DATA_PATH = UPath( + _RUNFILES.Rlocation("test-data-v4/c9b05cf4-afb9-11ec-b3c2-00044bf65fcb@1648597318700123-1648599151600035.json") + or "" +) class TestCompatV4(unittest.TestCase): """Test to verify SequenceLoaderV4 compatibility layer""" - def setUp(self): + def setUp(self) -> None: # Make printed errors more representable numerically np.set_printoptions(floatmode="unique", linewidth=200, suppress=True) # Load V4 data self.loader = SequenceLoaderV4( SequenceComponentGroupsReader( - [ - _RUNFILES.Rlocation( - "test-data-v4/c9b05cf4-afb9-11ec-b3c2-00044bf65fcb@1648597318700123-1648599151600035.json" - ) - ], + [_TEST_DATA_PATH], ) ) - def test_sequence_properties(self): + def test_sequence_properties(self) -> None: """Test sequence-level properties through V4 compat layer""" # Test sequence_id seq_id = self.loader.sequence_id @@ -76,7 +89,7 @@ def test_sequence_properties(self): self.assertIsNotNone(interval) self.assertGreater(interval.stop, interval.start) - def test_sensor_enumeration(self): + def test_sensor_enumeration(self) -> None: """Test sensor ID enumeration in V4""" # Test camera_ids camera_ids = self.loader.camera_ids @@ -92,12 +105,12 @@ def test_sensor_enumeration(self): radar_ids = self.loader.radar_ids self.assertEqual(len(radar_ids), 18) - def test_pose_graph(self): + def test_pose_graph(self) -> None: """Test pose graph access in V4""" pose_graph = self.loader.pose_graph self.assertIsNotNone(pose_graph) - def test_camera_sensor_basic(self): + def test_camera_sensor_basic(self) -> None: """Test basic camera sensor properties in V4""" camera_id = "camera_front_wide_120fov" camera = self.loader.get_camera_sensor(camera_id) @@ -114,7 +127,7 @@ def test_camera_sensor_basic(self): self.assertEqual(timestamps.shape[0], frames_count) self.assertEqual(timestamps.shape[1], 2) - def test_camera_sensor_frames(self): + def test_camera_sensor_frames(self) -> None: """Test camera frame data access in V4""" camera = self.loader.get_camera_sensor("camera_front_wide_120fov") @@ -133,7 +146,7 @@ def test_camera_sensor_frames(self): image_array = camera.get_frame_image_array(frame_idx) self.assertIsInstance(image_array, np.ndarray) - def test_sequence_paths(self): + def test_sequence_paths(self) -> None: """Test sequence_paths property""" paths = self.loader.sequence_paths self.assertIsInstance(paths, list) @@ -143,7 +156,7 @@ def test_sequence_paths(self): path.name.startswith("c9b05cf4-afb9-11ec-b3c2-00044bf65fcb@1648597318700123-1648599151600035") ) - def test_reload_resources(self): + def test_reload_resources(self) -> None: """Test reload_resources in V4""" # Should not raise an exception self.loader.reload_resources() @@ -152,7 +165,7 @@ def test_reload_resources(self): camera = self.loader.get_camera_sensor("camera_front_wide_120fov") self.assertGreater(camera.frames_count, 0) - def test_get_closest_frame_index_relative_frame_time_v4(self): + def test_get_closest_frame_index_relative_frame_time_v4(self) -> None: """Test get_closest_frame_index with various relative_frame_time values for V4 data""" camera = self.loader.get_camera_sensor("camera_front_wide_120fov") @@ -186,7 +199,7 @@ def test_get_closest_frame_index_relative_frame_time_v4(self): found_idx = camera.get_closest_frame_index(test_start_timestamp, relative_frame_time=0.0) self.assertEqual(found_idx, test_frame_idx) - def test_lidar_sensor_basic(self): + def test_lidar_sensor_basic(self) -> None: """Test basic lidar sensor properties in V4""" lidar_id = "lidar_gt_top_p128_v4p5" lidar = self.loader.get_lidar_sensor(lidar_id) @@ -208,7 +221,7 @@ def test_lidar_sensor_basic(self): self.assertIsNotNone(T_sensor_rig) self.assertEqual(unpack_optional(T_sensor_rig).shape, (4, 4)) - def test_lidar_sensor_point_cloud(self): + def test_lidar_sensor_point_cloud(self) -> None: """Test lidar point cloud access in V4""" lidar = self.loader.get_lidar_sensor("lidar_gt_top_p128_v4p5") @@ -224,7 +237,7 @@ def test_lidar_sensor_point_cloud(self): self.assertGreater(len(point_cloud.xyz_m_end), 0) self.assertEqual(point_cloud.xyz_m_end.shape[1], 3) - def test_lidar_sensor_ray_bundle(self): + def test_lidar_sensor_ray_bundle(self) -> None: """Test lidar ray bundle access in V4""" lidar = self.loader.get_lidar_sensor("lidar_gt_top_p128_v4p5") @@ -256,7 +269,66 @@ def test_lidar_sensor_ray_bundle(self): intensities = lidar.get_frame_ray_bundle_return_intensity(frame_idx) self.assertEqual(intensities.shape[0], count) - def test_lidar_sensor_transforms(self): + def test_get_sequence_meta(self) -> None: + """Test get_sequence_meta returns a non-empty dict without raising""" + meta = self.loader.get_sequence_meta() + self.assertIsInstance(meta, dict) + # Should contain at least sequence_id or equivalent high-level info + self.assertGreater(len(meta), 0) + + def test_radar_sensor_basic(self) -> None: + """Test basic radar sensor properties through the compat layer""" + radar_ids = self.loader.radar_ids + self.assertGreater(len(radar_ids), 0) + + radar_id = radar_ids[0] + radar = self.loader.get_radar_sensor(radar_id) + + # Test sensor_id + self.assertEqual(radar.sensor_id, radar_id) + + # Test frames_count + frames_count = radar.frames_count + self.assertGreater(frames_count, 0) + + # Test frames_timestamps_us + timestamps = radar.frames_timestamps_us + self.assertEqual(timestamps.shape[0], frames_count) + self.assertEqual(timestamps.shape[1], 2) + + def test_radar_sensor_ray_bundle(self) -> None: + """Test radar sensor ray bundle access through the compat layer""" + radar_id = self.loader.radar_ids[0] + radar = self.loader.get_radar_sensor(radar_id) + + frame_idx = 0 + + # Test get_frame_ray_bundle_count + count = radar.get_frame_ray_bundle_count(frame_idx) + self.assertGreater(count, 0) + + # Test get_frame_ray_bundle_timestamp_us + timestamps = radar.get_frame_ray_bundle_timestamp_us(frame_idx) + self.assertEqual(timestamps.shape, (count,)) + + # Test get_frame_ray_bundle_return_count + return_count = radar.get_frame_ray_bundle_return_count(frame_idx) + self.assertGreaterEqual(return_count, 1) + + # Test get_frame_ray_bundle_return_valid_mask + valid_masks = radar.get_frame_ray_bundle_return_valid_mask(frame_idx) + self.assertEqual(valid_masks.shape, (count,)) + self.assertTrue(valid_masks.dtype == np.bool_) + + # Test get_frame_ray_bundle_return_distance_m + distances_m = radar.get_frame_ray_bundle_return_distance_m(frame_idx) + self.assertEqual(distances_m.shape[0], count) + + # Test get_frame_ray_bundle_direction + directions = radar.get_frame_ray_bundle_direction(frame_idx) + self.assertEqual(directions.shape, (count, 3)) + + def test_lidar_sensor_transforms(self) -> None: """Test lidar sensor transformations in V4""" lidar = self.loader.get_lidar_sensor("lidar_gt_top_p128_v4p5") @@ -280,22 +352,18 @@ def test_lidar_sensor_transforms(self): class TestCompatV4ReferenceValues(unittest.TestCase): """Test V4 data against known reference values""" - def setUp(self): + def setUp(self) -> None: # Make printed errors more representable numerically np.set_printoptions(floatmode="unique", linewidth=200, suppress=True) # Load V4 data self.loader = SequenceLoaderV4( SequenceComponentGroupsReader( - [ - _RUNFILES.Rlocation( - "test-data-v4/c9b05cf4-afb9-11ec-b3c2-00044bf65fcb@1648597318700123-1648599151600035.json" - ) - ], + [_TEST_DATA_PATH], ) ) - def test_sensor_extrinsics_reference_values(self): + def test_sensor_extrinsics_reference_values(self) -> None: """Test T_sensor_rig matches known reference values for camera_front_wide_120fov""" sensor = self.loader.get_camera_sensor("camera_front_wide_120fov") @@ -312,12 +380,12 @@ def test_sensor_extrinsics_reference_values(self): np.testing.assert_array_equal(unpack_optional(sensor.T_sensor_rig), reference_T_sensor_rig) - def test_sensor_frame_count_reference_values(self): + def test_sensor_frame_count_reference_values(self) -> None: """Test frames_count matches expected value""" sensor = self.loader.get_camera_sensor("camera_front_wide_120fov") self.assertEqual(sensor.frames_count, 26) - def test_sensor_timestamps_reference_values(self): + def test_sensor_timestamps_reference_values(self) -> None: """Test that frame timestamps match expected values""" sensor = self.loader.get_camera_sensor("camera_front_wide_120fov") @@ -335,7 +403,7 @@ def test_sensor_timestamps_reference_values(self): self.assertEqual(actual_start, expected_start, f"Frame {frame_idx} start timestamp mismatch") self.assertEqual(actual_end, expected_end, f"Frame {frame_idx} end timestamp mismatch") - def test_sensor_poses_by_timestamp_reference_values(self): + def test_sensor_poses_by_timestamp_reference_values(self) -> None: """Test T_rig_world at known timestamps matches reference values.""" # Reference poses keyed by timestamp_us reference_poses = { @@ -434,7 +502,7 @@ def test_sensor_poses_by_timestamp_reference_values(self): err_msg=f"T_rig_world mismatch at timestamp {ts}", ) - def test_closest_frame_index_reference_values(self): + def test_closest_frame_index_reference_values(self) -> None: """Test get_closest_frame_index returns expected frame indices""" sensor = self.loader.get_camera_sensor("camera_front_wide_120fov") @@ -450,10 +518,10 @@ def test_closest_frame_index_reference_values(self): actual_idx = sensor.get_closest_frame_index(query_ts) self.assertEqual(actual_idx, expected_idx, f"get_closest_frame_index({query_ts}) mismatch") - def test_rig_world_poses_for_all_sensors(self): + def test_rig_world_poses_for_all_sensors(self) -> None: """Test that rig-world poses can be evaluated for all frame start/end times for all sensors""" # Collect all sensors: cameras, lidars, radars - sensors = [] + sensors: List[Tuple[str, str, SensorProtocol]] = [] for camera_id in self.loader.camera_ids: sensors.append(("camera", camera_id, self.loader.get_camera_sensor(camera_id))) for lidar_id in self.loader.lidar_ids: @@ -474,7 +542,7 @@ def test_rig_world_poses_for_all_sensors(self): # Verify all poses are valid transformation matrices (last row should be [0, 0, 0, 1]) self.assertTrue(np.allclose(poses[:, :, 3, :], np.array([0.0, 0.0, 0.0, 1.0]))) - def test_cuboid_observations_reference_values(self): + def test_cuboid_observations_reference_values(self) -> None: """Test cuboid track observations match reference values""" observations = list(self.loader.get_cuboid_track_observations()) @@ -518,7 +586,7 @@ def test_cuboid_observations_reference_values(self): self.assertEqual(third_obs.class_id, "automobile") self.assertEqual(third_obs.timestamp_us, 1648597318806465) - def test_cuboid_observations_timestamp_filtering(self): + def test_cuboid_observations_timestamp_filtering(self) -> None: """Test that timestamp_interval_us correctly filters cuboid track observations""" # Get all observations to establish reference data @@ -590,18 +658,18 @@ class TestPointCloudsSourceIntegration(unittest.TestCase): store_type: Literal["itar", "directory"] - # ── helpers ────────────────────────────────────────────────────────────── + # helpers @staticmethod def _normalize_directions(vectors: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: norms = np.linalg.norm(vectors, axis=1) return vectors / norms[:, np.newaxis], norms - # ── setUp ──────────────────────────────────────────────────────────────── + # setUp - def setUp(self): + def setUp(self) -> None: np.set_printoptions(floatmode="unique", linewidth=200, suppress=True) - np.random.seed(42) + self._rng = np.random.default_rng(42) self._tempdir = tempfile.TemporaryDirectory() @@ -617,7 +685,7 @@ def setUp(self): generic_meta_data={}, ) - # ── Poses ──────────────────────────────────────────────────────── + # Poses T_rig_worlds = np.stack( [ np.block( @@ -658,7 +726,7 @@ def setUp(self): pose=T_lidar_rig, ) - # ── Intrinsics ────────────────────────────────────────────────── + # Intrinsics intrinsics_writer = store_writer.register_component_writer( IntrinsicsComponent.Writer, "default", @@ -675,7 +743,7 @@ def setUp(self): ) intrinsics_writer.store_lidar_intrinsics("test_lidar", self.ref_lidar_intrinsics) - # ── Lidar frames ──────────────────────────────────────────────── + # Lidar frames lidar_writer = store_writer.register_component_writer( LidarSensorComponent.Writer, "test_lidar", @@ -683,12 +751,12 @@ def setUp(self): ) # Frame 0: 5 rays, 1 return, with model_element, generic_data={"rgb": (5,3)} - self.ref_lidar_dir0, lidar_dist0 = self._normalize_directions(np.random.rand(5, 3).astype(np.float32) + 0.1) + self.ref_lidar_dir0, lidar_dist0 = self._normalize_directions(self._rng.random((5, 3)).astype(np.float32) + 0.1) self.ref_lidar_ts0 = np.linspace(0, 500_000, num=5, dtype=np.uint64) self.ref_lidar_model_element0 = np.arange(10, dtype=np.uint16).reshape(5, 2) self.ref_lidar_distance_m0 = lidar_dist0[np.newaxis, :] - self.ref_lidar_intensity0 = np.random.rand(1, 5).astype(np.float32) - self.ref_lidar_rgb0 = np.random.randint(0, 256, size=(5, 3), dtype=np.uint8) + self.ref_lidar_intensity0 = self._rng.random((1, 5)).astype(np.float32) + self.ref_lidar_rgb0 = self._rng.integers(0, 256, size=(5, 3), dtype=np.uint8) self.ref_lidar_frame_ts0 = np.array([0, 500_000], dtype=np.uint64) lidar_writer.store_frame( @@ -703,9 +771,9 @@ def setUp(self): ) # Frame 1: 8 rays, 2 returns, no model_element, generic_data={"rgb": (8,3)} - self.ref_lidar_dir1, lidar_dist1 = self._normalize_directions(np.random.rand(8, 3).astype(np.float32) + 0.1) + self.ref_lidar_dir1, lidar_dist1 = self._normalize_directions(self._rng.random((8, 3)).astype(np.float32) + 0.1) self.ref_lidar_ts1 = np.linspace(500_001, 1_000_000, num=8, dtype=np.uint64) - self.ref_lidar_rgb1 = np.random.randint(0, 256, size=(8, 3), dtype=np.uint8) + self.ref_lidar_rgb1 = self._rng.integers(0, 256, size=(8, 3), dtype=np.uint8) self.ref_lidar_frame_ts1 = np.array([500_001, 1_000_000], dtype=np.uint64) # Build 2-return distances and intensities with some NaN for return 1 @@ -715,7 +783,7 @@ def setUp(self): self.ref_lidar_distance_m1 = np.stack((lidar_dist1, lidar_dist1 + 0.1)).astype(np.float32) self.ref_lidar_distance_m1[absent_mask] = np.nan - self.ref_lidar_intensity1 = np.random.rand(2, 8).astype(np.float32) + self.ref_lidar_intensity1 = self._rng.random((2, 8)).astype(np.float32) self.ref_lidar_intensity1[absent_mask] = np.nan self.ref_lidar_valid_mask1 = ~absent_mask @@ -731,10 +799,10 @@ def setUp(self): generic_meta_data={"frame": 1}, ) - # ── PointCloudsComponent ("sfm_points") ──────────────────────── - self.ref_pc_xyz = np.random.rand(10, 3).astype(np.float32) - self.ref_pc_rgb = np.random.randint(0, 256, size=(10, 3), dtype=np.uint8) - self.ref_pc_score = np.random.rand(2).astype(np.float32) + # PointCloudsComponent ("sfm_points") + self.ref_pc_xyz = self._rng.random((10, 3)).astype(np.float32) + self.ref_pc_rgb = self._rng.integers(0, 256, size=(10, 3), dtype=np.uint8) + self.ref_pc_score = self._rng.random(2).astype(np.float32) pc_writer = store_writer.register_component_writer( PointCloudsComponent.Writer, @@ -757,7 +825,7 @@ def setUp(self): generic_meta_data={"source": "sfm"}, ) - # ── Finalize & Load ───────────────────────────────────────────── + # Finalize & Load store_paths = store_writer.finalize() reader = SequenceComponentGroupsReader(store_paths, open_consolidated=False) @@ -767,18 +835,18 @@ def setUp(self): cuboids_component_group_name=None, ) - def tearDown(self): + def tearDown(self) -> None: self._tempdir.cleanup() - # ── Tests ──────────────────────────────────────────────────────────────── + # Tests - def test_point_clouds_ids(self): + def test_point_clouds_ids(self) -> None: """point_clouds_ids returns only native sources, not lidar/radar.""" ids = self.loader.point_clouds_ids self.assertEqual(ids, ["sfm_points"]) self.assertNotIn("test_lidar", ids) - def test_native_point_clouds_source(self): + def test_native_point_clouds_source(self) -> None: """Verify native source: xyz, attributes, reference frame, coordinate_unit, generic_data.""" src = self.loader.get_point_clouds_source("sfm_points") @@ -803,7 +871,7 @@ def test_native_point_clouds_source(self): self.assertEqual(sorted(src.get_pc_generic_data_names(0)), ["score"]) self.assertEqual(src.get_pc_generic_meta_data(0), {"source": "sfm"}) - def test_lidar_adapted_source(self): + def test_lidar_adapted_source(self) -> None: """Verify adapter: pcs_count matches frames_count, PointCloud has intensity/timestamp_us/valid_mask. Sensor generic_data (rgb) NOT in PointCloud attributes but accessible via get_pc_generic_data. """ @@ -851,7 +919,7 @@ def test_lidar_adapted_source(self): np.testing.assert_array_equal(src.get_pc_generic_data(0, "rgb"), self.ref_lidar_rgb0) self.assertEqual(src.get_pc_generic_data_names(0), ["rgb"]) - def test_lidar_adapter_model_element(self): + def test_lidar_adapter_model_element(self) -> None: """model_element present for frame 0, absent for frame 1.""" src = self.loader.get_point_clouds_source("test_lidar") @@ -865,7 +933,7 @@ def test_lidar_adapter_model_element(self): pc1 = src.get_pc(1) self.assertNotIn("model_element", pc1.attribute_names) - def test_lidar_adapter_multi_return(self): + def test_lidar_adapter_multi_return(self) -> None: """Use return_index=0 vs return_index=1 for frame 1: - Different xyz values (different distances) - Different intensity values @@ -902,23 +970,23 @@ def test_lidar_adapter_multi_return(self): np.testing.assert_array_equal(mask1, self.ref_lidar_valid_mask1[1]) self.assertFalse(np.all(mask1)) - def test_lidar_adapter_generic_meta_data(self): + def test_lidar_adapter_generic_meta_data(self) -> None: """Adapter forwards per-pc generic metadata from the sensor.""" src = self.loader.get_point_clouds_source("test_lidar") self.assertEqual(src.get_pc_generic_meta_data(0), {"frame": 0}) self.assertEqual(src.get_pc_generic_meta_data(1), {"frame": 1}) - def test_lidar_adapter_missing_generic_data(self): + def test_lidar_adapter_missing_generic_data(self) -> None: """has_pc_generic_data returns False for non-existent names.""" src = self.loader.get_point_clouds_source("test_lidar") self.assertFalse(src.has_pc_generic_data(0, "nonexistent")) - def test_unknown_id_raises_key_error(self): + def test_unknown_id_raises_key_error(self) -> None: """Requesting a non-existent source raises KeyError.""" with self.assertRaises(KeyError): self.loader.get_point_clouds_source("nonexistent_source") - def test_pc_index_range(self): + def test_pc_index_range(self) -> None: """Strided access works for both native and adapted sources.""" # Native source: 1 pc native_src = self.loader.get_point_clouds_source("sfm_points") @@ -932,3 +1000,286 @@ def test_pc_index_range(self): self.assertEqual(list(lidar_src.get_pc_index_range(0, 2, 2)), [0]) self.assertEqual(list(lidar_src.get_pc_index_range(1, 2)), [1]) self.assertEqual(list(lidar_src.get_pc_index_range(step=1)), [0, 1]) + + +@parameterized_class( + ("store_type"), + [ + ("itar",), + ("directory",), + ], +) +class TestCameraLabelsCompatIntegration(unittest.TestCase): + """Integration tests for camera labels compat API via SequenceLoaderV4. + + Writes V4 data (poses + camera labels of different types/cameras), finalizes, + then loads through SequenceLoaderV4 and verifies the compat protocol surface: + camera_labels_ids, get_camera_labels, query_camera_labels, and the CameraLabels wrapper. + """ + + store_type: Literal["itar", "directory"] + + def setUp(self) -> None: + np.set_printoptions(floatmode="unique", linewidth=200, suppress=True) + + self._tempdir = tempfile.TemporaryDirectory() + + ref_sequence_id = "camera-labels-compat-test" + ref_ts_interval = HalfClosedInterval(0, 10_000_001) + + store_writer = SequenceComponentGroupsWriter( + output_dir_path=UPath(self._tempdir.name), + store_base_name=ref_sequence_id, + sequence_id=ref_sequence_id, + sequence_timestamp_interval_us=ref_ts_interval, + store_type=self.store_type, + generic_meta_data={}, + ) + + # Poses (minimal, required by SequenceLoaderV4) + T_rig_worlds = np.stack( + [ + np.block( + [ + [R.from_euler("xyz", [0, a, 0], degrees=True).as_matrix(), np.zeros((3, 1))], + [np.array([0, 0, 0, 1])], + ] + ) + for a in [0.0, 1.0] + ] + ) + T_rig_world_timestamps_us = np.array([0, 10_000_000], dtype=np.uint64) + + poses_writer = store_writer.register_component_writer(PosesComponent.Writer, "default", group_name=None) + poses_writer.store_dynamic_pose( + source_frame_id="rig", + target_frame_id="world", + poses=T_rig_worlds, + timestamps_us=T_rig_world_timestamps_us, + ) + + # Intrinsics (minimal, required by SequenceLoaderV4) + _ = store_writer.register_component_writer(IntrinsicsComponent.Writer, "default", "intrinsics") + + # Camera Labels: depth for "front" camera + self.ref_depth_descriptor = CameraLabelDescriptor( + camera_id="front", + label_type=LabelType.DEPTH_Z_M, + label_schema=LabelSchema( + dtype=np.dtype("float32"), + shape_suffix=(), + encoding=LabelEncoding.RAW, + ), + label_source=LabelSource.AUTOLABEL, + ) + depth_writer = store_writer.register_component_writer( + CameraLabelsComponent.Writer, + self.ref_depth_descriptor.default_instance_name, + generic_meta_data={"pipeline": "autolabel-v2"}, + descriptor=self.ref_depth_descriptor, + ) + self.ref_depth1 = np.random.default_rng(1).random((32, 40), dtype=np.float32) * 100.0 + self.ref_depth2 = np.random.default_rng(2).random((32, 40), dtype=np.float32) * 50.0 + depth_writer.store_label(data=self.ref_depth1, timestamp_us=1_000_000) + depth_writer.store_label(data=self.ref_depth2, timestamp_us=2_000_000) + + # Camera Labels: segmentation for "front" camera + self.ref_seg_descriptor = CameraLabelDescriptor( + camera_id="front", + label_type=LabelType.SEGMENTATION_SEMANTIC, + label_schema=LabelSchema( + dtype=np.dtype("uint8"), + shape_suffix=(), + encoding=LabelEncoding.RAW, + ), + label_source=LabelSource.GT_ANNOTATION, + ) + seg_writer = store_writer.register_component_writer( + CameraLabelsComponent.Writer, + self.ref_seg_descriptor.default_instance_name, + generic_meta_data={}, + descriptor=self.ref_seg_descriptor, + ) + self.ref_seg1 = np.random.default_rng(3).integers(0, 20, size=(32, 40), dtype=np.uint8) + seg_writer.store_label(data=self.ref_seg1, timestamp_us=1_000_000) + + # Camera Labels: depth for "rear" camera + self.ref_rear_depth_descriptor = CameraLabelDescriptor( + camera_id="rear", + label_type=LabelType.DEPTH_Z_M, + label_schema=LabelSchema( + dtype=np.dtype("float32"), + shape_suffix=(), + encoding=LabelEncoding.RAW, + ), + label_source=LabelSource.AUTOLABEL, + ) + rear_depth_writer = store_writer.register_component_writer( + CameraLabelsComponent.Writer, + self.ref_rear_depth_descriptor.default_instance_name, + generic_meta_data={}, + descriptor=self.ref_rear_depth_descriptor, + ) + self.ref_rear_depth1 = np.random.default_rng(4).random((32, 40), dtype=np.float32) * 80.0 + rear_depth_writer.store_label(data=self.ref_rear_depth1, timestamp_us=3_000_000) + + # Finalize & Load + store_paths = store_writer.finalize() + + reader = SequenceComponentGroupsReader(store_paths, open_consolidated=False) + self.loader = SequenceLoaderV4( + reader, + masks_component_group_name=None, + cuboids_component_group_name=None, + ) + + def tearDown(self) -> None: + self._tempdir.cleanup() + + # Tests: camera_labels_ids + + def test_camera_labels_ids(self) -> None: + """camera_labels_ids returns all registered label instance names.""" + ids = self.loader.camera_labels_ids + self.assertEqual(len(ids), 3) + self.assertIn("depth.z@front", ids) + self.assertIn("segmentation.semantic@front", ids) + self.assertIn("depth.z@rear", ids) + + # Tests: get_camera_labels + + def test_get_camera_labels_depth_front(self) -> None: + """get_camera_labels returns a CameraLabelsProtocol with correct properties.""" + labels = self.loader.get_camera_labels("depth.z@front") + + # Verify label_descriptor + descriptor = labels.label_descriptor + self.assertEqual(descriptor.camera_id, "front") + self.assertEqual(descriptor.label_type, LabelType.DEPTH_Z_M) + self.assertEqual(descriptor.label_type.category, LabelCategory.DEPTH) + self.assertEqual(descriptor.label_source, LabelSource.AUTOLABEL) + self.assertEqual(descriptor.label_schema.encoding, LabelEncoding.RAW) + self.assertEqual(descriptor.label_schema.dtype, np.dtype("float32")) + + # Verify labels_count + self.assertEqual(labels.labels_count, 2) + + # Verify label_timestamps_us + np.testing.assert_array_equal( + labels.label_timestamps_us, + np.array([1_000_000, 2_000_000], dtype=np.uint64), + ) + + # Verify labels_generic_meta_data + self.assertEqual(labels.labels_generic_meta_data, {"pipeline": "autolabel-v2"}) + + def test_get_camera_labels_data_access(self) -> None: + """get_label() returns correct data for each timestamp.""" + labels = self.loader.get_camera_labels("depth.z@front") + + # First label + handle1 = labels.get_label(1_000_000) + np.testing.assert_array_almost_equal(handle1.get_data(), self.ref_depth1) + self.assertEqual(handle1.timestamp_us, 1_000_000) + # RAW encoding -> get_encoded_data returns None + self.assertIsNone(handle1.get_encoded_data()) + + # Second label + handle2 = labels.get_label(2_000_000) + np.testing.assert_array_almost_equal(handle2.get_data(), self.ref_depth2) + self.assertEqual(handle2.timestamp_us, 2_000_000) + + def test_get_camera_labels_segmentation(self) -> None: + """Segmentation labels are accessible through the compat layer.""" + labels = self.loader.get_camera_labels("segmentation.semantic@front") + + self.assertEqual(labels.labels_count, 1) + self.assertEqual(labels.label_descriptor.camera_id, "front") + self.assertEqual(labels.label_descriptor.label_type, LabelType.SEGMENTATION_SEMANTIC) + self.assertEqual(labels.label_descriptor.label_type.category, LabelCategory.SEGMENTATION) + + handle = labels.get_label(1_000_000) + np.testing.assert_array_equal(handle.get_data(), self.ref_seg1) + + def test_get_camera_labels_rear(self) -> None: + """Labels from a different camera are accessible.""" + labels = self.loader.get_camera_labels("depth.z@rear") + + self.assertEqual(labels.labels_count, 1) + self.assertEqual(labels.label_descriptor.camera_id, "rear") + + handle = labels.get_label(3_000_000) + np.testing.assert_array_almost_equal(handle.get_data(), self.ref_rear_depth1) + + def test_get_camera_labels_unknown_raises(self) -> None: + """Requesting a non-existent label ID raises KeyError.""" + with self.assertRaises(KeyError): + self.loader.get_camera_labels("nonexistent@camera") + + # Tests: query_camera_labels + def test_query_camera_labels_by_camera_id(self) -> None: + """query_camera_labels filters by camera_id correctly.""" + # "front" has 2 label instances (depth + segmentation) + front_labels = self.loader.query_camera_labels("front") + self.assertEqual(len(front_labels), 2) + camera_ids = {lbl.label_descriptor.camera_id for lbl in front_labels} + self.assertEqual(camera_ids, {"front"}) + + # "rear" has 1 label instance (depth only) + rear_labels = self.loader.query_camera_labels("rear") + self.assertEqual(len(rear_labels), 1) + self.assertEqual(rear_labels[0].label_descriptor.camera_id, "rear") + + # Non-existent camera returns empty list + empty = self.loader.query_camera_labels("side_left") + self.assertEqual(len(empty), 0) + + def test_query_camera_labels_by_label_type(self) -> None: + """query_camera_labels filters by label_type correctly.""" + # Query "front" for DEPTH_Z_M -> should return 1 result + depth_labels = self.loader.query_camera_labels("front", label_type=LabelType.DEPTH_Z_M) + self.assertEqual(len(depth_labels), 1) + self.assertEqual(depth_labels[0].label_descriptor.label_type, LabelType.DEPTH_Z_M) + + # Query "front" for SEGMENTATION_SEMANTIC -> should return 1 result + seg_labels = self.loader.query_camera_labels("front", label_type=LabelType.SEGMENTATION_SEMANTIC) + self.assertEqual(len(seg_labels), 1) + self.assertEqual(seg_labels[0].label_descriptor.label_type, LabelType.SEGMENTATION_SEMANTIC) + + # Query "front" for non-matching type -> empty + empty = self.loader.query_camera_labels("front", label_type=LabelType.FLOW_OPTICAL_FORWARD_PX) + self.assertEqual(len(empty), 0) + + def test_query_camera_labels_by_label_category(self) -> None: + """query_camera_labels filters by label_category correctly.""" + # Query "front" for DEPTH category -> depth.z@front only + depth_cat = self.loader.query_camera_labels("front", label_category=LabelCategory.DEPTH) + self.assertEqual(len(depth_cat), 1) + self.assertEqual(depth_cat[0].label_descriptor.label_type.category, LabelCategory.DEPTH) + + # Query "front" for SEGMENTATION category + seg_cat = self.loader.query_camera_labels("front", label_category=LabelCategory.SEGMENTATION) + self.assertEqual(len(seg_cat), 1) + self.assertEqual(seg_cat[0].label_descriptor.label_type.category, LabelCategory.SEGMENTATION) + + # Query "front" for FLOW category -> empty + flow_cat = self.loader.query_camera_labels("front", label_category=LabelCategory.FLOW) + self.assertEqual(len(flow_cat), 0) + + def test_query_camera_labels_combined_filters(self) -> None: + """query_camera_labels with both label_type and label_category filters.""" + # label_type takes precedence; category is redundant here but shouldn't break + results = self.loader.query_camera_labels( + "front", label_type=LabelType.DEPTH_Z_M, label_category=LabelCategory.DEPTH + ) + self.assertEqual(len(results), 1) + + # Mismatched type vs category: label_type filter is checked first, so if type matches + # but category doesn't match the type's category, result depends on implementation. + # The impl checks type first, then category against type.category — so a matching + # type with matching category should return the result. + results_mismatch = self.loader.query_camera_labels( + "front", label_type=LabelType.DEPTH_Z_M, label_category=LabelCategory.FLOW + ) + # DEPTH_Z_M.category is DEPTH, not FLOW, so this should be filtered out + self.assertEqual(len(results_mismatch), 0)