diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index f79a6d466..2f7623ae9 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -32,10 +32,11 @@ from tsfile.dataset.formatting import format_timestamp from tsfile.dataset.metadata import ( MetadataCatalog, + SeriesStats, build_series_path, resolve_series_path, ) -from tsfile.dataset.reader import TsFileSeriesReader, _build_exact_tag_filter +from tsfile.dataset.reader import MODEL_TABLE, TsFileSeriesReader, _build_exact_tag_filter def _write_weather_file(path, start): @@ -247,7 +248,7 @@ def test_dataset_basic_access_patterns(tmp_path, capsys): assert list(tsdf["field"]) == ["temperature", "humidity"] - assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf) + assert "TsFileDataFrame(table model, 2 time series, 2 files)" in repr(tsdf) aligned.show(2) assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out @@ -306,6 +307,54 @@ def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_p np.testing.assert_array_equal(aligned.values, np.array([[21.5, 52.0]])) +def test_dataset_loc_dedups_repeated_series_specifiers(tmp_path): + path = tmp_path / "weather.tsfile" + _write_weather_file(path, 0) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + humidity = "weather.device_a.humidity" + humidity_idx = tsdf.list_timeseries().index(humidity) + + # 1) name + matching idx pointing at the same series. + aligned_two_dup = tsdf.loc[0:2, [humidity, humidity_idx]] + assert aligned_two_dup.shape == (3, 2) + np.testing.assert_array_equal( + aligned_two_dup.timestamps, np.array([0, 1, 2], dtype=np.int64) + ) + np.testing.assert_array_equal( + aligned_two_dup.values, + np.array([[50.0, 50.0], [52.0, 52.0], [55.0, 55.0]]), + ) + + # 2) same name twice -- single-group, single-key dedup path. + aligned_name_twice = tsdf.loc[0:2, [humidity, humidity]] + assert aligned_name_twice.shape == (3, 2) + np.testing.assert_array_equal( + aligned_name_twice.values, + np.array([[50.0, 50.0], [52.0, 52.0], [55.0, 55.0]]), + ) + + # 3) duplicate among other distinct series must not regress the + # historically-passing case either. + aligned_mixed = tsdf.loc[ + 0:2, [humidity, "weather.device_a.temperature", humidity_idx] + ] + assert aligned_mixed.shape == (3, 3) + np.testing.assert_array_equal( + aligned_mixed.timestamps, np.array([0, 1, 2], dtype=np.int64) + ) + np.testing.assert_array_equal( + aligned_mixed.values, + np.array( + [ + [50.0, 20.0, 50.0], + [52.0, 21.5, 52.0], + [55.0, 23.0, 55.0], + ] + ), + ) + + def test_dataset_loc_supports_open_ended_ranges_and_negative_series_index(tmp_path): path = tmp_path / "weather.tsfile" _write_weather_file(path, 100) @@ -386,7 +435,7 @@ def test_dataset_repr_only_builds_preview_rows(tmp_path, monkeypatch): _write_weather_file(path, 0) with TsFileDataFrame(str(path), show_progress=False) as tsdf: - tsdf._index.series_refs_ordered = [(0, 0)] * 1000 + tsdf._index.series = [(0, 0)] * 1000 built_rows = [] @@ -411,7 +460,7 @@ def fail_build_series_name(_series_ref): monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name) rendered = repr(tsdf) - assert "TsFileDataFrame(1000 time series, 1 files)" in rendered + assert "TsFileDataFrame(table model, 1000 time series, 1 files)" in rendered assert "..." in rendered assert len(built_rows) == 20 @@ -438,6 +487,68 @@ def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path): assert series[1:1].shape == (0,) +def test_dataset_omits_table_model_phantom_series_for_skipped_cells(tmp_path): + """Schema-declared fields that a device never wrote must NOT appear. + + The dataset surface treats a series as "data physically written for one + (device, field) pair". A Tablet that skips ``add_value_by_name`` for a + column produces a chunk with ``length=0``; that cell is not a real series + and must not be exposed via ``list_timeseries`` / ``len(tsdf)`` / + ``series_shards`` -- table-model and tree-model behave identically here. + """ + from tsfile import Tablet + + path = tmp_path / "sparse_table.tsfile" + schema = TableSchema( + "bench", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v1", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("v3", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + with TsFileTableWriter(str(path), schema) as writer: + # d1: writes only v1 and v2 (skip v3) + t1 = Tablet( + ["device", "v1", "v2", "v3"], + [TSDataType.STRING, TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE], + 1, + ) + t1.add_timestamp(0, 1) + t1.add_value_by_name("device", 0, "d1") + t1.add_value_by_name("v1", 0, 100.0) + t1.add_value_by_name("v2", 0, 200.0) + writer.write_table(t1) + + # d2: writes only v1 and v3 (skip v2) + t2 = Tablet( + ["device", "v1", "v2", "v3"], + [TSDataType.STRING, TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE], + 1, + ) + t2.add_timestamp(0, 2) + t2.add_value_by_name("device", 0, "d2") + t2.add_value_by_name("v1", 0, 110.0) + t2.add_value_by_name("v3", 0, 330.0) + writer.write_table(t2) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + # 4 real cells: (d1,v1), (d1,v2), (d2,v1), (d2,v3); NO phantoms. + assert len(tsdf) == 4 + assert sorted(tsdf.list_timeseries()) == [ + "bench.d1.v1", + "bench.d1.v2", + "bench.d2.v1", + "bench.d2.v3", + ] + + with pytest.raises(KeyError): + tsdf["bench.d1.v3"] + with pytest.raises(KeyError): + tsdf["bench.d2.v2"] + + def test_dataset_timeseries_supports_negative_step_slices(tmp_path): path = tmp_path / "weather.tsfile" _write_weather_file(path, 0) @@ -737,6 +848,7 @@ def query_table_by_row( return _FakeResultSet(rows) reader = object.__new__(TsFileSeriesReader) + reader._model_kind = MODEL_TABLE reader._reader = _FakeNativeReader( np.arange(30, dtype=np.int64), np.arange(30, dtype=np.float64), boundary=10 ) @@ -758,14 +870,14 @@ def test_series_path_resolution_allows_prefix_tag_values(): ("temperature",), ) device_id = catalog.add_device(table_id, ("site_a", "device_a"), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.site_a.device_a.temperature" @@ -781,14 +893,14 @@ def test_series_path_resolution_allows_missing_trailing_tag_value(): ("temperature",), ) device_id = catalog.add_device(table_id, (), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.temperature" @@ -804,14 +916,14 @@ def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values(): ("temperature",), ) device_id = catalog.add_device(table_id, (None, "device_a", None), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.device_a.temperature" @@ -846,6 +958,7 @@ def query_table_by_row(self, *args, **kwargs): ) reader = object.__new__(TsFileSeriesReader) + reader._model_kind = MODEL_TABLE reader._reader = _FakeNativeReader() reader._catalog = MetadataCatalog() table_id = reader._catalog.add_table( @@ -855,14 +968,14 @@ def query_table_by_row(self, *args, **kwargs): ("temperature",), ) device_id = reader._catalog.add_device(table_id, (None, "device_a", "north"), 0, 1) - reader._catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 2, - "min_time": 0, - "max_time": 1, - "timeline_length": 2, - "timeline_min_time": 0, - "timeline_max_time": 1, - } + reader._catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=2, + min_time=0, + max_time=1, + timeline_length=2, + timeline_min_time=0, + timeline_max_time=1, + ) with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): reader.read_series_by_ref(device_id, 0, 0, 1) @@ -872,7 +985,7 @@ def query_table_by_row(self, *args, **kwargs): def test_dataframe_resolves_named_sparse_tag_series_path(): tsdf = object.__new__(TsFileDataFrame) - tsdf._index = dataframe_module._LogicalIndex() + tsdf._index = dataframe_module._DataFrameCatalog() tsdf._index.table_entries["weather"] = dataframe_module.TableEntry( table_name="weather", tag_columns=("city", "device", "region"), @@ -880,16 +993,15 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): field_columns=("temperature",), ) device_key = ("weather", (None, "device_a")) - tsdf._index.device_order = [device_key] - tsdf._index.device_index_by_key = {device_key: 0} + tsdf._index.devices = [device_key] + tsdf._index.device_index = {device_key: 0} tsdf._index.tables_with_sparse_tag_values = {"weather"} tsdf._index.sparse_device_indices_by_compressed_path = { ("weather", ("device_a",)): [0] } - tsdf._index.device_refs = [[]] - tsdf._index.series_refs_ordered = [(0, 0)] - tsdf._index.series_ref_set = {(0, 0)} - tsdf._index.series_ref_map = {(0, 0): []} + tsdf._index.device_time_bounds = [(0, 1)] + tsdf._index.series = [(0, 0)] + tsdf._index.series_shards = {(0, 0): []} assert tsdf.list_timeseries() == ["weather.device_a.temperature"] assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0) @@ -897,18 +1009,18 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): tsdf = object.__new__(TsFileDataFrame) - tsdf._index = dataframe_module._LogicalIndex() + tsdf._index = dataframe_module._DataFrameCatalog() tsdf._index.table_entries["weather"] = dataframe_module.TableEntry( table_name="weather", tag_columns=("city", "device", "region"), tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING), field_columns=("temperature",), ) - tsdf._index.device_order = [ + tsdf._index.devices = [ ("weather", (None, "device_a")), ("weather", ("beijing", "device_b")), ] - tsdf._index.device_index_by_key = { + tsdf._index.device_index = { ("weather", (None, "device_a")): 0, ("weather", ("beijing", "device_b")): 1, } @@ -917,10 +1029,9 @@ def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): ("weather", ("device_a",)): [0], ("weather", ("beijing", "device_b")): [1], } - tsdf._index.device_refs = [[], []] - tsdf._index.series_refs_ordered = [(0, 0), (1, 0)] - tsdf._index.series_ref_set = {(0, 0), (1, 0)} - tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []} + tsdf._index.device_time_bounds = [(0, 1), (0, 1)] + tsdf._index.series = [(0, 0), (1, 0)] + tsdf._index.series_shards = {(0, 0): [], (1, 0): []} assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"] @@ -932,7 +1043,7 @@ def test_dataframe_list_timeseries_prefix_can_skip_full_name_build( _write_weather_file(path, 0) with TsFileDataFrame(str(path), show_progress=False) as tsdf: - tsdf._index.series_refs_ordered = [(0, 0)] * 1000 + tsdf._index.series = [(0, 0)] * 1000 def fail_build_series_name(_series_ref): raise AssertionError( @@ -954,14 +1065,14 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) for device_id in (first_id, second_id): - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature" assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature" @@ -994,3 +1105,136 @@ def test_dataframe_parallel_show_progress_reports_start_immediately(tmp_path, ca stderr = capsys.readouterr().err assert "Loading TsFile shards: 0/2" in stderr assert "Loading TsFile shards: 2/2 (4 series) ... done" in stderr + + +# --- Tree-model tests ------------------------------------------------------- + + +def _write_tree_file(path): + """Tree-model TsFile with two devices; the second device is shorter and + only has one of the two declared measurements, exercising None-pad + + union-field paths in the synthetic table layer. + """ + from tsfile import ( + Field, + RowRecord, + TimeseriesSchema, + TsFileWriter, + ) + + writer = TsFileWriter(str(path)) + writer.register_timeseries( + "root.ln.wf01.wt01", TimeseriesSchema("status", TSDataType.INT32) + ) + writer.register_timeseries( + "root.ln.wf01.wt01", TimeseriesSchema("temperature", TSDataType.DOUBLE) + ) + writer.register_timeseries( + "root.ln.wf02.wt02", TimeseriesSchema("status", TSDataType.INT32) + ) + for t in range(5): + writer.write_row_record( + RowRecord( + "root.ln.wf01.wt01", + t, + [ + Field("status", t, TSDataType.INT32), + Field("temperature", float(t) + 0.5, TSDataType.DOUBLE), + ], + ) + ) + writer.write_row_record( + RowRecord( + "root.ln.wf02.wt02", + t, + [Field("status", t * 2, TSDataType.INT32)], + ) + ) + writer.close() + + +def test_dataset_tree_model_metadata_and_repr(tmp_path): + path = tmp_path / "tree.tsfile" + _write_tree_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + assert tsdf.model == "tree" + assert len(tsdf) == 3 + assert sorted(tsdf.list_timeseries()) == [ + "root.ln.wf01.wt01.status", + "root.ln.wf01.wt01.temperature", + "root.ln.wf02.wt02.status", + ] + + rendered = repr(tsdf) + # Header carries the model marker; tag headers use _col_i (1-based). + assert "TsFileDataFrame(tree model, 3 time series, 1 files)" in rendered + assert "_col_1" in rendered and "_col_2" in rendered and "_col_3" in rendered + assert "table" not in rendered.splitlines()[1] # no 'table' header + + # Metadata column projection: _col_i and field; 'table' is rejected. + assert list(tsdf["_col_1"]) == ["ln", "ln", "ln"] + assert list(tsdf["_col_3"]) == ["wt01", "wt01", "wt02"] + assert list(tsdf["field"]) == ["status", "temperature", "status"] + with pytest.raises(KeyError): + tsdf["table"] + + +def test_dataset_tree_model_series_access(tmp_path): + path = tmp_path / "tree.tsfile" + _write_tree_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + ts = tsdf["root.ln.wf01.wt01.temperature"] + assert isinstance(ts, Timeseries) + assert ts.name == "root.ln.wf01.wt01.temperature" + assert len(ts) == 5 + np.testing.assert_array_equal(ts.timestamps, np.arange(5, dtype=np.int64)) + # __getitem__ slice routes through _read_series_by_row_tree. + first_three = ts[0:3] + np.testing.assert_array_equal(first_three, np.array([0.5, 1.5, 2.5])) + + # Aligned read across two co-located series. + aligned = tsdf.loc[0:5, [ + "root.ln.wf01.wt01.temperature", + "root.ln.wf01.wt01.status", + ]] + assert isinstance(aligned, AlignedTimeseries) + assert aligned.shape == (5, 2) + np.testing.assert_array_equal(aligned.timestamps, np.arange(5, dtype=np.int64)) + + +def test_dataset_tree_model_list_timeseries_metadata(tmp_path): + path = tmp_path / "tree.tsfile" + _write_tree_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + meta = tsdf.list_timeseries_metadata() + assert isinstance(meta, pd.DataFrame) + assert list(meta.columns) == [ + "field", + "start_time", + "end_time", + "count", + "_col_1", + "_col_2", + "_col_3", + ] + assert sorted(meta.index.tolist()) == sorted(tsdf.list_timeseries()) + # Time bounds surface as pandas.Timestamp for ergonomic comparison. + assert pd.api.types.is_datetime64_any_dtype(meta["start_time"]) + assert pd.api.types.is_datetime64_any_dtype(meta["end_time"]) + # Per-series count comes from the catalog, not the synthetic union. + assert meta.loc["root.ln.wf01.wt01.temperature", "count"] == 5 + assert meta.loc["root.ln.wf02.wt02.status", "count"] == 5 + + +def test_dataset_rejects_mixed_model_load(tmp_path): + table_path = tmp_path / "weather.tsfile" + tree_path = tmp_path / "tree.tsfile" + _write_weather_file(table_path, 0) + _write_tree_file(tree_path) + + with pytest.raises(ValueError, match="Mixed table-model and tree-model"): + TsFileDataFrame([str(table_path), str(tree_path)], show_progress=False) + diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 40149102a..81b377ebd 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -23,7 +23,7 @@ import heapq import os import sys -from typing import Dict, List, Set, Tuple, Union +from typing import Dict, List, Optional, Set, Tuple, Union import warnings import numpy as np @@ -41,7 +41,6 @@ DeviceKey = Tuple[str, tuple] SeriesRefKey = Tuple[int, int] SeriesRef = Tuple[object, int, int] -DeviceRef = Tuple[object, int] _QUERY_START = np.iinfo(np.int64).min _QUERY_END = np.iinfo(np.int64).max @@ -52,18 +51,27 @@ # multiple shards. _OVERLAP_ROW_CHUNK_SIZE = 256 +MODEL_TABLE = "table" +MODEL_TREE = "tree" + @dataclass(**_DATACLASS_SLOTS) -class _LogicalIndex: - """Cross-reader logical mapping for devices and series.""" +class _DataFrameCatalog: + """TsFileDataFrame's cross-file unified catalog: merges each tsfile's + ``MetadataCatalog`` into one user-facing global view. + """ + + # Model kind for the entire load set: "table" or "tree". A single + # TsFileDataFrame is not allowed to mix table-model and tree-model files. + model: Optional[str] = None # Shared table schema references keyed by table name. table_entries: Dict[str, TableEntry] = field(default_factory=dict) # Stable logical device order, each item is (table_name, tag_values). - device_order: List[DeviceKey] = field(default_factory=list) + devices: List[DeviceKey] = field(default_factory=list) # Map one logical device key to its dataframe-local device index. - device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict) + device_index: Dict[DeviceKey, int] = field(default_factory=dict) # Tables that need sparse compressed-path lookup because some devices # contain non-trailing missing tag values. tables_with_sparse_tag_values: Set[str] = field(default_factory=set) @@ -71,23 +79,17 @@ class _LogicalIndex: sparse_device_indices_by_compressed_path: Dict[ Tuple[str, Tuple[str, ...]], List[int] ] = field(default_factory=dict) - # For each logical device, keep the contributing reader-local device refs. - device_refs: List[List[DeviceRef]] = field(default_factory=list) + # Aggregated (min_time, max_time) per logical device, computed once at + # registration so query-time lookups are O(1). + device_time_bounds: List[Tuple[Optional[int], Optional[int]]] = field( + default_factory=list + ) # Stable logical series order, each item is (device_idx, field_idx). - series_refs_ordered: List[SeriesRefKey] = field(default_factory=list) - # Map one logical series ref to the contributing reader-local series refs. - series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) - # Fast membership check for resolved series refs. - series_ref_set: Set[SeriesRefKey] = field(default_factory=set) - - -@dataclass(**_DATACLASS_SLOTS) -class _DerivedCache: - """Merged metadata derived from the logical index.""" - - devices: List[dict] = field(default_factory=list) - field_stats: Dict[SeriesRefKey, dict] = field(default_factory=dict) + series: List[SeriesRefKey] = field(default_factory=list) + # For each logical series: which shards hold it, plus its (device_id, + # field_idx) coordinates inside each shard. + series_shards: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) def _expand_paths(paths: Union[str, List[str]]) -> List[str]: @@ -144,11 +146,22 @@ def _validate_table_schema( def _register_reader( readers: Dict[str, object], - index: _LogicalIndex, + index: _DataFrameCatalog, file_path: str, reader, ) -> None: """Merge one reader's catalog into the dataframe-wide logical index.""" + cur_tsfile_model = reader.model_kind + if index.model is None: + index.model = cur_tsfile_model + elif index.model != cur_tsfile_model: + raise ValueError( + f"Mixed table-model and tree-model TsFiles detected. The first " + f"loaded file is {index.model!r} but '{file_path}' is " + f"{cur_tsfile_model!r}. A single TsFileDataFrame load set must be " + f"entirely table-model or entirely tree-model." + ) + readers[file_path] = reader catalog = reader.catalog @@ -162,12 +175,14 @@ def _register_reader( for device_id, device_entry in enumerate(catalog.device_entries): table_entry = catalog.table_entries[device_entry.table_id] device_key = (table_entry.table_name, tuple(device_entry.tag_values)) - device_idx = index.device_index_by_key.get(device_key) + device_idx = index.device_index.get(device_key) if device_idx is None: - device_idx = len(index.device_order) - index.device_index_by_key[device_key] = device_idx - index.device_order.append(device_key) - index.device_refs.append([]) + device_idx = len(index.devices) + index.device_index[device_key] = device_idx + index.devices.append(device_key) + index.device_time_bounds.append( + (device_entry.min_time, device_entry.max_time) + ) if any(value is None for value in device_entry.tag_values): index.tables_with_sparse_tag_values.add(table_entry.table_name) compressed_components = tuple( @@ -182,32 +197,31 @@ def _register_reader( index.sparse_device_indices_by_compressed_path.setdefault( compressed_key, [] ).append(device_idx) - index.device_refs[device_idx].append((reader, device_id)) - - for field_idx in range(len(table_entry.field_columns)): - series_ref = (device_idx, field_idx) - if series_ref not in index.series_ref_map: - index.series_refs_ordered.append(series_ref) - index.series_ref_map[series_ref] = [] - index.series_ref_map[series_ref].append((reader, device_id, field_idx)) - - -def _build_device_entry(refs: List[DeviceRef]) -> dict: - """Compute per-device time bounds from cheap metadata only. - - We intentionally do not validate duplicates at the device level because - table-model fields do not necessarily share one complete timestamp axis. - Duplicate detection stays on the logical-series paths that materialize or - merge one field's timestamps. - """ - infos = [reader.get_device_info(device_id) for reader, device_id in refs] - min_time = min(info["min_time"] for info in infos) - max_time = max(info["max_time"] for info in infos) + else: + cur_min, cur_max = index.device_time_bounds[device_idx] + new_min = ( + device_entry.min_time + if cur_min is None + else min(cur_min, device_entry.min_time) + ) + new_max = ( + device_entry.max_time + if cur_max is None + else max(cur_max, device_entry.max_time) + ) + index.device_time_bounds[device_idx] = (new_min, new_max) - return { - "min_time": min_time, - "max_time": max_time, - } + # Register every (device, field) pair the reader physically holds. + for device_id, field_idx in catalog.series_stats_by_ref: + device_entry = catalog.device_entries[device_id] + table_entry = catalog.table_entries[device_entry.table_id] + device_key = (table_entry.table_name, tuple(device_entry.tag_values)) + device_idx = index.device_index[device_key] + series_ref = (device_idx, field_idx) + if series_ref not in index.series_shards: + index.series.append(series_ref) + index.series_shards[series_ref] = [] + index.series_shards[series_ref].append((reader, device_id, field_idx)) def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict: @@ -487,10 +501,10 @@ def _parse_key(self, key): if isinstance(item, (int, np.integer)): idx = int(item) if idx < 0: - idx += len(self._df._index.series_refs_ordered) - if idx < 0 or idx >= len(self._df._index.series_refs_ordered): + idx += len(self._df._index.series) + if idx < 0 or idx >= len(self._df._index.series): raise IndexError(f"Series index {item} out of range") - series_ref = self._df._index.series_refs_ordered[idx] + series_ref = self._df._index.series[idx] elif isinstance(item, str): series_ref = self._df._resolve_series_name(item) else: @@ -514,17 +528,19 @@ def _query_aligned( groups = defaultdict(list) for col_idx, series_ref in enumerate(series_refs): device_idx, field_idx = series_ref - device_info = self._df._cache.devices[device_idx] + min_time_dev, max_time_dev = self._df._index.device_time_bounds[ + device_idx + ] if ( - device_info["max_time"] is None - or device_info["max_time"] < start_time - or device_info["min_time"] > end_time + max_time_dev is None + or max_time_dev < start_time + or (min_time_dev is not None and min_time_dev > end_time) ): continue _, table_entry, _ = self._df._get_series_components(series_ref) field_name = table_entry.field_columns[field_idx] - for reader, device_id, reader_field_idx in self._df._index.series_ref_map[ + for reader, device_id, reader_field_idx in self._df._index.series_shards[ series_ref ]: groups[(id(reader), device_id)].append( @@ -547,10 +563,15 @@ def _query_aligned( ts_arr, field_vals = reader.read_device_fields_by_time_range( device_id, field_indices, start_time, end_time ) + if len(ts_arr) == 0: + continue + appended_series = set() for _, _, field_name, series_name, _, _ in entries: - if len(ts_arr) > 0: - series_time_parts[series_name].append(ts_arr) - series_value_parts[series_name].append(field_vals[field_name]) + if series_name in appended_series: + continue + appended_series.add(series_name) + series_time_parts[series_name].append(ts_arr) + series_value_parts[series_name].append(field_vals[field_name]) series_data = {} for name in series_names: @@ -575,8 +596,7 @@ def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): self._paths = _expand_paths(paths) self._show_progress = show_progress self._readers: Dict[str, object] = {} - self._index = _LogicalIndex() - self._cache = _DerivedCache() + self._index = _DataFrameCatalog() self._is_view = False self._root = None self._closed = False @@ -593,17 +613,19 @@ def _from_subset( obj._paths = parent._paths obj._show_progress = parent._show_progress obj._readers = parent._readers - obj._index = _LogicalIndex( + # Reuse the parent's full mapping but restrict the membership scope to + # the requested subset. + subset_refs = list(series_refs) + parent_shards = parent._index.series_shards + subset_shards = {ref: parent_shards[ref] for ref in subset_refs} + obj._index = _DataFrameCatalog( + model=parent._index.model, table_entries=parent._index.table_entries, - device_order=parent._index.device_order, - device_index_by_key=parent._index.device_index_by_key, - device_refs=parent._index.device_refs, - series_refs_ordered=list(series_refs), - series_ref_map=parent._index.series_ref_map, - series_ref_set=set(series_refs), - ) - obj._cache = _DerivedCache( - devices=parent._cache.devices, field_stats=parent._cache.field_stats + devices=parent._index.devices, + device_index=parent._index.device_index, + device_time_bounds=parent._index.device_time_bounds, + series=subset_refs, + series_shards=subset_shards, ) obj._closed = False return obj @@ -624,16 +646,7 @@ def _load_metadata(self): else: self._load_metadata_serial(TsFileSeriesReader) - self._cache.devices = [ - _build_device_entry(refs) for refs in self._index.device_refs - ] - for series_ref in self._index.series_refs_ordered: - self._cache.field_stats[series_ref] = _build_field_stats( - self._index.series_ref_map[series_ref] - ) - - self._index.series_ref_set = set(self._index.series_refs_ordered) - if not self._index.series_refs_ordered: + if not self._index.series: raise ValueError("No valid time series found in the provided TsFile files") def _show_loading_progress(self, done: int, total: int, total_series: int = None): @@ -704,7 +717,7 @@ def _get_series_components( self, series_ref: SeriesRefKey ) -> Tuple[DeviceKey, TableEntry, int]: device_idx, field_idx = series_ref - device_key = self._index.device_order[device_idx] + device_key = self._index.devices[device_idx] return device_key, self._index.table_entries[device_key[0]], field_idx def _build_series_name(self, series_ref: SeriesRefKey) -> str: @@ -735,7 +748,7 @@ def _resolve_series_name(self, series_name: str) -> SeriesRefKey: raise KeyError(_series_lookup_hint(series_name)) from exc tag_parts = parts[1:-1] - direct_device_idx = self._index.device_index_by_key.get( + direct_device_idx = self._index.device_index.get( (table_name, tuple(tag_parts)) ) @@ -763,32 +776,46 @@ def _resolve_series_name(self, series_name: str) -> SeriesRefKey: device_idx = candidate_indices[0] series_ref = (device_idx, field_idx) - if series_ref not in self._index.series_ref_set: + if series_ref not in self._index.series_shards: raise KeyError(_series_lookup_hint(series_name)) return series_ref def _build_series_info(self, series_ref: SeriesRefKey) -> dict: device_idx, field_idx = series_ref device_key, table_entry, _ = self._get_series_components(series_ref) - field_stats = self._cache.field_stats[series_ref] + # Aggregate per-shard timeline stats lazily on demand for this series. + field_stats = _build_field_stats(self._index.series_shards[series_ref]) + # Pad short tag tuples (tree-model devices whose path is shorter than + # the synthetic table's max depth) with None so positional access by + # `_col_i` index always lands on a defined cell. + tag_values_ordered = list(device_key[1]) + if len(tag_values_ordered) < len(table_entry.tag_columns): + tag_values_ordered.extend( + [None] * (len(table_entry.tag_columns) - len(tag_values_ordered)) + ) return { "table_name": table_entry.table_name, "field": table_entry.field_columns[field_idx], "tag_columns": table_entry.tag_columns, - "tag_values": dict(zip(table_entry.tag_columns, device_key[1])), + "tag_values": dict(zip(table_entry.tag_columns, tag_values_ordered)), + "tag_values_ordered": tag_values_ordered, "min_time": field_stats["min_time"], "max_time": field_stats["max_time"], "count": field_stats["count"], } def __len__(self) -> int: - return len(self._index.series_refs_ordered) + return len(self._index.series) + + @property + def model(self) -> str: + return self._index.model def list_timeseries(self, path_prefix: str = "") -> List[str]: if not path_prefix: return [ self._build_series_name(series_ref) - for series_ref in self._index.series_refs_ordered + for series_ref in self._index.series ] try: @@ -797,7 +824,7 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]: return [] matched = [] - for series_ref in self._index.series_refs_ordered: + for series_ref in self._index.series: device_key, table_entry, field_idx = self._get_series_components(series_ref) components = build_logical_series_components( table_entry.table_name, @@ -809,19 +836,79 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]: matched.append(self._build_series_name(series_ref)) return matched + def list_timeseries_metadata(self, path_prefix: str = ""): + """Return a pandas DataFrame of per-series metadata. + + The returned frame is indexed by the logical series name and includes + per-series ``field``, time-bound (start/end) statistics, observation + ``count``, and the per-device tag values (named ``_col_1``, ``_col_2``, + ... in tree mode, or by their declared tag-column names in table + mode). Time bounds are exposed as ``pandas.Timestamp`` for ergonomic + comparison; ``count`` is an integer. + + ``path_prefix`` filters by the same logical-path prefix semantics as + ``list_timeseries`` (no prefix returns the full catalog). + """ + import pandas as pd + + # Reuse list_timeseries to apply prefix filtering, then map names back + # to the underlying series_ref (this respects view subsetting too). + names = self.list_timeseries(path_prefix) + + rows = [] + for series_name in names: + series_ref = self._resolve_series_name(series_name) + info = self._build_series_info(series_ref) + row = { + "field": info["field"], + "start_time": pd.to_datetime(info["min_time"], unit="ms"), + "end_time": pd.to_datetime(info["max_time"], unit="ms"), + "count": int(info["count"]), + } + if self._index.model != MODEL_TREE: + row["table"] = info["table_name"] + tag_columns = info["tag_columns"] + tag_values_ordered = info["tag_values_ordered"] + for column, value in zip(tag_columns, tag_values_ordered): + row[column] = value + rows.append((series_name, row)) + + if not rows: + columns = ["field", "start_time", "end_time", "count"] + if self._index.model != MODEL_TREE: + columns.insert(0, "table") + columns.extend(self._collect_tag_columns()) + return pd.DataFrame(columns=columns) + + index = [name for name, _ in rows] + data = [row for _, row in rows] + df = pd.DataFrame(data, index=index) + + # Stable, predictable column order: leading bookkeeping, then tags. + leading = ["field", "start_time", "end_time", "count"] + if self._index.model != MODEL_TREE: + leading.insert(0, "table") + tag_order = list(self._collect_tag_columns()) + ordered_columns = leading + [c for c in tag_order if c in df.columns] + # Preserve any extra columns at the end (defensive against schema drift). + for extra in df.columns: + if extra not in ordered_columns: + ordered_columns.append(extra) + return df.reindex(columns=ordered_columns) + def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries: self._assert_open() series_name = self._build_series_name(series_ref) return Timeseries( series_name, - self._index.series_ref_map[series_ref], - _build_runtime_series_stats(self._index.series_ref_map[series_ref]), + self._index.series_shards[series_ref], + _build_runtime_series_stats(self._index.series_shards[series_ref]), self._assert_open, lambda: _merge_field_timestamps( - series_name, self._index.series_ref_map[series_ref] + series_name, self._index.series_shards[series_ref] ), lambda offset, limit: _read_field_by_position( - series_name, self._index.series_ref_map[series_ref], offset, limit + series_name, self._index.series_shards[series_ref], offset, limit ), ) @@ -831,7 +918,7 @@ def __getitem__(self, key): if isinstance(key, pd.Series) and key.dtype == bool: selected = [ - self._index.series_refs_ordered[idx] for idx in key.index[key] + self._index.series[idx] for idx in key.index[key] ] return TsFileDataFrame._from_subset(self, selected) except ImportError: @@ -840,12 +927,12 @@ def __getitem__(self, key): if isinstance(key, (int, np.integer)): idx = int(key) if idx < 0: - idx += len(self._index.series_refs_ordered) - if idx < 0 or idx >= len(self._index.series_refs_ordered): + idx += len(self._index.series) + if idx < 0 or idx >= len(self._index.series): raise IndexError( - f"Index {idx} out of range [0, {len(self._index.series_refs_ordered)})" + f"Index {idx} out of range [0, {len(self._index.series)})" ) - return self._get_timeseries(self._index.series_refs_ordered[idx]) + return self._get_timeseries(self._index.series[idx]) if isinstance(key, str): try: @@ -853,7 +940,9 @@ def __getitem__(self, key): except KeyError: pass - valid_columns = {"table", "field", "start_time", "end_time", "count"} + valid_columns = {"field", "start_time", "end_time", "count"} + if self._index.model != MODEL_TREE: + valid_columns.add("table") valid_columns.update(self._collect_tag_columns()) if key not in valid_columns: raise KeyError(_series_lookup_hint(key)) @@ -861,7 +950,7 @@ def __getitem__(self, key): import pandas as pd values = [] - for series_ref in self._index.series_refs_ordered: + for series_ref in self._index.series: info = self._build_series_info(series_ref) if key == "table": values.append(info["table_name"]) @@ -874,15 +963,15 @@ def __getitem__(self, key): elif key == "count": values.append(info["count"]) else: - values.append(info["tag_values"].get(key, "")) + values.append(info["tag_values"].get(key)) return pd.Series(values, name=key) if isinstance(key, slice): return TsFileDataFrame._from_subset( self, [ - self._index.series_refs_ordered[idx] - for idx in range(*key.indices(len(self._index.series_refs_ordered))) + self._index.series[idx] + for idx in range(*key.indices(len(self._index.series))) ], ) @@ -895,12 +984,12 @@ def __getitem__(self, key): ) idx = int(item) if idx < 0: - idx += len(self._index.series_refs_ordered) - if idx < 0 or idx >= len(self._index.series_refs_ordered): + idx += len(self._index.series) + if idx < 0 or idx >= len(self._index.series): raise IndexError( - f"Index {item} out of range [0, {len(self._index.series_refs_ordered)})" + f"Index {item} out of range [0, {len(self._index.series)})" ) - selected.append(self._index.series_refs_ordered[idx]) + selected.append(self._index.series[idx]) return TsFileDataFrame._from_subset(self, selected) raise TypeError(f"Unsupported key type: {type(key)}") @@ -911,7 +1000,7 @@ def loc(self): def _collect_tag_columns(self) -> List[str]: seen = {} - for table_name, _ in self._index.device_order: + for table_name, _ in self._index.devices: for column in self._index.table_entries[table_name].tag_columns: seen.setdefault(column, True) return list(seen.keys()) @@ -930,25 +1019,27 @@ def _preview_indices( def _format_table(self, indices=None, max_rows: int = 20) -> str: if indices is None: - indices = list(range(len(self._index.series_refs_ordered))) + indices = list(range(len(self._index.series))) else: indices = list(indices) preview_indices, truncated, split_index = self._preview_indices( indices, max_rows ) + is_tree = self._index.model == MODEL_TREE rows = [] for idx in preview_indices: - series_ref = self._index.series_refs_ordered[idx] + series_ref = self._index.series[idx] info = self._build_series_info(series_ref) row = { "index": idx, - "table": info["table_name"], "field": info["field"], "start_time": info["min_time"], "end_time": info["max_time"], "count": info["count"], } + if not is_tree: + row["table"] = info["table_name"] row.update(info["tag_values"]) rows.append(row) @@ -958,14 +1049,21 @@ def _format_table(self, indices=None, max_rows: int = 20) -> str: total_count=len(indices), truncated=truncated, split_index=split_index, + is_table_model=not is_tree, ) def _repr_header(self) -> str: - total = len(self._index.series_refs_ordered) + total = len(self._index.series) + model_marker = self._index.model if self._is_view: - return f"TsFileDataFrame({total} time series, subset of {len(self._root._index.series_refs_ordered)})\n" - return f"TsFileDataFrame({total} time series, {len(self._paths)} files)\n" - + return ( + f"TsFileDataFrame({model_marker} model, {total} time series, " + f"subset of {len(self._root._index.series)})\n" + ) + return ( + f"TsFileDataFrame({model_marker} model, {total} time series, " + f"{len(self._paths)} files)\n" + ) def __repr__(self): return self._repr_header() + self._format_table() diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py index 8c387c102..b813b6d8a 100644 --- a/python/tsfile/dataset/formatting.py +++ b/python/tsfile/dataset/formatting.py @@ -106,8 +106,14 @@ def format_dataframe_table( total_count: int, truncated: bool = False, split_index: Optional[int] = None, + is_table_model: bool = True, ) -> str: - """Render the metadata table used by TsFileDataFrame.__repr__.""" + """Render the metadata table used by TsFileDataFrame.__repr__. + + When ``is_table_model`` is False (tree-model layout) the leading + ``table`` column is omitted; ``tag_columns`` is then expected to carry the + tree-model headers (e.g. ``_col_1``, ``_col_2``, ...). + """ if not rows: return "Empty TsFileDataFrame" @@ -115,23 +121,31 @@ def format_dataframe_table( for row in rows: rendered = { "index": row["index"], - "table": row["table"], "field": row["field"], "start_time": format_timestamp(row["start_time"]), "end_time": format_timestamp(row["end_time"]), "count": row["count"], } + if is_table_model: + rendered["table"] = row["table"] for tag_col in tag_columns: - rendered[tag_col] = row.get(tag_col, "") + value = row.get(tag_col) + rendered[tag_col] = "None" if value is None else value rendered_rows.append(rendered) - headers = ["", "table"] + tag_columns + ["field", "start_time", "end_time", "count"] + headers = [""] + if is_table_model: + headers.append("table") + headers.extend(tag_columns) + headers.extend(["field", "start_time", "end_time", "count"]) + widths = {header: len(header) for header in headers} widths[""] = max(len(str(row["index"])) for row in rendered_rows) for row in rendered_rows: widths[""] = max(widths[""], len(str(row["index"]))) - widths["table"] = max(widths["table"], len(row["table"])) + if is_table_model: + widths["table"] = max(widths["table"], len(row["table"])) widths["field"] = max(widths["field"], len(row["field"])) widths["start_time"] = max(widths["start_time"], len(row["start_time"])) widths["end_time"] = max(widths["end_time"], len(row["end_time"])) @@ -144,10 +158,9 @@ def format_dataframe_table( for row_idx, row in enumerate(rendered_rows): if truncated and row_idx == split: lines.append("...") - parts = [ - str(row["index"]).rjust(widths[""]), - row["table"].rjust(widths["table"]), - ] + parts = [str(row["index"]).rjust(widths[""])] + if is_table_model: + parts.append(row["table"].rjust(widths["table"])) for tag_col in tag_columns: parts.append(str(row[tag_col]).rjust(widths[tag_col])) parts.extend( diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 125bb00c2..95c6cace6 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -20,7 +20,7 @@ from dataclasses import dataclass, field import sys -from typing import Any, Dict, Iterable, Iterator, List, Tuple +from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Tuple from ..constants import TSDataType @@ -29,6 +29,17 @@ _DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {} +class SeriesStats(NamedTuple): + """Statistics for a single time series.""" + + length: int + min_time: int + max_time: int + timeline_length: int + timeline_min_time: int + timeline_max_time: int + + @dataclass(**_DATACLASS_SLOTS) class TableEntry: """Schema-level metadata shared by every device in one table.""" @@ -77,7 +88,7 @@ class MetadataCatalog: sparse_device_ids_by_compressed_path: Dict[ Tuple[int, Tuple[str, ...]], List[int] ] = field(default_factory=dict) - series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field( + series_stats_by_ref: Dict[Tuple[int, int], SeriesStats] = field( default_factory=dict ) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 4899b2bf9..2fcd18579 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -29,11 +29,14 @@ from ..tsfile_reader import TsFileReaderPy from .metadata import ( MetadataCatalog, + SeriesStats, build_series_path, - iter_series_refs, resolve_series_path, ) +MODEL_TABLE = "table" +MODEL_TREE = "tree" + _NUMERIC_FIELD_TYPES = { TSDataType.BOOLEAN, TSDataType.INT32, @@ -80,6 +83,12 @@ def __init__(self, file_path: str, show_progress: bool = True): except Exception as e: raise ValueError(f"Failed to open TsFile: {e}") from e + # Probe the file model: an empty table-schema map signals tree model + self._table_schemas = self._reader.get_all_table_schemas() + self._model_kind: str = ( + MODEL_TREE if not self._table_schemas else MODEL_TABLE + ) + self._catalog = MetadataCatalog() self._cache_metadata() @@ -90,20 +99,24 @@ def __del__(self): def catalog(self) -> MetadataCatalog: return self._catalog + @property + def model_kind(self) -> str: + return self._model_kind + @property def series_paths(self) -> List[str]: return list(self.iter_series_paths()) @property def series_count(self) -> int: - return self._catalog.series_count + return len(self._catalog.series_stats_by_ref) def iter_series_paths(self) -> Iterator[str]: - for device_id, field_idx in iter_series_refs(self._catalog): + for device_id, field_idx in self._catalog.series_stats_by_ref: yield build_series_path(self._catalog, device_id, field_idx) def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]: - for device_id, field_idx in iter_series_refs(self._catalog): + for device_id, field_idx in self._catalog.series_stats_by_ref: yield build_series_path( self._catalog, device_id, field_idx ), device_id, field_idx @@ -118,8 +131,10 @@ def close(self): def _cache_metadata(self): """Wrap metadata discovery so reader construction surfaces one stable error shape.""" try: - self._cache_metadata_table_model() - # todo: we should support tree model + if self._model_kind == MODEL_TABLE: + self._cache_metadata_table_model() + else: + self._cache_metadata_tree_model() except Exception as e: raise ValueError( f"Failed to read TsFile metadata. Please ensure the TsFile is valid and readable. Error: {e}" @@ -127,7 +142,7 @@ def _cache_metadata(self): def _cache_metadata_table_model(self): """Build the in-memory catalog from table schemas and native metadata.""" - table_schemas = self._reader.get_all_table_schemas() + table_schemas = self._table_schemas if not table_schemas: raise ValueError("No tables found in TsFile") @@ -191,18 +206,13 @@ def _cache_metadata_table_model(self): for field_idx, field_name in enumerate(table_entry.field_columns): field_stats = stats_by_field.get(field_name) if field_stats is None: - self._catalog.series_stats_by_ref[(device_id, field_idx)] = { - "length": 0, - "min_time": None, - "max_time": None, - "timeline_length": 0, - "timeline_min_time": None, - "timeline_max_time": None, - } - else: - self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( - field_stats - ) + # Schema declares this field but the device never + # wrote it (or wrote it entirely as NaN). Skip -- + # the dataset surface only carries real series. + continue + self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( + field_stats + ) if self.show_progress: sys.stderr.write( @@ -217,6 +227,111 @@ def _cache_metadata_table_model(self): ) sys.stderr.flush() + def _cache_metadata_tree_model(self): + """Build the in-memory catalog by synthesizing one virtual table. + + Tree TsFiles have no schema, so we materialize a single + ``TableEntry``: table name = the shared root segment, tag columns + = ``_col_1..._col_{N_max}`` (one per remaining path segment), + fields = union of measurements across devices. Per-device + ownership is preserved by registering only the + ``(device_id, field_idx)`` pairs actually written on disk in + ``series_stats_by_ref``. + """ + metadata_groups = self._reader.get_timeseries_metadata(None) + if not metadata_groups: + raise ValueError("No devices found in tree-model TsFile") + + # 1) Walk every device once to collect: root-segment, max depth, and + # the union of measurements that pass the numeric filter. + root_name = None + max_depth = 0 # segments after the root (i.e. virtual tag depth) + device_specs = [] # list of (tail_segments, group, stats_by_field) + union_fields = [] # ordered union of measurement names + seen_field_names = set() + + for device_path, group in metadata_groups.items(): + if not device_path: + continue + full_segments = tuple(device_path.split(".")) + if not full_segments: + continue + current_root = full_segments[0] + if root_name is None: + root_name = current_root + elif current_root != root_name: + raise ValueError( + f"Tree-model TsFile contains multiple root segments: " + f"{root_name!r} vs {current_root!r}. A single load set " + f"must share one tree root." + ) + + tail = full_segments[1:] + depth = len(tail) + if depth > max_depth: + max_depth = depth + + stats_by_field = self._metadata_field_stats(group) + if not stats_by_field: + continue + for measurement in stats_by_field.keys(): + if measurement not in seen_field_names: + seen_field_names.add(measurement) + union_fields.append(measurement) + device_specs.append((tail, group, stats_by_field)) + + if root_name is None: + raise ValueError("No devices found in tree-model TsFile") + if not union_fields: + raise ValueError("No numeric measurements found in tree-model TsFile") + + # 2) Materialize the synthetic table entry. Tag columns are 1-based + # so the rendered headers match the requirement ("_col_1"). + tag_columns = tuple(f"_col_{i + 1}" for i in range(max_depth)) + tag_types = (TSDataType.STRING,) * max_depth + + self._catalog = MetadataCatalog() + table_id = self._catalog.add_table( + root_name, tag_columns, tag_types, union_fields + ) + table_entry = self._catalog.table_entries[table_id] + + # 3) Stable device order: keep input iteration order so the dataset + # layer's row index stays deterministic across reloads. + total = len(device_specs) + if self.show_progress: + sys.stderr.write(f"\rReading TsFile metadata: 0/{total} devices") + sys.stderr.flush() + + for idx, (tail, group, stats_by_field) in enumerate(device_specs, start=1): + device_stats = self._metadata_device_stats(group) + if device_stats is None: + continue + # Pad shorter devices with None to length max_depth; the catalog + # normalizer strips trailing Nones so this never enters the + # sparse-tag bookkeeping. + padded = tuple(list(tail) + [None] * (max_depth - len(tail))) + device_id = self._add_device( + table_id, padded, device_stats["min_time"], device_stats["max_time"] + ) + for measurement, field_stats in stats_by_field.items(): + field_idx = table_entry.get_field_index(measurement) + self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( + field_stats + ) + if self.show_progress and (idx % 64 == 0 or idx == total): + sys.stderr.write( + f"\rReading TsFile metadata: {idx}/{total} devices" + ) + sys.stderr.flush() + + if self.show_progress: + sys.stderr.write( + f"\rReading TsFile metadata (tree): {total} device(s), " + f"{self.series_count} series ... done\n" + ) + sys.stderr.flush() + @staticmethod def _metadata_device_stats(group) -> dict: """Derive cheap device-level metadata hints from native field statistics. @@ -256,28 +371,29 @@ def _metadata_tag_values(group, tag_count: int) -> tuple: return tuple(values) @staticmethod - def _metadata_field_stats(group) -> Dict[str, dict]: - stats = {} + def _metadata_field_stats(group) -> Dict[str, SeriesStats]: + """Collect per-measurement stats for cells that have real values. + + A measurement appears in the result iff its native ``statistic`` block + is populated and reports a positive ``row_count``. Columns that the + device never wrote (Tablet skip / all-NaN pandas column) carry no + real values and are intentionally absent -- the dataset layer + surfaces only series that physically exist. + """ + stats: Dict[str, SeriesStats] = {} for timeseries in group.timeseries: statistic = timeseries.statistic - timeline_statistic = timeseries.timeline_statistic - if ( - not timeline_statistic.has_statistic - or timeline_statistic.row_count <= 0 - ): + if not statistic.has_statistic or statistic.row_count <= 0: continue - stats[timeseries.measurement_name] = { - "length": int(statistic.row_count) if statistic.has_statistic else 0, - "min_time": ( - int(statistic.start_time) if statistic.has_statistic else None - ), - "max_time": ( - int(statistic.end_time) if statistic.has_statistic else None - ), - "timeline_length": int(timeline_statistic.row_count), - "timeline_min_time": int(timeline_statistic.start_time), - "timeline_max_time": int(timeline_statistic.end_time), - } + timeline_statistic = timeseries.timeline_statistic + stats[timeseries.measurement_name] = SeriesStats( + length=int(statistic.row_count), + min_time=int(statistic.start_time), + max_time=int(statistic.end_time), + timeline_length=int(timeline_statistic.row_count), + timeline_min_time=int(timeline_statistic.start_time), + timeline_max_time=int(timeline_statistic.end_time), + ) return stats def _add_device( @@ -317,12 +433,12 @@ def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict: ) field_stats = self._catalog.series_stats_by_ref[(device_id, field_idx)] return { - "length": field_stats["length"], - "min_time": field_stats["min_time"], - "max_time": field_stats["max_time"], - "timeline_length": field_stats["timeline_length"], - "timeline_min_time": field_stats["timeline_min_time"], - "timeline_max_time": field_stats["timeline_max_time"], + "length": field_stats.length, + "min_time": field_stats.min_time, + "max_time": field_stats.max_time, + "timeline_length": field_stats.timeline_length, + "timeline_min_time": field_stats.timeline_min_time, + "timeline_max_time": field_stats.timeline_max_time, "table_name": table_entry.table_name, "column_name": field_name, "device_id": device_id, @@ -362,6 +478,13 @@ def read_series_by_row( table_entry, device_entry, field_name = self._resolve_series_ref( device_id, field_idx ) + + if self._model_kind == MODEL_TREE: + device_path = self._build_tree_device_path(table_entry, device_entry) + return self._read_series_by_row_tree( + device_path, field_name, offset, limit + ) + tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None @@ -403,6 +526,60 @@ def read_series_by_row( return timestamp_parts[0], value_parts[0] return np.concatenate(timestamp_parts), np.concatenate(value_parts) + def _read_series_by_row_tree( + self, device_path: str, field_name: str, offset: int, limit: int + ) -> Tuple[np.ndarray, np.ndarray]: + """Tree-model row read: scan on-tree result, filter device, apply offset/limit.""" + target_path_segments = device_path.split(".") + # +1 because cwrapper prepends the root as an extra col_i cell. + expected_path_len = max( + len(t.tag_columns) for t in self._catalog.table_entries + ) + 1 + timestamps = [] + values = [] + skipped = 0 + with self._reader.query_table_on_tree([field_name]) as result_set: + md = result_set.get_metadata() + num_cols = md.get_column_num() + col_names = [md.get_column_name(i + 1) for i in range(num_cols)] + try: + field_idx = col_names.index(field_name) + 1 + except ValueError: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + all_col_indices = [ + idx + 1 + for idx, name in enumerate(col_names) + if name.startswith("col_") + ] + # Only the trailing expected_path_len col_i cells are genuine; the + # leading duplicates are stale from prior queries on this reader. + col_indices = all_col_indices[-expected_path_len:] + while result_set.next(): + row_path_segments = [ + result_set.get_value_by_index(ci) for ci in col_indices + ] + # Trim trailing Nones for the (possibly-shorter) device path. + while row_path_segments and row_path_segments[-1] is None: + row_path_segments.pop() + if row_path_segments != target_path_segments: + continue + if skipped < offset: + skipped += 1 + continue + if len(timestamps) >= limit: + break + ts = result_set.get_value_by_index(1) + raw = result_set.get_value_by_index(field_idx) + timestamps.append(int(ts)) + values.append(np.nan if raw is None else float(raw)) + + if not timestamps: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + return ( + np.asarray(timestamps, dtype=np.int64), + np.asarray(values, dtype=np.float64), + ) + def read_device_fields_by_time_range( self, device_id: int, field_indices: List[int], start_time: int, end_time: int ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: @@ -412,7 +589,12 @@ def read_device_fields_by_time_range( requested_field_columns = [ table_entry.field_columns[field_idx] for field_idx in field_indices ] - timestamps, field_values = self._read_arrow( + if self._model_kind == MODEL_TREE: + device_path = self._build_tree_device_path(table_entry, device_entry) + return self._read_arrow_tree( + device_path, requested_field_columns, start_time, end_time + ) + return self._read_arrow_table( table_entry.table_name, requested_field_columns, table_entry.tag_columns, @@ -420,9 +602,36 @@ def read_device_fields_by_time_range( start_time, end_time, ) - return timestamps, field_values - def _read_arrow( + @staticmethod + def _build_tree_device_path(table_entry, device_entry) -> str: + """Reassemble the cwrapper-facing tree device path from catalog state. + + The native ``query_timeseries`` / ``query_tree_by_row`` APIs split the + device path on ``.`` internally, so segments themselves must not + contain ``.``. Tree-model writers enforce this convention; we surface + an explicit error if a future writer ever violates it. + """ + components = [str(table_entry.table_name)] + for value in device_entry.tag_values: + if value is None: + # Should not happen: trailing-None devices are normalized to a + # shorter tag tuple. An interior None signals a sparse-tag + # device, which is not part of the tree-model contract. + raise ValueError( + f"Tree device path cannot include a null segment: " + f"{device_entry.tag_values!r}" + ) + text = str(value) + if "." in text: + raise NotImplementedError( + f"Tree device segment with '.' is not supported by the " + f"underlying cwrapper path API: {text!r}" + ) + components.append(text) + return ".".join(components) + + def _read_arrow_table( self, table_name: str, field_columns: List[str], @@ -491,3 +700,78 @@ def _read_arrow( } return timestamps, field_values + + def _read_arrow_tree( + self, + device_path: str, + field_columns: List[str], + start_time: int, + end_time: int, + ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: + """Tree-model time-range read for one device (multi-field).""" + field_columns = list(field_columns) + if not field_columns: + return ( + np.array([], dtype=np.int64), + {}, + ) + + target_path_segments = device_path.split(".") + expected_path_len = max( + len(t.tag_columns) for t in self._catalog.table_entries + ) + 1 + timestamps = [] + value_buckets = {col: [] for col in field_columns} + + with self._reader.query_table_on_tree( + field_columns, start_time, end_time + ) as result_set: + md = result_set.get_metadata() + num_cols = md.get_column_num() + col_names = [md.get_column_name(i + 1) for i in range(num_cols)] + value_indices = {} + for col in field_columns: + try: + value_indices[col] = col_names.index(col) + 1 + except ValueError: + # Column missing (no device in file owns it). Yield empty. + return ( + np.array([], dtype=np.int64), + {col2: np.array([], dtype=np.float64) for col2 in field_columns}, + ) + all_col_indices = [ + idx + 1 + for idx, name in enumerate(col_names) + if name.startswith("col_") + ] + col_indices = all_col_indices[-expected_path_len:] + while result_set.next(): + row_path_segments = [ + result_set.get_value_by_index(ci) for ci in col_indices + ] + while row_path_segments and row_path_segments[-1] is None: + row_path_segments.pop() + if row_path_segments != target_path_segments: + continue + ts = int(result_set.get_value_by_index(1)) + # The on-tree scan already honors start/end_time at the + # cwrapper level, but defensively re-clip on the boundary. + if ts < start_time or ts > end_time: + continue + timestamps.append(ts) + for col, vidx in value_indices.items(): + raw = result_set.get_value_by_index(vidx) + value_buckets[col].append(np.nan if raw is None else float(raw)) + + if not timestamps: + return ( + np.array([], dtype=np.int64), + {col: np.array([], dtype=np.float64) for col in field_columns}, + ) + + timestamps_arr = np.asarray(timestamps, dtype=np.int64) + field_values = { + col: np.asarray(value_buckets[col], dtype=np.float64) + for col in field_columns + } + return timestamps_arr, field_values