From 1c1ca7044c6fed25b99ad087ffdbb9f5fa121df5 Mon Sep 17 00:00:00 2001 From: Young-Leo <562593859@qq.com> Date: Wed, 13 May 2026 22:08:50 +0800 Subject: [PATCH 1/7] [Python] Support tree-model TsFiles in TsFileDataFrame Detect model kind at reader open, synthesize a virtual TableEntry for tree files (root segment as table name, _col_1.._col_N for positional path-depth headers, union of all device measurements as fields), and preserve per-device measurement ownership via series_stats_by_ref so the dataset layer never registers phantom (device, field) pairs. The dataset surface (__len__, list_timeseries, __getitem__, .loc) is unchanged for both models. New public API: read-only df.model and df.list_timeseries_metadata(). Mixing table and tree files in one load set is rejected with a clear error. Tree-model reads route through query_table_on_tree with client-side device filtering, working around two cwrapper bugs (multi-segment path rejection and duplicate col_* leak across queries). Tree-mode rendering omits the leading "table" column, uses _col_i headers, prints None tag cells as "None"; the repr header now carries the model marker. Tests: 4 new tree-model tests cover metadata + repr layout, single- series and aligned reads, list_timeseries_metadata column shape, and mixed-model load rejection. All 40 tests pass. --- python/tests/test_tsfile_dataset.py | 141 ++++++++++- python/tsfile/dataset/dataframe.py | 147 ++++++++++-- python/tsfile/dataset/formatting.py | 31 ++- python/tsfile/dataset/reader.py | 352 +++++++++++++++++++++++++++- 4 files changed, 635 insertions(+), 36 deletions(-) diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index f79a6d466..8b20b660f 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -35,7 +35,7 @@ build_series_path, resolve_series_path, ) -from tsfile.dataset.reader import TsFileSeriesReader, _build_exact_tag_filter +from tsfile.dataset.reader import MODEL_TABLE, TsFileSeriesReader, _build_exact_tag_filter def _write_weather_file(path, start): @@ -247,7 +247,7 @@ def test_dataset_basic_access_patterns(tmp_path, capsys): assert list(tsdf["field"]) == ["temperature", "humidity"] - assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf) + assert "TsFileDataFrame(table model, 2 time series, 2 files)" in repr(tsdf) aligned.show(2) assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out @@ -411,7 +411,7 @@ def fail_build_series_name(_series_ref): monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name) rendered = repr(tsdf) - assert "TsFileDataFrame(1000 time series, 1 files)" in rendered + assert "TsFileDataFrame(table model, 1000 time series, 1 files)" in rendered assert "..." 
in rendered
     assert len(built_rows) == 20
@@ -737,6 +737,7 @@ def query_table_by_row(
             return _FakeResultSet(rows)
 
     reader = object.__new__(TsFileSeriesReader)
+    reader._model_kind = MODEL_TABLE
     reader._reader = _FakeNativeReader(
         np.arange(30, dtype=np.int64), np.arange(30, dtype=np.float64), boundary=10
     )
@@ -846,6 +847,7 @@ def query_table_by_row(self, *args, **kwargs):
         )
 
     reader = object.__new__(TsFileSeriesReader)
+    reader._model_kind = MODEL_TABLE
     reader._reader = _FakeNativeReader()
     reader._catalog = MetadataCatalog()
     table_id = reader._catalog.add_table(
@@ -994,3 +996,136 @@ def test_dataframe_parallel_show_progress_reports_start_immediately(tmp_path, ca
     stderr = capsys.readouterr().err
     assert "Loading TsFile shards: 0/2" in stderr
     assert "Loading TsFile shards: 2/2 (4 series) ... done" in stderr
+
+
+# --- Tree-model tests -------------------------------------------------------
+
+
+def _write_tree_file(path):
+    """Tree-model TsFile with two devices; the second device owns only one
+    of the two declared measurements, exercising the union-field ownership
+    path in the synthetic table layer.
+    """
+    from tsfile import (
+        Field,
+        RowRecord,
+        TimeseriesSchema,
+        TsFileWriter,
+    )
+
+    writer = TsFileWriter(str(path))
+    writer.register_timeseries(
+        "root.ln.wf01.wt01", TimeseriesSchema("status", TSDataType.INT32)
+    )
+    writer.register_timeseries(
+        "root.ln.wf01.wt01", TimeseriesSchema("temperature", TSDataType.DOUBLE)
+    )
+    writer.register_timeseries(
+        "root.ln.wf02.wt02", TimeseriesSchema("status", TSDataType.INT32)
+    )
+    for t in range(5):
+        writer.write_row_record(
+            RowRecord(
+                "root.ln.wf01.wt01",
+                t,
+                [
+                    Field("status", t, TSDataType.INT32),
+                    Field("temperature", float(t) + 0.5, TSDataType.DOUBLE),
+                ],
+            )
+        )
+        writer.write_row_record(
+            RowRecord(
+                "root.ln.wf02.wt02",
+                t,
+                [Field("status", t * 2, TSDataType.INT32)],
+            )
+        )
+    writer.close()
+
+
+def test_dataset_tree_model_metadata_and_repr(tmp_path):
+    path = tmp_path / "tree.tsfile"
+    _write_tree_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        assert tsdf.model == "tree"
+        assert len(tsdf) == 3
+        assert sorted(tsdf.list_timeseries()) == [
+            "root.ln.wf01.wt01.status",
+            "root.ln.wf01.wt01.temperature",
+            "root.ln.wf02.wt02.status",
+        ]
+
+        rendered = repr(tsdf)
+        # Header carries the model marker; tag headers use _col_i (1-based).
+        assert "TsFileDataFrame(tree model, 3 time series, 1 files)" in rendered
+        assert "_col_1" in rendered and "_col_2" in rendered and "_col_3" in rendered
+        assert "table" not in rendered.splitlines()[1]  # no 'table' header
+
+        # Metadata column projection: _col_i and field; 'table' is rejected.
+        assert list(tsdf["_col_1"]) == ["ln", "ln", "ln"]
+        assert list(tsdf["_col_3"]) == ["wt01", "wt01", "wt02"]
+        assert list(tsdf["field"]) == ["status", "temperature", "status"]
+        with pytest.raises(KeyError):
+            tsdf["table"]
+
+
+def test_dataset_tree_model_series_access(tmp_path):
+    path = tmp_path / "tree.tsfile"
+    _write_tree_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        ts = tsdf["root.ln.wf01.wt01.temperature"]
+        assert isinstance(ts, Timeseries)
+        assert ts.name == "root.ln.wf01.wt01.temperature"
+        assert len(ts) == 5
+        np.testing.assert_array_equal(ts.timestamps, np.arange(5, dtype=np.int64))
+        # __getitem__ slice routes through _read_series_by_row_tree.
+        first_three = ts[0:3]
+        np.testing.assert_array_equal(first_three, np.array([0.5, 1.5, 2.5]))
+
+        # Aligned read across two co-located series.
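+        # Both series live on root.ln.wf01.wt01 and cover the same 0..4
+        # timestamp axis, so the aligned frame needs no NaN padding.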
+ aligned = tsdf.loc[0:5, [ + "root.ln.wf01.wt01.temperature", + "root.ln.wf01.wt01.status", + ]] + assert isinstance(aligned, AlignedTimeseries) + assert aligned.shape == (5, 2) + np.testing.assert_array_equal(aligned.timestamps, np.arange(5, dtype=np.int64)) + + +def test_dataset_tree_model_list_timeseries_metadata(tmp_path): + path = tmp_path / "tree.tsfile" + _write_tree_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + meta = tsdf.list_timeseries_metadata() + assert isinstance(meta, pd.DataFrame) + assert list(meta.columns) == [ + "field", + "start_time", + "end_time", + "count", + "_col_1", + "_col_2", + "_col_3", + ] + assert sorted(meta.index.tolist()) == sorted(tsdf.list_timeseries()) + # Time bounds surface as pandas.Timestamp for ergonomic comparison. + assert pd.api.types.is_datetime64_any_dtype(meta["start_time"]) + assert pd.api.types.is_datetime64_any_dtype(meta["end_time"]) + # Per-series count comes from the catalog, not the synthetic union. + assert meta.loc["root.ln.wf01.wt01.temperature", "count"] == 5 + assert meta.loc["root.ln.wf02.wt02.status", "count"] == 5 + + +def test_dataset_rejects_mixed_model_load(tmp_path): + table_path = tmp_path / "weather.tsfile" + tree_path = tmp_path / "tree.tsfile" + _write_weather_file(table_path, 0) + _write_tree_file(tree_path) + + with pytest.raises(ValueError, match="Mixed table-model and tree-model"): + TsFileDataFrame([str(table_path), str(tree_path)], show_progress=False) + diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 40149102a..f0a8acc22 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -23,7 +23,7 @@ import heapq import os import sys -from typing import Dict, List, Set, Tuple, Union +from typing import Dict, List, Optional, Set, Tuple, Union import warnings import numpy as np @@ -52,11 +52,18 @@ # multiple shards. _OVERLAP_ROW_CHUNK_SIZE = 256 +MODEL_TABLE = "table" +MODEL_TREE = "tree" + @dataclass(**_DATACLASS_SLOTS) class _LogicalIndex: """Cross-reader logical mapping for devices and series.""" + # Model kind for the entire load set: "table" or "tree". A single + # TsFileDataFrame is not allowed to mix table-model and tree-model files. + model: Optional[str] = None + # Shared table schema references keyed by table name. table_entries: Dict[str, TableEntry] = field(default_factory=dict) @@ -149,6 +156,17 @@ def _register_reader( reader, ) -> None: """Merge one reader's catalog into the dataframe-wide logical index.""" + cur_tsfile_model = reader.model_kind + if index.model is None: + index.model = cur_tsfile_model + elif index.model != cur_tsfile_model: + raise ValueError( + f"Mixed table-model and tree-model TsFiles detected. The first " + f"loaded file is {index.model!r} but '{file_path}' is " + f"{cur_tsfile_model!r}. A single TsFileDataFrame load set must be " + f"entirely table-model or entirely tree-model." + ) + readers[file_path] = reader catalog = reader.catalog @@ -184,12 +202,26 @@ def _register_reader( ).append(device_idx) index.device_refs[device_idx].append((reader, device_id)) - for field_idx in range(len(table_entry.field_columns)): - series_ref = (device_idx, field_idx) - if series_ref not in index.series_ref_map: - index.series_refs_ordered.append(series_ref) - index.series_ref_map[series_ref] = [] - index.series_ref_map[series_ref].append((reader, device_id, field_idx)) + # Register only the (device, field) pairs that actually have stats. 
For + # table-model catalogs every declared (device, field) is populated (with + # empty placeholders when missing), so this preserves the prior behavior. + # For tree-model catalogs each device only owns a subset of the synthetic + # table's union field columns, so this prevents phantom series. + iter_owned = getattr(reader, "iter_owned_series_refs", None) + if iter_owned is None: + owned_pairs = list(catalog.series_stats_by_ref.keys()) + else: + owned_pairs = list(iter_owned()) + for device_id, field_idx in owned_pairs: + device_entry = catalog.device_entries[device_id] + table_entry = catalog.table_entries[device_entry.table_id] + device_key = (table_entry.table_name, tuple(device_entry.tag_values)) + device_idx = index.device_index_by_key[device_key] + series_ref = (device_idx, field_idx) + if series_ref not in index.series_ref_map: + index.series_refs_ordered.append(series_ref) + index.series_ref_map[series_ref] = [] + index.series_ref_map[series_ref].append((reader, device_id, field_idx)) def _build_device_entry(refs: List[DeviceRef]) -> dict: @@ -594,6 +626,7 @@ def _from_subset( obj._show_progress = parent._show_progress obj._readers = parent._readers obj._index = _LogicalIndex( + model=parent._index.model, table_entries=parent._index.table_entries, device_order=parent._index.device_order, device_index_by_key=parent._index.device_index_by_key, @@ -771,11 +804,20 @@ def _build_series_info(self, series_ref: SeriesRefKey) -> dict: device_idx, field_idx = series_ref device_key, table_entry, _ = self._get_series_components(series_ref) field_stats = self._cache.field_stats[series_ref] + # Pad short tag tuples (tree-model devices whose path is shorter than + # the synthetic table's max depth) with None so positional access by + # `_col_i` index always lands on a defined cell. + tag_values_ordered = list(device_key[1]) + if len(tag_values_ordered) < len(table_entry.tag_columns): + tag_values_ordered.extend( + [None] * (len(table_entry.tag_columns) - len(tag_values_ordered)) + ) return { "table_name": table_entry.table_name, "field": table_entry.field_columns[field_idx], "tag_columns": table_entry.tag_columns, - "tag_values": dict(zip(table_entry.tag_columns, device_key[1])), + "tag_values": dict(zip(table_entry.tag_columns, tag_values_ordered)), + "tag_values_ordered": tag_values_ordered, "min_time": field_stats["min_time"], "max_time": field_stats["max_time"], "count": field_stats["count"], @@ -784,6 +826,12 @@ def _build_series_info(self, series_ref: SeriesRefKey) -> dict: def __len__(self) -> int: return len(self._index.series_refs_ordered) + @property + def model(self) -> str: + """Return ``"table"`` or ``"tree"`` for the current load set. + """ + return self._index.model + def list_timeseries(self, path_prefix: str = "") -> List[str]: if not path_prefix: return [ @@ -809,6 +857,66 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]: matched.append(self._build_series_name(series_ref)) return matched + def list_timeseries_metadata(self, path_prefix: str = ""): + """Return a pandas DataFrame of per-series metadata. + + The returned frame is indexed by the logical series name and includes + per-series ``field``, time-bound (start/end) statistics, observation + ``count``, and the per-device tag values (named ``_col_1``, ``_col_2``, + ... in tree mode, or by their declared tag-column names in table + mode). Time bounds are exposed as ``pandas.Timestamp`` for ergonomic + comparison; ``count`` is an integer. 
+ + ``path_prefix`` filters by the same logical-path prefix semantics as + ``list_timeseries`` (no prefix returns the full catalog). + """ + import pandas as pd + + # Reuse list_timeseries to apply prefix filtering, then map names back + # to the underlying series_ref (this respects view subsetting too). + names = self.list_timeseries(path_prefix) + + rows = [] + for series_name in names: + series_ref = self._resolve_series_name(series_name) + info = self._build_series_info(series_ref) + row = { + "field": info["field"], + "start_time": pd.to_datetime(info["min_time"], unit="ms"), + "end_time": pd.to_datetime(info["max_time"], unit="ms"), + "count": int(info["count"]), + } + if self._index.model != MODEL_TREE: + row["table"] = info["table_name"] + tag_columns = info["tag_columns"] + tag_values_ordered = info["tag_values_ordered"] + for column, value in zip(tag_columns, tag_values_ordered): + row[column] = value + rows.append((series_name, row)) + + if not rows: + columns = ["field", "start_time", "end_time", "count"] + if self._index.model != MODEL_TREE: + columns.insert(0, "table") + columns.extend(self._collect_tag_columns()) + return pd.DataFrame(columns=columns) + + index = [name for name, _ in rows] + data = [row for _, row in rows] + df = pd.DataFrame(data, index=pd.Index(index, name="series")) + + # Stable, predictable column order: leading bookkeeping, then tags. + leading = ["field", "start_time", "end_time", "count"] + if self._index.model != MODEL_TREE: + leading.insert(0, "table") + tag_order = list(self._collect_tag_columns()) + ordered_columns = leading + [c for c in tag_order if c in df.columns] + # Preserve any extra columns at the end (defensive against schema drift). + for extra in df.columns: + if extra not in ordered_columns: + ordered_columns.append(extra) + return df.reindex(columns=ordered_columns) + def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries: self._assert_open() series_name = self._build_series_name(series_ref) @@ -853,7 +961,9 @@ def __getitem__(self, key): except KeyError: pass - valid_columns = {"table", "field", "start_time", "end_time", "count"} + valid_columns = {"field", "start_time", "end_time", "count"} + if self._index.model != MODEL_TREE: + valid_columns.add("table") valid_columns.update(self._collect_tag_columns()) if key not in valid_columns: raise KeyError(_series_lookup_hint(key)) @@ -874,7 +984,7 @@ def __getitem__(self, key): elif key == "count": values.append(info["count"]) else: - values.append(info["tag_values"].get(key, "")) + values.append(info["tag_values"].get(key)) return pd.Series(values, name=key) if isinstance(key, slice): @@ -937,18 +1047,20 @@ def _format_table(self, indices=None, max_rows: int = 20) -> str: preview_indices, truncated, split_index = self._preview_indices( indices, max_rows ) + is_tree = self._index.model == MODEL_TREE rows = [] for idx in preview_indices: series_ref = self._index.series_refs_ordered[idx] info = self._build_series_info(series_ref) row = { "index": idx, - "table": info["table_name"], "field": info["field"], "start_time": info["min_time"], "end_time": info["max_time"], "count": info["count"], } + if not is_tree: + row["table"] = info["table_name"] row.update(info["tag_values"]) rows.append(row) @@ -958,14 +1070,21 @@ def _format_table(self, indices=None, max_rows: int = 20) -> str: total_count=len(indices), truncated=truncated, split_index=split_index, + is_table_model=not is_tree, ) def _repr_header(self) -> str: total = len(self._index.series_refs_ordered) + model_marker = 
self._index.model if self._is_view: - return f"TsFileDataFrame({total} time series, subset of {len(self._root._index.series_refs_ordered)})\n" - return f"TsFileDataFrame({total} time series, {len(self._paths)} files)\n" - + return ( + f"TsFileDataFrame({model_marker} model, {total} time series, " + f"subset of {len(self._root._index.series_refs_ordered)})\n" + ) + return ( + f"TsFileDataFrame({model_marker} model, {total} time series, " + f"{len(self._paths)} files)\n" + ) def __repr__(self): return self._repr_header() + self._format_table() diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py index 8c387c102..b813b6d8a 100644 --- a/python/tsfile/dataset/formatting.py +++ b/python/tsfile/dataset/formatting.py @@ -106,8 +106,14 @@ def format_dataframe_table( total_count: int, truncated: bool = False, split_index: Optional[int] = None, + is_table_model: bool = True, ) -> str: - """Render the metadata table used by TsFileDataFrame.__repr__.""" + """Render the metadata table used by TsFileDataFrame.__repr__. + + When ``is_table_model`` is False (tree-model layout) the leading + ``table`` column is omitted; ``tag_columns`` is then expected to carry the + tree-model headers (e.g. ``_col_1``, ``_col_2``, ...). + """ if not rows: return "Empty TsFileDataFrame" @@ -115,23 +121,31 @@ def format_dataframe_table( for row in rows: rendered = { "index": row["index"], - "table": row["table"], "field": row["field"], "start_time": format_timestamp(row["start_time"]), "end_time": format_timestamp(row["end_time"]), "count": row["count"], } + if is_table_model: + rendered["table"] = row["table"] for tag_col in tag_columns: - rendered[tag_col] = row.get(tag_col, "") + value = row.get(tag_col) + rendered[tag_col] = "None" if value is None else value rendered_rows.append(rendered) - headers = ["", "table"] + tag_columns + ["field", "start_time", "end_time", "count"] + headers = [""] + if is_table_model: + headers.append("table") + headers.extend(tag_columns) + headers.extend(["field", "start_time", "end_time", "count"]) + widths = {header: len(header) for header in headers} widths[""] = max(len(str(row["index"])) for row in rendered_rows) for row in rendered_rows: widths[""] = max(widths[""], len(str(row["index"]))) - widths["table"] = max(widths["table"], len(row["table"])) + if is_table_model: + widths["table"] = max(widths["table"], len(row["table"])) widths["field"] = max(widths["field"], len(row["field"])) widths["start_time"] = max(widths["start_time"], len(row["start_time"])) widths["end_time"] = max(widths["end_time"], len(row["end_time"])) @@ -144,10 +158,9 @@ def format_dataframe_table( for row_idx, row in enumerate(rendered_rows): if truncated and row_idx == split: lines.append("...") - parts = [ - str(row["index"]).rjust(widths[""]), - row["table"].rjust(widths["table"]), - ] + parts = [str(row["index"]).rjust(widths[""])] + if is_table_model: + parts.append(row["table"].rjust(widths["table"])) for tag_col in tag_columns: parts.append(str(row[tag_col]).rjust(widths[tag_col])) parts.extend( diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 4899b2bf9..ce74bfd7b 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -30,10 +30,12 @@ from .metadata import ( MetadataCatalog, build_series_path, - iter_series_refs, resolve_series_path, ) +MODEL_TABLE = "table" +MODEL_TREE = "tree" + _NUMERIC_FIELD_TYPES = { TSDataType.BOOLEAN, TSDataType.INT32, @@ -80,6 +82,12 @@ def __init__(self, file_path: str, 
show_progress: bool = True): except Exception as e: raise ValueError(f"Failed to open TsFile: {e}") from e + # Probe the file model: an empty table-schema map signals tree model + self._table_schemas = self._reader.get_all_table_schemas() + self._model_kind: str = ( + MODEL_TREE if not self._table_schemas else MODEL_TABLE + ) + self._catalog = MetadataCatalog() self._cache_metadata() @@ -90,24 +98,40 @@ def __del__(self): def catalog(self) -> MetadataCatalog: return self._catalog + @property + def model_kind(self) -> str: + return self._model_kind + @property def series_paths(self) -> List[str]: return list(self.iter_series_paths()) @property def series_count(self) -> int: - return self._catalog.series_count + return len(self._catalog.series_stats_by_ref) def iter_series_paths(self) -> Iterator[str]: - for device_id, field_idx in iter_series_refs(self._catalog): + for device_id, field_idx in self.iter_owned_series_refs(): yield build_series_path(self._catalog, device_id, field_idx) def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]: - for device_id, field_idx in iter_series_refs(self._catalog): + for device_id, field_idx in self.iter_owned_series_refs(): yield build_series_path( self._catalog, device_id, field_idx ), device_id, field_idx + def iter_owned_series_refs(self) -> Iterator[Tuple[int, int]]: + """Yield only ``(device_id, field_idx)`` pairs that have stats. + + Table-model catalogs populate stats for every declared field of each + device (with ``length=0`` placeholders when missing). Tree-model + catalogs only populate stats for measurements actually owned by each + device. The dataset-level cross-file index uses this iterator so it + never registers tree-model phantom (device, field) pairs. + """ + for ref in self._catalog.series_stats_by_ref.keys(): + yield ref + def close(self): if hasattr(self, "_reader"): try: @@ -118,8 +142,10 @@ def close(self): def _cache_metadata(self): """Wrap metadata discovery so reader construction surfaces one stable error shape.""" try: - self._cache_metadata_table_model() - # todo: we should support tree model + if self._model_kind == MODEL_TABLE: + self._cache_metadata_table_model() + else: + self._cache_metadata_tree_model() except Exception as e: raise ValueError( f"Failed to read TsFile metadata. Please ensure the TsFile is valid and readable. Error: {e}" @@ -127,7 +153,7 @@ def _cache_metadata(self): def _cache_metadata_table_model(self): """Build the in-memory catalog from table schemas and native metadata.""" - table_schemas = self._reader.get_all_table_schemas() + table_schemas = self._table_schemas if not table_schemas: raise ValueError("No tables found in TsFile") @@ -217,6 +243,117 @@ def _cache_metadata_table_model(self): ) sys.stderr.flush() + def _cache_metadata_tree_model(self): + """Build the in-memory catalog from native tree-model device metadata. + + Tree TsFiles do not declare table schemas. We synthesize one virtual + table whose name is the common root segment (typically ``"root"``); + each remaining device path segment maps to a synthetic tag column + ``_col_1, _col_2, ..., _col_{N_max}`` (1-based). The set of declared + fields is the union of all measurements across devices, so a single + ``TableEntry`` covers the whole file. Per-device measurement ownership + is preserved by populating ``series_stats_by_ref`` only for the + ``(device_id, field_idx)`` pairs that actually exist on disk. 
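+
+        For example (hypothetical paths), a file holding devices
+        ``root.ln.wf01.wt01`` and ``root.sg.d1`` synthesizes::
+
+            table name : "root"
+            tag columns: ("_col_1", "_col_2", "_col_3")
+            device 1   : ("ln", "wf01", "wt01")
+            device 2   : ("sg", "d1", None)   # tail padded to max depth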
+ + Note: the cwrapper-exposed ``segments`` tuple on each device is not the + fully tokenized path — for tree-mode devices it is just + ``(table_name, last_segment)``. The fully tokenized path is the dict + key (``device path``) so we split that ourselves. + """ + metadata_groups = self._reader.get_timeseries_metadata(None) + if not metadata_groups: + raise ValueError("No devices found in tree-model TsFile") + + # 1) Walk every device once to collect: root-segment, max depth, and + # the union of measurements that pass the numeric filter. + root_name = None + max_depth = 0 # segments after the root (i.e. virtual tag depth) + device_specs = [] # list of (tail_segments, group, stats_by_field) + union_fields = [] # ordered union of measurement names + seen_field_names = set() + + for device_path, group in metadata_groups.items(): + if not device_path: + continue + full_segments = tuple(device_path.split(".")) + if not full_segments: + continue + current_root = full_segments[0] + if root_name is None: + root_name = current_root + elif current_root != root_name: + raise ValueError( + f"Tree-model TsFile contains multiple root segments: " + f"{root_name!r} vs {current_root!r}. A single load set " + f"must share one tree root." + ) + + tail = full_segments[1:] + depth = len(tail) + if depth > max_depth: + max_depth = depth + + stats_by_field = self._metadata_field_stats(group) + if not stats_by_field: + continue + for measurement in stats_by_field.keys(): + if measurement not in seen_field_names: + seen_field_names.add(measurement) + union_fields.append(measurement) + device_specs.append((tail, group, stats_by_field)) + + if root_name is None: + raise ValueError("No devices found in tree-model TsFile") + if not union_fields: + raise ValueError("No numeric measurements found in tree-model TsFile") + + # 2) Materialize the synthetic table entry. Tag columns are 1-based + # so the rendered headers match the requirement ("_col_1"). + tag_columns = tuple(f"_col_{i + 1}" for i in range(max_depth)) + tag_types = (TSDataType.STRING,) * max_depth + + self._catalog = MetadataCatalog() + table_id = self._catalog.add_table( + root_name, tag_columns, tag_types, union_fields + ) + table_entry = self._catalog.table_entries[table_id] + + # 3) Stable device order: keep input iteration order so the dataset + # layer's row index stays deterministic across reloads. + total = len(device_specs) + if self.show_progress: + sys.stderr.write(f"\rReading TsFile metadata: 0/{total} devices") + sys.stderr.flush() + + for idx, (tail, group, stats_by_field) in enumerate(device_specs, start=1): + device_stats = self._metadata_device_stats(group) + if device_stats is None: + continue + # Pad shorter devices with None to length max_depth; the catalog + # normalizer strips trailing Nones so this never enters the + # sparse-tag bookkeeping. + padded = tuple(list(tail) + [None] * (max_depth - len(tail))) + device_id = self._add_device( + table_id, padded, device_stats["min_time"], device_stats["max_time"] + ) + for measurement, field_stats in stats_by_field.items(): + field_idx = table_entry.get_field_index(measurement) + self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( + field_stats + ) + if self.show_progress and (idx % 64 == 0 or idx == total): + sys.stderr.write( + f"\rReading TsFile metadata: {idx}/{total} devices" + ) + sys.stderr.flush() + + if self.show_progress: + sys.stderr.write( + f"\rReading TsFile metadata (tree): {total} device(s), " + f"{self.series_count} series ... 
done\n" + ) + sys.stderr.flush() + @staticmethod def _metadata_device_stats(group) -> dict: """Derive cheap device-level metadata hints from native field statistics. @@ -362,6 +499,13 @@ def read_series_by_row( table_entry, device_entry, field_name = self._resolve_series_ref( device_id, field_idx ) + + if self._model_kind == MODEL_TREE: + device_path = self._build_tree_device_path(table_entry, device_entry) + return self._read_series_by_row_tree( + device_path, field_name, offset, limit + ) + tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None @@ -403,6 +547,74 @@ def read_series_by_row( return timestamp_parts[0], value_parts[0] return np.concatenate(timestamp_parts), np.concatenate(value_parts) + def _read_series_by_row_tree( + self, device_path: str, field_name: str, offset: int, limit: int + ) -> Tuple[np.ndarray, np.ndarray]: + """Tree-model row read: scan the on-tree result and apply offset/limit. + + Routed through ``query_table_on_tree`` because the cwrapper's direct + ``query_tree_by_row`` path currently rejects multi-segment device + paths in the installed binary (E_DEVICE_NOT_EXIST). On-tree scan + + client-side device filter + post-filter offset/limit is the most + reliable available implementation. + + Workaround note: successive ``query_table_on_tree`` invocations on + the same reader instance leak duplicate ``col_*`` columns into the + result schema (a known cwrapper bug). The genuine device-path cells + are always the *trailing* ``expected_path_len`` ``col_*`` cells of + every row, so we slice from the end instead of trusting column + names. + """ + target_path_segments = device_path.split(".") + # +1 because cwrapper prepends the root as an extra col_i cell. + expected_path_len = max( + len(t.tag_columns) for t in self._catalog.table_entries + ) + 1 + timestamps = [] + values = [] + skipped = 0 + with self._reader.query_table_on_tree([field_name]) as result_set: + md = result_set.get_metadata() + num_cols = md.get_column_num() + col_names = [md.get_column_name(i + 1) for i in range(num_cols)] + try: + field_idx = col_names.index(field_name) + 1 + except ValueError: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + all_col_indices = [ + idx + 1 + for idx, name in enumerate(col_names) + if name.startswith("col_") + ] + # Only the trailing expected_path_len col_i cells are genuine; the + # leading duplicates are stale from prior queries on this reader. + col_indices = all_col_indices[-expected_path_len:] + while result_set.next(): + row_path_segments = [ + result_set.get_value_by_index(ci) for ci in col_indices + ] + # Trim trailing Nones for the (possibly-shorter) device path. 
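+                # (The scan returns every device in the widest path layout;
+                # shorter paths read back as None in the trailing col_* cells.)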
+ while row_path_segments and row_path_segments[-1] is None: + row_path_segments.pop() + if row_path_segments != target_path_segments: + continue + if skipped < offset: + skipped += 1 + continue + if len(timestamps) >= limit: + break + ts = result_set.get_value_by_index(1) + raw = result_set.get_value_by_index(field_idx) + timestamps.append(int(ts)) + values.append(np.nan if raw is None else float(raw)) + + if not timestamps: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + return ( + np.asarray(timestamps, dtype=np.int64), + np.asarray(values, dtype=np.float64), + ) + def read_device_fields_by_time_range( self, device_id: int, field_indices: List[int], start_time: int, end_time: int ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: @@ -412,7 +624,12 @@ def read_device_fields_by_time_range( requested_field_columns = [ table_entry.field_columns[field_idx] for field_idx in field_indices ] - timestamps, field_values = self._read_arrow( + if self._model_kind == MODEL_TREE: + device_path = self._build_tree_device_path(table_entry, device_entry) + return self._read_arrow_tree( + device_path, requested_field_columns, start_time, end_time + ) + return self._read_arrow_table( table_entry.table_name, requested_field_columns, table_entry.tag_columns, @@ -420,9 +637,36 @@ def read_device_fields_by_time_range( start_time, end_time, ) - return timestamps, field_values - def _read_arrow( + @staticmethod + def _build_tree_device_path(table_entry, device_entry) -> str: + """Reassemble the cwrapper-facing tree device path from catalog state. + + The native ``query_timeseries`` / ``query_tree_by_row`` APIs split the + device path on ``.`` internally, so segments themselves must not + contain ``.``. Tree-model writers enforce this convention; we surface + an explicit error if a future writer ever violates it. + """ + components = [str(table_entry.table_name)] + for value in device_entry.tag_values: + if value is None: + # Should not happen: trailing-None devices are normalized to a + # shorter tag tuple. An interior None signals a sparse-tag + # device, which is not part of the tree-model contract. + raise ValueError( + f"Tree device path cannot include a null segment: " + f"{device_entry.tag_values!r}" + ) + text = str(value) + if "." in text: + raise NotImplementedError( + f"Tree device segment with '.' is not supported by the " + f"underlying cwrapper path API: {text!r}" + ) + components.append(text) + return ".".join(components) + + def _read_arrow_table( self, table_name: str, field_columns: List[str], @@ -491,3 +735,91 @@ def _read_arrow( } return timestamps, field_values + + def _read_arrow_tree( + self, + device_path: str, + field_columns: List[str], + start_time: int, + end_time: int, + ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: + """Tree-model time-range read for one device (multi-field). + + Routed through ``query_table_on_tree`` because the cwrapper's direct + ``query_timeseries`` (a.k.a. ``tsfile_reader_query_paths``) path + currently rejects multi-segment device paths in the installed binary. + On-tree scan + client-side device filter is the most reliable + available implementation; performance is proportional to the number + of devices in the file (not just the target device) so callers reading + many disjoint devices on a wide file will see linear cost. + + Workaround note: successive ``query_table_on_tree`` invocations leak + duplicate ``col_*`` columns into the result schema (cwrapper bug), + so we take only the trailing ``expected_path_len`` ``col_*`` cells. 
+        """
+        field_columns = list(field_columns)
+        if not field_columns:
+            return (
+                np.array([], dtype=np.int64),
+                {},
+            )
+
+        target_path_segments = device_path.split(".")
+        expected_path_len = max(
+            len(t.tag_columns) for t in self._catalog.table_entries
+        ) + 1
+        timestamps = []
+        value_buckets = {col: [] for col in field_columns}
+
+        with self._reader.query_table_on_tree(
+            field_columns, start_time, end_time
+        ) as result_set:
+            md = result_set.get_metadata()
+            num_cols = md.get_column_num()
+            col_names = [md.get_column_name(i + 1) for i in range(num_cols)]
+            value_indices = {}
+            for col in field_columns:
+                try:
+                    value_indices[col] = col_names.index(col) + 1
+                except ValueError:
+                    # Column missing (no device in file owns it). Yield empty.
+                    return (
+                        np.array([], dtype=np.int64),
+                        {col2: np.array([], dtype=np.float64) for col2 in field_columns},
+                    )
+            all_col_indices = [
+                idx + 1
+                for idx, name in enumerate(col_names)
+                if name.startswith("col_")
+            ]
+            col_indices = all_col_indices[-expected_path_len:]
+            while result_set.next():
+                row_path_segments = [
+                    result_set.get_value_by_index(ci) for ci in col_indices
+                ]
+                while row_path_segments and row_path_segments[-1] is None:
+                    row_path_segments.pop()
+                if row_path_segments != target_path_segments:
+                    continue
+                ts = int(result_set.get_value_by_index(1))
+                # The on-tree scan already honors start/end_time at the
+                # cwrapper level, but defensively re-clip on the boundary.
+                if ts < start_time or ts > end_time:
+                    continue
+                timestamps.append(ts)
+                for col, vidx in value_indices.items():
+                    raw = result_set.get_value_by_index(vidx)
+                    value_buckets[col].append(np.nan if raw is None else float(raw))
+
+        if not timestamps:
+            return (
+                np.array([], dtype=np.int64),
+                {col: np.array([], dtype=np.float64) for col in field_columns},
+            )
+
+        timestamps_arr = np.asarray(timestamps, dtype=np.int64)
+        field_values = {
+            col: np.asarray(value_buckets[col], dtype=np.float64)
+            for col in field_columns
+        }
+        return timestamps_arr, field_values

From d7148e3b7fbda9176d9095945c9d5233989a4e60 Mon Sep 17 00:00:00 2001
From: Young-Leo <562593859@qq.com>
Date: Thu, 14 May 2026 12:45:08 +0800
Subject: [PATCH 2/7] perf(python/dataset): shrink _LogicalIndex memory
 footprint (~40%)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Phase 1a-5 of the Python-side memory optimization for TsFileDataFrame
indexes at 30k-device scale: 81.20 MB -> 48.90 MB (-32.30 MB, -39.8%).

- metadata: introduce SeriesStats NamedTuple + empty_series_stats()
  singleton; replace per-series 6-key dict (~360 B) with NamedTuple
  (~120 B). [Phase 2: -10.53 MB]
- dataframe: drop the _DerivedCache class entirely; its two members are
  replaced by register-time device bounds (next item) and by per-series
  field stats computed lazily in _build_series_info. [Phase 4: -10.85 MB]
- dataframe: replace _LogicalIndex.device_refs (List[List[DeviceRef]])
  with device_bounds (List[Tuple[Optional[int], Optional[int]]])
  aggregated at register time so _query_aligned reads bounds in O(1)
  without holding per-reader DeviceRef tuples. [Phase 5: -3.42 MB]
- dataframe: drop redundant series_ref_set; use series_ref_map keys for
  membership checks. [Phase 1a: -2.00 MB]
- reader: get_series_info_by_ref reads SeriesStats attributes and
  exposes them as the existing dict shape (no API change for callers).

40/40 dataset tests still pass. See notes/TsFileDataFrame实施总结.md
section 6 for per-phase measurement breakdown.
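
Illustrative sketch of the Phase-2 payload swap (hypothetical REPL
session; sys.getsizeof results vary by CPython build and ignore the six
shared key strings that every per-series dict also referenced):

    import sys

    from tsfile.dataset.metadata import SeriesStats

    as_dict = {"length": 1, "min_time": 0, "max_time": 0,
               "timeline_length": 1, "timeline_min_time": 0,
               "timeline_max_time": 0}
    as_tuple = SeriesStats(1, 0, 0, 1, 0, 0)
    print(sys.getsizeof(as_dict), sys.getsizeof(as_tuple))  # e.g. 184 vs 104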
--- python/tests/test_tsfile_dataset.py | 87 ++++++++++++------------ python/tsfile/dataset/dataframe.py | 101 ++++++++++++---------------- python/tsfile/dataset/metadata.py | 28 +++++++- python/tsfile/dataset/reader.py | 45 ++++++------- 4 files changed, 133 insertions(+), 128 deletions(-) diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index 8b20b660f..607872d3c 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -32,6 +32,7 @@ from tsfile.dataset.formatting import format_timestamp from tsfile.dataset.metadata import ( MetadataCatalog, + SeriesStats, build_series_path, resolve_series_path, ) @@ -759,14 +760,14 @@ def test_series_path_resolution_allows_prefix_tag_values(): ("temperature",), ) device_id = catalog.add_device(table_id, ("site_a", "device_a"), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.site_a.device_a.temperature" @@ -782,14 +783,14 @@ def test_series_path_resolution_allows_missing_trailing_tag_value(): ("temperature",), ) device_id = catalog.add_device(table_id, (), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.temperature" @@ -805,14 +806,14 @@ def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values(): ("temperature",), ) device_id = catalog.add_device(table_id, (None, "device_a", None), 0, 1) - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) series_path = build_series_path(catalog, device_id, 0) assert series_path == "weather.device_a.temperature" @@ -857,14 +858,14 @@ def query_table_by_row(self, *args, **kwargs): ("temperature",), ) device_id = reader._catalog.add_device(table_id, (None, "device_a", "north"), 0, 1) - reader._catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 2, - "min_time": 0, - "max_time": 1, - "timeline_length": 2, - "timeline_min_time": 0, - "timeline_max_time": 1, - } + reader._catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=2, + min_time=0, + max_time=1, + timeline_length=2, + timeline_min_time=0, + timeline_max_time=1, + ) with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): reader.read_series_by_ref(device_id, 0, 0, 1) @@ -888,9 +889,8 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): tsdf._index.sparse_device_indices_by_compressed_path = { ("weather", ("device_a",)): [0] } - tsdf._index.device_refs = [[]] + tsdf._index.device_bounds = [(0, 1)] tsdf._index.series_refs_ordered = [(0, 0)] - 
tsdf._index.series_ref_set = {(0, 0)} tsdf._index.series_ref_map = {(0, 0): []} assert tsdf.list_timeseries() == ["weather.device_a.temperature"] @@ -919,9 +919,8 @@ def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): ("weather", ("device_a",)): [0], ("weather", ("beijing", "device_b")): [1], } - tsdf._index.device_refs = [[], []] + tsdf._index.device_bounds = [(0, 1), (0, 1)] tsdf._index.series_refs_ordered = [(0, 0), (1, 0)] - tsdf._index.series_ref_set = {(0, 0), (1, 0)} tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []} assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"] @@ -956,14 +955,14 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) for device_id in (first_id, second_id): - catalog.series_stats_by_ref[(device_id, 0)] = { - "length": 1, - "min_time": 0, - "max_time": 0, - "timeline_length": 1, - "timeline_min_time": 0, - "timeline_max_time": 0, - } + catalog.series_stats_by_ref[(device_id, 0)] = SeriesStats( + length=1, + min_time=0, + max_time=0, + timeline_length=1, + timeline_min_time=0, + timeline_max_time=0, + ) assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature" assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature" diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index f0a8acc22..2152e7d12 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -41,7 +41,6 @@ DeviceKey = Tuple[str, tuple] SeriesRefKey = Tuple[int, int] SeriesRef = Tuple[object, int, int] -DeviceRef = Tuple[object, int] _QUERY_START = np.iinfo(np.int64).min _QUERY_END = np.iinfo(np.int64).max @@ -78,23 +77,21 @@ class _LogicalIndex: sparse_device_indices_by_compressed_path: Dict[ Tuple[str, Tuple[str, ...]], List[int] ] = field(default_factory=dict) - # For each logical device, keep the contributing reader-local device refs. - device_refs: List[List[DeviceRef]] = field(default_factory=list) + # Per-logical-device aggregated time bounds (min_time, max_time). Replaces + # the previous list of contributing reader-local device refs which existed + # only to recompute these bounds at query time. Aggregating once at + # register time costs one tuple per device but drops the per-device + # contributing-ref list. + device_bounds: List[Tuple[Optional[int], Optional[int]]] = field( + default_factory=list + ) # Stable logical series order, each item is (device_idx, field_idx). series_refs_ordered: List[SeriesRefKey] = field(default_factory=list) # Map one logical series ref to the contributing reader-local series refs. + # Insertion order matches `series_refs_ordered`; doubles as an O(1) + # membership probe so we no longer need a redundant set. series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) - # Fast membership check for resolved series refs. 
- series_ref_set: Set[SeriesRefKey] = field(default_factory=set) - - -@dataclass(**_DATACLASS_SLOTS) -class _DerivedCache: - """Merged metadata derived from the logical index.""" - - devices: List[dict] = field(default_factory=list) - field_stats: Dict[SeriesRefKey, dict] = field(default_factory=dict) def _expand_paths(paths: Union[str, List[str]]) -> List[str]: @@ -185,7 +182,9 @@ def _register_reader( device_idx = len(index.device_order) index.device_index_by_key[device_key] = device_idx index.device_order.append(device_key) - index.device_refs.append([]) + index.device_bounds.append( + (device_entry.min_time, device_entry.max_time) + ) if any(value is None for value in device_entry.tag_values): index.tables_with_sparse_tag_values.add(table_entry.table_name) compressed_components = tuple( @@ -200,7 +199,19 @@ def _register_reader( index.sparse_device_indices_by_compressed_path.setdefault( compressed_key, [] ).append(device_idx) - index.device_refs[device_idx].append((reader, device_id)) + else: + cur_min, cur_max = index.device_bounds[device_idx] + new_min = ( + device_entry.min_time + if cur_min is None + else min(cur_min, device_entry.min_time) + ) + new_max = ( + device_entry.max_time + if cur_max is None + else max(cur_max, device_entry.max_time) + ) + index.device_bounds[device_idx] = (new_min, new_max) # Register only the (device, field) pairs that actually have stats. For # table-model catalogs every declared (device, field) is populated (with @@ -224,24 +235,6 @@ def _register_reader( index.series_ref_map[series_ref].append((reader, device_id, field_idx)) -def _build_device_entry(refs: List[DeviceRef]) -> dict: - """Compute per-device time bounds from cheap metadata only. - - We intentionally do not validate duplicates at the device level because - table-model fields do not necessarily share one complete timestamp axis. - Duplicate detection stays on the logical-series paths that materialize or - merge one field's timestamps. - """ - infos = [reader.get_device_info(device_id) for reader, device_id in refs] - min_time = min(info["min_time"] for info in infos) - max_time = max(info["max_time"] for info in infos) - - return { - "min_time": min_time, - "max_time": max_time, - } - - def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict: """Build shared-timeline series stats from native timeline metadata.""" min_time = None @@ -546,11 +539,11 @@ def _query_aligned( groups = defaultdict(list) for col_idx, series_ref in enumerate(series_refs): device_idx, field_idx = series_ref - device_info = self._df._cache.devices[device_idx] + min_time_dev, max_time_dev = self._df._index.device_bounds[device_idx] if ( - device_info["max_time"] is None - or device_info["max_time"] < start_time - or device_info["min_time"] > end_time + max_time_dev is None + or max_time_dev < start_time + or (min_time_dev is not None and min_time_dev > end_time) ): continue @@ -608,7 +601,6 @@ def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): self._show_progress = show_progress self._readers: Dict[str, object] = {} self._index = _LogicalIndex() - self._cache = _DerivedCache() self._is_view = False self._root = None self._closed = False @@ -625,18 +617,20 @@ def _from_subset( obj._paths = parent._paths obj._show_progress = parent._show_progress obj._readers = parent._readers + # Reuse the parent's full mapping but restrict the membership scope to + # the requested subset, otherwise resolving a series name outside the + # subset would unexpectedly succeed via the shared dict. 
+ subset_refs = list(series_refs) + parent_map = parent._index.series_ref_map + subset_map = {ref: parent_map[ref] for ref in subset_refs} obj._index = _LogicalIndex( model=parent._index.model, table_entries=parent._index.table_entries, device_order=parent._index.device_order, device_index_by_key=parent._index.device_index_by_key, - device_refs=parent._index.device_refs, - series_refs_ordered=list(series_refs), - series_ref_map=parent._index.series_ref_map, - series_ref_set=set(series_refs), - ) - obj._cache = _DerivedCache( - devices=parent._cache.devices, field_stats=parent._cache.field_stats + device_bounds=parent._index.device_bounds, + series_refs_ordered=subset_refs, + series_ref_map=subset_map, ) obj._closed = False return obj @@ -657,15 +651,6 @@ def _load_metadata(self): else: self._load_metadata_serial(TsFileSeriesReader) - self._cache.devices = [ - _build_device_entry(refs) for refs in self._index.device_refs - ] - for series_ref in self._index.series_refs_ordered: - self._cache.field_stats[series_ref] = _build_field_stats( - self._index.series_ref_map[series_ref] - ) - - self._index.series_ref_set = set(self._index.series_refs_ordered) if not self._index.series_refs_ordered: raise ValueError("No valid time series found in the provided TsFile files") @@ -796,14 +781,16 @@ def _resolve_series_name(self, series_name: str) -> SeriesRefKey: device_idx = candidate_indices[0] series_ref = (device_idx, field_idx) - if series_ref not in self._index.series_ref_set: + if series_ref not in self._index.series_ref_map: raise KeyError(_series_lookup_hint(series_name)) return series_ref def _build_series_info(self, series_ref: SeriesRefKey) -> dict: device_idx, field_idx = series_ref device_key, table_entry, _ = self._get_series_components(series_ref) - field_stats = self._cache.field_stats[series_ref] + # Aggregate per-shard timeline stats lazily; storing this for every + # series at load time burns ~10 MB / 30k devices. + field_stats = _build_field_stats(self._index.series_ref_map[series_ref]) # Pad short tag tuples (tree-model devices whose path is shorter than # the synthetic table's max depth) with None so positional access by # `_col_i` index always lands on a defined cell. @@ -828,8 +815,6 @@ def __len__(self) -> int: @property def model(self) -> str: - """Return ``"table"`` or ``"tree"`` for the current load set. - """ return self._index.model def list_timeseries(self, path_prefix: str = "") -> List[str]: diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 125bb00c2..cc163a694 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -20,7 +20,7 @@ from dataclasses import dataclass, field import sys -from typing import Any, Dict, Iterable, Iterator, List, Tuple +from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple from ..constants import TSDataType @@ -29,6 +29,30 @@ _DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {} +class SeriesStats(NamedTuple): + """Per-series statistics stored in ``MetadataCatalog.series_stats_by_ref``. + + A 6-field ``NamedTuple`` is ~5x smaller than the equivalent ``dict`` and + drops attribute access cost to a tuple slot read. Either the ``length`` + or ``timeline_*`` group may be ``None`` for placeholder rows. 
+ """ + + length: int + min_time: Optional[int] + max_time: Optional[int] + timeline_length: int + timeline_min_time: Optional[int] + timeline_max_time: Optional[int] + + +_EMPTY_SERIES_STATS = SeriesStats(0, None, None, 0, None, None) + + +def empty_series_stats() -> SeriesStats: + """Return the canonical placeholder for a missing field on a device.""" + return _EMPTY_SERIES_STATS + + @dataclass(**_DATACLASS_SLOTS) class TableEntry: """Schema-level metadata shared by every device in one table.""" @@ -77,7 +101,7 @@ class MetadataCatalog: sparse_device_ids_by_compressed_path: Dict[ Tuple[int, Tuple[str, ...]], List[int] ] = field(default_factory=dict) - series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field( + series_stats_by_ref: Dict[Tuple[int, int], SeriesStats] = field( default_factory=dict ) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index ce74bfd7b..348ee5ce6 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -29,7 +29,9 @@ from ..tsfile_reader import TsFileReaderPy from .metadata import ( MetadataCatalog, + SeriesStats, build_series_path, + empty_series_stats, resolve_series_path, ) @@ -217,14 +219,9 @@ def _cache_metadata_table_model(self): for field_idx, field_name in enumerate(table_entry.field_columns): field_stats = stats_by_field.get(field_name) if field_stats is None: - self._catalog.series_stats_by_ref[(device_id, field_idx)] = { - "length": 0, - "min_time": None, - "max_time": None, - "timeline_length": 0, - "timeline_min_time": None, - "timeline_max_time": None, - } + self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( + empty_series_stats() + ) else: self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( field_stats @@ -393,8 +390,8 @@ def _metadata_tag_values(group, tag_count: int) -> tuple: return tuple(values) @staticmethod - def _metadata_field_stats(group) -> Dict[str, dict]: - stats = {} + def _metadata_field_stats(group) -> Dict[str, SeriesStats]: + stats: Dict[str, SeriesStats] = {} for timeseries in group.timeseries: statistic = timeseries.statistic timeline_statistic = timeseries.timeline_statistic @@ -403,18 +400,18 @@ def _metadata_field_stats(group) -> Dict[str, dict]: or timeline_statistic.row_count <= 0 ): continue - stats[timeseries.measurement_name] = { - "length": int(statistic.row_count) if statistic.has_statistic else 0, - "min_time": ( + stats[timeseries.measurement_name] = SeriesStats( + length=int(statistic.row_count) if statistic.has_statistic else 0, + min_time=( int(statistic.start_time) if statistic.has_statistic else None ), - "max_time": ( + max_time=( int(statistic.end_time) if statistic.has_statistic else None ), - "timeline_length": int(timeline_statistic.row_count), - "timeline_min_time": int(timeline_statistic.start_time), - "timeline_max_time": int(timeline_statistic.end_time), - } + timeline_length=int(timeline_statistic.row_count), + timeline_min_time=int(timeline_statistic.start_time), + timeline_max_time=int(timeline_statistic.end_time), + ) return stats def _add_device( @@ -454,12 +451,12 @@ def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict: ) field_stats = self._catalog.series_stats_by_ref[(device_id, field_idx)] return { - "length": field_stats["length"], - "min_time": field_stats["min_time"], - "max_time": field_stats["max_time"], - "timeline_length": field_stats["timeline_length"], - "timeline_min_time": field_stats["timeline_min_time"], - "timeline_max_time": field_stats["timeline_max_time"], + "length": 
field_stats.length,
+            "min_time": field_stats.min_time,
+            "max_time": field_stats.max_time,
+            "timeline_length": field_stats.timeline_length,
+            "timeline_min_time": field_stats.timeline_min_time,
+            "timeline_max_time": field_stats.timeline_max_time,
             "table_name": table_entry.table_name,
             "column_name": field_name,
             "device_id": device_id,

From 383abc6fb2caaafac734b84ffe2bd2e787401aae Mon Sep 17 00:00:00 2001
From: Young-Leo <562593859@qq.com>
Date: Thu, 14 May 2026 19:01:40 +0800
Subject: [PATCH 3/7] perf(python/dataset): drop table-model phantom cells (Phase 6)

Unify table/tree-model semantics so the dataset surface only carries
real series. Previously the table-model branch padded
series_stats_by_ref with empty_series_stats() placeholders for every
(device, field) cell declared in the schema but never written,
producing a Cartesian-product index that grew linearly with schema
width even when most cells were empty.

After this change, both models populate series_stats_by_ref only for
cells with statistic.row_count > 0. The dataset view becomes 'real
devices x real fields', matching the principle that 'as many devices
(and series) exist as were physically written'.

Changes:
- metadata: SeriesStats fields are tightened from Optional[int] to int;
  delete the _EMPTY_SERIES_STATS constant and the empty_series_stats()
  helper; drop the now-unused Optional import.
- reader: _metadata_field_stats now filters on statistic.has_statistic
  + row_count > 0 (was timeline_statistic); the placeholder branch in
  _cache_metadata_table_model is removed; the iter_owned_series_refs
  docstring is rewritten as a single unified description for both
  models.
- tests: new test_dataset_omits_table_model_phantom_series_for_skipped_cells
  proves Tablet-skipped cells stay out of list_timeseries / __len__ /
  series_ref_map and that tsdf[skipped_path] raises KeyError.

Bench impact:
- 30k devices x 1 field full-density (existing bench): no change (no
  phantoms existed in this scenario).
- 5k devices x 5 fields x 60% density (new sparse bench):
  series_ref_map 15.93 -> 9.55 MB, series_stats_by_ref 7.53 -> 4.51 MB,
  tracked total ~26.5 -> 16.30 MB (~38% reduction).

41/41 dataset tests pass.
---
 python/tests/test_tsfile_dataset.py | 62 +++++++++++++++++++++++++++++
 python/tsfile/dataset/metadata.py   | 23 ++++-------
 python/tsfile/dataset/reader.py     | 56 +++++++++++++-------------
 3 files changed, 99 insertions(+), 42 deletions(-)

diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py
index 607872d3c..c7ed73662 100644
--- a/python/tests/test_tsfile_dataset.py
+++ b/python/tests/test_tsfile_dataset.py
@@ -439,6 +439,68 @@ def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
     assert series[1:1].shape == (0,)
 
 
+def test_dataset_omits_table_model_phantom_series_for_skipped_cells(tmp_path):
+    """Schema-declared fields that a device never wrote must NOT appear.
+
+    The dataset surface treats a series as "data physically written for one
+    (device, field) pair". A Tablet that skips ``add_value_by_name`` for a
+    column produces a chunk with ``length=0``; that cell is not a real series
+    and must not be exposed via ``list_timeseries`` / ``len(tsdf)`` /
+    ``series_ref_map`` -- table-model and tree-model behave identically here.
+    """
+    from tsfile import Tablet
+
+    path = tmp_path / "sparse_table.tsfile"
+    schema = TableSchema(
+        "bench",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("v1", TSDataType.DOUBLE, ColumnCategory.FIELD),
+            ColumnSchema("v2", TSDataType.DOUBLE, ColumnCategory.FIELD),
+            ColumnSchema("v3", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        # d1: writes only v1 and v2 (skip v3)
+        t1 = Tablet(
+            ["device", "v1", "v2", "v3"],
+            [TSDataType.STRING, TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE],
+            1,
+        )
+        t1.add_timestamp(0, 1)
+        t1.add_value_by_name("device", 0, "d1")
+        t1.add_value_by_name("v1", 0, 100.0)
+        t1.add_value_by_name("v2", 0, 200.0)
+        writer.write_table(t1)
+
+        # d2: writes only v1 and v3 (skip v2)
+        t2 = Tablet(
+            ["device", "v1", "v2", "v3"],
+            [TSDataType.STRING, TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE],
+            1,
+        )
+        t2.add_timestamp(0, 2)
+        t2.add_value_by_name("device", 0, "d2")
+        t2.add_value_by_name("v1", 0, 110.0)
+        t2.add_value_by_name("v3", 0, 330.0)
+        writer.write_table(t2)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        # 4 real cells: (d1,v1), (d1,v2), (d2,v1), (d2,v3); NO phantoms.
+        assert len(tsdf) == 4
+        assert sorted(tsdf.list_timeseries()) == [
+            "bench.d1.v1",
+            "bench.d1.v2",
+            "bench.d2.v1",
+            "bench.d2.v3",
+        ]
+
+        with pytest.raises(KeyError):
+            tsdf["bench.d1.v3"]
+        with pytest.raises(KeyError):
+            tsdf["bench.d2.v2"]
+
+
 def test_dataset_timeseries_supports_negative_step_slices(tmp_path):
     path = tmp_path / "weather.tsfile"
     _write_weather_file(path, 0)
diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py
index cc163a694..52104d638 100644
--- a/python/tsfile/dataset/metadata.py
+++ b/python/tsfile/dataset/metadata.py
@@ -20,7 +20,7 @@
 from dataclasses import dataclass, field
 import sys
-from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple
+from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Tuple
 
 from ..constants import TSDataType
 
@@ -33,24 +33,17 @@ class SeriesStats(NamedTuple):
     """Per-series statistics stored in ``MetadataCatalog.series_stats_by_ref``.
 
     A 6-field ``NamedTuple`` is ~5x smaller than the equivalent ``dict`` and
-    drops attribute access cost to a tuple slot read. Either the ``length``
-    or ``timeline_*`` group may be ``None`` for placeholder rows.
+    drops attribute access cost to a tuple slot read. Only real series with
+    ``length > 0`` are stored; the catalog never holds schema-only phantoms,
+    so every field below is always populated with a concrete value.
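Aside: the "~5x smaller" claim in the docstring above is easy to sanity-check by hand. A rough sketch follows; sys.getsizeof is shallow and the exact numbers vary by CPython build, so treat the ratio as indicative only.

    import sys
    from typing import NamedTuple

    class SeriesStats(NamedTuple):
        length: int
        min_time: int
        max_time: int
        timeline_length: int
        timeline_min_time: int
        timeline_max_time: int

    stats = SeriesStats(10, 0, 9, 10, 0, 9)
    # A 6-slot tuple payload is on the order of 100 bytes; the equivalent
    # dict is several times that before counting per-instance key storage.
    print(sys.getsizeof(stats), sys.getsizeof(stats._asdict()))
    assert stats.length == stats._asdict()["length"]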
""" length: int - min_time: Optional[int] - max_time: Optional[int] + min_time: int + max_time: int timeline_length: int - timeline_min_time: Optional[int] - timeline_max_time: Optional[int] - - -_EMPTY_SERIES_STATS = SeriesStats(0, None, None, 0, None, None) - - -def empty_series_stats() -> SeriesStats: - """Return the canonical placeholder for a missing field on a device.""" - return _EMPTY_SERIES_STATS + timeline_min_time: int + timeline_max_time: int @dataclass(**_DATACLASS_SLOTS) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 348ee5ce6..94ff11171 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -31,7 +31,6 @@ MetadataCatalog, SeriesStats, build_series_path, - empty_series_stats, resolve_series_path, ) @@ -123,13 +122,15 @@ def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]: ), device_id, field_idx def iter_owned_series_refs(self) -> Iterator[Tuple[int, int]]: - """Yield only ``(device_id, field_idx)`` pairs that have stats. - - Table-model catalogs populate stats for every declared field of each - device (with ``length=0`` placeholders when missing). Tree-model - catalogs only populate stats for measurements actually owned by each - device. The dataset-level cross-file index uses this iterator so it - never registers tree-model phantom (device, field) pairs. + """Yield only ``(device_id, field_idx)`` pairs that physically exist. + + The dataset layer treats a series as the unit of "data that was + actually written for one device on one field". A field declared in a + table schema but skipped (or written entirely as NaN) for some device + is intentionally absent here -- both table-model and tree-model + readers populate ``series_stats_by_ref`` only for cells with + ``length > 0`` so the cross-file index never registers schema-only + phantom (device, field) pairs. """ for ref in self._catalog.series_stats_by_ref.keys(): yield ref @@ -219,13 +220,13 @@ def _cache_metadata_table_model(self): for field_idx, field_name in enumerate(table_entry.field_columns): field_stats = stats_by_field.get(field_name) if field_stats is None: - self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( - empty_series_stats() - ) - else: - self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( - field_stats - ) + # Schema declares this field but the device never + # wrote it (or wrote it entirely as NaN). Skip -- + # the dataset surface only carries real series. + continue + self._catalog.series_stats_by_ref[(device_id, field_idx)] = ( + field_stats + ) if self.show_progress: sys.stderr.write( @@ -391,23 +392,24 @@ def _metadata_tag_values(group, tag_count: int) -> tuple: @staticmethod def _metadata_field_stats(group) -> Dict[str, SeriesStats]: + """Collect per-measurement stats for cells that have real values. + + A measurement appears in the result iff its native ``statistic`` block + is populated and reports a positive ``row_count``. Columns that the + device never wrote (Tablet skip / all-NaN pandas column) carry no + real values and are intentionally absent -- the dataset layer + surfaces only series that physically exist. 
+ """ stats: Dict[str, SeriesStats] = {} for timeseries in group.timeseries: statistic = timeseries.statistic - timeline_statistic = timeseries.timeline_statistic - if ( - not timeline_statistic.has_statistic - or timeline_statistic.row_count <= 0 - ): + if not statistic.has_statistic or statistic.row_count <= 0: continue + timeline_statistic = timeseries.timeline_statistic stats[timeseries.measurement_name] = SeriesStats( - length=int(statistic.row_count) if statistic.has_statistic else 0, - min_time=( - int(statistic.start_time) if statistic.has_statistic else None - ), - max_time=( - int(statistic.end_time) if statistic.has_statistic else None - ), + length=int(statistic.row_count), + min_time=int(statistic.start_time), + max_time=int(statistic.end_time), timeline_length=int(timeline_statistic.row_count), timeline_min_time=int(timeline_statistic.start_time), timeline_max_time=int(timeline_statistic.end_time), From 2ff5c8c2902315741afa1d17c8ea5cd41c39d4f4 Mon Sep 17 00:00:00 2001 From: Young-Leo <562593859@qq.com> Date: Thu, 14 May 2026 19:08:48 +0800 Subject: [PATCH 4/7] refactor(python/dataset): inline iter_owned_series_refs After Phase 6 unified table/tree-model semantics, iter_owned_series_refs became a no-op wrapper around catalog.series_stats_by_ref.keys(). Its docstring still existed only to explain the (now-impossible) phantom- series concern. Remove the method and inline direct iteration at the 3 call sites: - reader.iter_series_paths / iter_series_refs: iterate the catalog dict directly (matches the pattern already used by series_count). - dataframe._register_reader: drop the getattr fallback (which existed only to handle a hypothetical reader without iter_owned_series_refs); iterate catalog.series_stats_by_ref directly. Net: -22 / +9 lines, no behavior change. 41/41 dataset tests pass. --- python/tsfile/dataset/dataframe.py | 16 +++++----------- python/tsfile/dataset/reader.py | 18 ++---------------- 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 2152e7d12..d5b303f0a 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -213,17 +213,11 @@ def _register_reader( ) index.device_bounds[device_idx] = (new_min, new_max) - # Register only the (device, field) pairs that actually have stats. For - # table-model catalogs every declared (device, field) is populated (with - # empty placeholders when missing), so this preserves the prior behavior. - # For tree-model catalogs each device only owns a subset of the synthetic - # table's union field columns, so this prevents phantom series. - iter_owned = getattr(reader, "iter_owned_series_refs", None) - if iter_owned is None: - owned_pairs = list(catalog.series_stats_by_ref.keys()) - else: - owned_pairs = list(iter_owned()) - for device_id, field_idx in owned_pairs: + # Register every (device, field) pair the reader physically holds. + # ``series_stats_by_ref`` is populated only for cells with real data + # (see ``_metadata_field_stats``), so this never produces phantom + # series for either model. 
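Aside: in isolation, the collection rule this hunk installs amounts to the sketch below. The SimpleNamespace objects are stand-ins for the native metadata structs, not the real reader API.

    from types import SimpleNamespace as NS

    def collect_real_cells(group):
        stats = {}
        for ts in group.timeseries:
            s = ts.statistic
            if not s.has_statistic or s.row_count <= 0:
                continue  # schema-only or empty cell: not a real series
            stats[ts.measurement_name] = (int(s.row_count),
                                          int(s.start_time), int(s.end_time))
        return stats

    group = NS(timeseries=[
        NS(measurement_name="v1",
           statistic=NS(has_statistic=True, row_count=5, start_time=0, end_time=4)),
        NS(measurement_name="v2",  # declared but never written
           statistic=NS(has_statistic=False, row_count=0, start_time=0, end_time=0)),
    ])
    assert list(collect_real_cells(group)) == ["v1"]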
+ for device_id, field_idx in catalog.series_stats_by_ref: device_entry = catalog.device_entries[device_id] table_entry = catalog.table_entries[device_entry.table_id] device_key = (table_entry.table_name, tuple(device_entry.tag_values)) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 94ff11171..c3b8a5f18 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -112,29 +112,15 @@ def series_count(self) -> int: return len(self._catalog.series_stats_by_ref) def iter_series_paths(self) -> Iterator[str]: - for device_id, field_idx in self.iter_owned_series_refs(): + for device_id, field_idx in self._catalog.series_stats_by_ref: yield build_series_path(self._catalog, device_id, field_idx) def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]: - for device_id, field_idx in self.iter_owned_series_refs(): + for device_id, field_idx in self._catalog.series_stats_by_ref: yield build_series_path( self._catalog, device_id, field_idx ), device_id, field_idx - def iter_owned_series_refs(self) -> Iterator[Tuple[int, int]]: - """Yield only ``(device_id, field_idx)`` pairs that physically exist. - - The dataset layer treats a series as the unit of "data that was - actually written for one device on one field". A field declared in a - table schema but skipped (or written entirely as NaN) for some device - is intentionally absent here -- both table-model and tree-model - readers populate ``series_stats_by_ref`` only for cells with - ``length > 0`` so the cross-file index never registers schema-only - phantom (device, field) pairs. - """ - for ref in self._catalog.series_stats_by_ref.keys(): - yield ref - def close(self): if hasattr(self, "_reader"): try: From 5365dad6ab33522b8c5f20aa21c136c6a6111d3d Mon Sep 17 00:00:00 2001 From: Young-Leo <562593859@qq.com> Date: Thu, 14 May 2026 22:47:31 +0800 Subject: [PATCH 5/7] refactor(python/dataset): rename _LogicalIndex to _DataFrameCatalog Rename the cross-file unified index to _DataFrameCatalog and shorten its field names for clarity: - device_order -> devices - device_index_by_key -> device_index - device_bounds -> device_time_bounds - series_refs_ordered -> series - series_ref_map -> series_shards Also slim the SeriesStats docstring in metadata.py and update tests to the new names. No behavior change. --- python/tests/test_tsfile_dataset.py | 30 +++--- python/tsfile/dataset/dataframe.py | 153 ++++++++++++++-------------- python/tsfile/dataset/metadata.py | 8 +- 3 files changed, 90 insertions(+), 101 deletions(-) diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index c7ed73662..b150103a5 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -387,7 +387,7 @@ def test_dataset_repr_only_builds_preview_rows(tmp_path, monkeypatch): _write_weather_file(path, 0) with TsFileDataFrame(str(path), show_progress=False) as tsdf: - tsdf._index.series_refs_ordered = [(0, 0)] * 1000 + tsdf._index.series = [(0, 0)] * 1000 built_rows = [] @@ -446,7 +446,7 @@ def test_dataset_omits_table_model_phantom_series_for_skipped_cells(tmp_path): (device, field) pair". A Tablet that skips ``add_value_by_name`` for a column produces a chunk with ``length=0``; that cell is not a real series and must not be exposed via ``list_timeseries`` / ``len(tsdf)`` / - ``series_ref_map`` -- table-model and tree-model behave identically here. + ``series_shards`` -- table-model and tree-model behave identically here. 
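Aside: the registration step this patch leaves in place is a plain first-sight union across files, and dict iteration order (insertion order, guaranteed since Python 3.7) is what keeps the series order stable after the wrapper removal. A minimal sketch with illustrative shard names:

    series = []         # stable logical order, first-sight append
    series_shards = {}  # (device_idx, field_idx) -> contributing shards

    def register(shard, pairs):
        for ref in pairs:
            if ref not in series_shards:  # O(1) membership probe
                series.append(ref)
                series_shards[ref] = []
            series_shards[ref].append(shard)

    register("a.tsfile", [(0, 0), (0, 1)])
    register("b.tsfile", [(0, 1), (2, 0)])  # (0, 1) gains a second shard
    assert series == [(0, 0), (0, 1), (2, 0)]
    assert series_shards[(0, 1)] == ["a.tsfile", "b.tsfile"]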
""" from tsfile import Tablet @@ -937,7 +937,7 @@ def query_table_by_row(self, *args, **kwargs): def test_dataframe_resolves_named_sparse_tag_series_path(): tsdf = object.__new__(TsFileDataFrame) - tsdf._index = dataframe_module._LogicalIndex() + tsdf._index = dataframe_module._DataFrameCatalog() tsdf._index.table_entries["weather"] = dataframe_module.TableEntry( table_name="weather", tag_columns=("city", "device", "region"), @@ -945,15 +945,15 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): field_columns=("temperature",), ) device_key = ("weather", (None, "device_a")) - tsdf._index.device_order = [device_key] - tsdf._index.device_index_by_key = {device_key: 0} + tsdf._index.devices = [device_key] + tsdf._index.device_index = {device_key: 0} tsdf._index.tables_with_sparse_tag_values = {"weather"} tsdf._index.sparse_device_indices_by_compressed_path = { ("weather", ("device_a",)): [0] } - tsdf._index.device_bounds = [(0, 1)] - tsdf._index.series_refs_ordered = [(0, 0)] - tsdf._index.series_ref_map = {(0, 0): []} + tsdf._index.device_time_bounds = [(0, 1)] + tsdf._index.series = [(0, 0)] + tsdf._index.series_shards = {(0, 0): []} assert tsdf.list_timeseries() == ["weather.device_a.temperature"] assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0) @@ -961,18 +961,18 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): tsdf = object.__new__(TsFileDataFrame) - tsdf._index = dataframe_module._LogicalIndex() + tsdf._index = dataframe_module._DataFrameCatalog() tsdf._index.table_entries["weather"] = dataframe_module.TableEntry( table_name="weather", tag_columns=("city", "device", "region"), tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING), field_columns=("temperature",), ) - tsdf._index.device_order = [ + tsdf._index.devices = [ ("weather", (None, "device_a")), ("weather", ("beijing", "device_b")), ] - tsdf._index.device_index_by_key = { + tsdf._index.device_index = { ("weather", (None, "device_a")): 0, ("weather", ("beijing", "device_b")): 1, } @@ -981,9 +981,9 @@ def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): ("weather", ("device_a",)): [0], ("weather", ("beijing", "device_b")): [1], } - tsdf._index.device_bounds = [(0, 1), (0, 1)] - tsdf._index.series_refs_ordered = [(0, 0), (1, 0)] - tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []} + tsdf._index.device_time_bounds = [(0, 1), (0, 1)] + tsdf._index.series = [(0, 0), (1, 0)] + tsdf._index.series_shards = {(0, 0): [], (1, 0): []} assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"] @@ -995,7 +995,7 @@ def test_dataframe_list_timeseries_prefix_can_skip_full_name_build( _write_weather_file(path, 0) with TsFileDataFrame(str(path), show_progress=False) as tsdf: - tsdf._index.series_refs_ordered = [(0, 0)] * 1000 + tsdf._index.series = [(0, 0)] * 1000 def fail_build_series_name(_series_ref): raise AssertionError( diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index d5b303f0a..7c8e6555b 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -56,8 +56,10 @@ @dataclass(**_DATACLASS_SLOTS) -class _LogicalIndex: - """Cross-reader logical mapping for devices and series.""" +class _DataFrameCatalog: + """TsFileDataFrame's cross-file unified catalog: merges each tsfile's + ``MetadataCatalog`` into one user-facing global view. + """ # Model kind for the entire load set: "table" or "tree". 
A single # TsFileDataFrame is not allowed to mix table-model and tree-model files. @@ -67,9 +69,9 @@ class _LogicalIndex: table_entries: Dict[str, TableEntry] = field(default_factory=dict) # Stable logical device order, each item is (table_name, tag_values). - device_order: List[DeviceKey] = field(default_factory=list) + devices: List[DeviceKey] = field(default_factory=list) # Map one logical device key to its dataframe-local device index. - device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict) + device_index: Dict[DeviceKey, int] = field(default_factory=dict) # Tables that need sparse compressed-path lookup because some devices # contain non-trailing missing tag values. tables_with_sparse_tag_values: Set[str] = field(default_factory=set) @@ -77,21 +79,17 @@ class _LogicalIndex: sparse_device_indices_by_compressed_path: Dict[ Tuple[str, Tuple[str, ...]], List[int] ] = field(default_factory=dict) - # Per-logical-device aggregated time bounds (min_time, max_time). Replaces - # the previous list of contributing reader-local device refs which existed - # only to recompute these bounds at query time. Aggregating once at - # register time costs one tuple per device but drops the per-device - # contributing-ref list. - device_bounds: List[Tuple[Optional[int], Optional[int]]] = field( + # Aggregated (min_time, max_time) per logical device, computed once at + # registration so query-time lookups are O(1). + device_time_bounds: List[Tuple[Optional[int], Optional[int]]] = field( default_factory=list ) # Stable logical series order, each item is (device_idx, field_idx). - series_refs_ordered: List[SeriesRefKey] = field(default_factory=list) - # Map one logical series ref to the contributing reader-local series refs. - # Insertion order matches `series_refs_ordered`; doubles as an O(1) - # membership probe so we no longer need a redundant set. - series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) + series: List[SeriesRefKey] = field(default_factory=list) + # For each logical series: which shards hold it, plus its (device_id, + # field_idx) coordinates inside each shard. 
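Aside: the sparse compressed-path index declared above can be illustrated in a few lines. compress() is a hypothetical helper name, not the module's actual function; the idea is that a dotted user path has no way to spell a None gap, so sparse devices are also keyed by their present tag values.

    def compress(tag_values):
        # Drop the None gaps so a dotted path can still address the device.
        return tuple(v for v in tag_values if v is not None)

    sparse_index = {}
    devices = [("weather", (None, "device_a")),
               ("weather", ("beijing", "device_b"))]
    for idx, (table, tags) in enumerate(devices):
        if any(v is None for v in tags):
            sparse_index.setdefault((table, compress(tags)), []).append(idx)

    # "weather.device_a.temperature" resolves via the compressed key:
    assert sparse_index[("weather", ("device_a",))] == [0]
    # Dense devices never enter the sparse index:
    assert ("weather", ("beijing", "device_b")) not in sparse_index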
+ series_shards: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) def _expand_paths(paths: Union[str, List[str]]) -> List[str]: @@ -148,7 +146,7 @@ def _validate_table_schema( def _register_reader( readers: Dict[str, object], - index: _LogicalIndex, + index: _DataFrameCatalog, file_path: str, reader, ) -> None: @@ -177,12 +175,12 @@ def _register_reader( for device_id, device_entry in enumerate(catalog.device_entries): table_entry = catalog.table_entries[device_entry.table_id] device_key = (table_entry.table_name, tuple(device_entry.tag_values)) - device_idx = index.device_index_by_key.get(device_key) + device_idx = index.device_index.get(device_key) if device_idx is None: - device_idx = len(index.device_order) - index.device_index_by_key[device_key] = device_idx - index.device_order.append(device_key) - index.device_bounds.append( + device_idx = len(index.devices) + index.device_index[device_key] = device_idx + index.devices.append(device_key) + index.device_time_bounds.append( (device_entry.min_time, device_entry.max_time) ) if any(value is None for value in device_entry.tag_values): @@ -200,7 +198,7 @@ def _register_reader( compressed_key, [] ).append(device_idx) else: - cur_min, cur_max = index.device_bounds[device_idx] + cur_min, cur_max = index.device_time_bounds[device_idx] new_min = ( device_entry.min_time if cur_min is None @@ -211,22 +209,19 @@ def _register_reader( if cur_max is None else max(cur_max, device_entry.max_time) ) - index.device_bounds[device_idx] = (new_min, new_max) + index.device_time_bounds[device_idx] = (new_min, new_max) # Register every (device, field) pair the reader physically holds. - # ``series_stats_by_ref`` is populated only for cells with real data - # (see ``_metadata_field_stats``), so this never produces phantom - # series for either model. 
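Aside: the device_time_bounds merge in the registration code above reduces to a small fold, with None meaning "device not seen yet". Values here are illustrative.

    from typing import Optional, Tuple

    Bounds = Tuple[Optional[int], Optional[int]]

    def merge(cur: Bounds, new_min: int, new_max: int) -> Bounds:
        cur_min, cur_max = cur
        return (new_min if cur_min is None else min(cur_min, new_min),
                new_max if cur_max is None else max(cur_max, new_max))

    bounds: Bounds = (None, None)  # device not seen yet
    for shard_min, shard_max in [(100, 199), (0, 49), (150, 300)]:
        bounds = merge(bounds, shard_min, shard_max)
    assert bounds == (0, 300)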
for device_id, field_idx in catalog.series_stats_by_ref: device_entry = catalog.device_entries[device_id] table_entry = catalog.table_entries[device_entry.table_id] device_key = (table_entry.table_name, tuple(device_entry.tag_values)) - device_idx = index.device_index_by_key[device_key] + device_idx = index.device_index[device_key] series_ref = (device_idx, field_idx) - if series_ref not in index.series_ref_map: - index.series_refs_ordered.append(series_ref) - index.series_ref_map[series_ref] = [] - index.series_ref_map[series_ref].append((reader, device_id, field_idx)) + if series_ref not in index.series_shards: + index.series.append(series_ref) + index.series_shards[series_ref] = [] + index.series_shards[series_ref].append((reader, device_id, field_idx)) def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict: @@ -506,10 +501,10 @@ def _parse_key(self, key): if isinstance(item, (int, np.integer)): idx = int(item) if idx < 0: - idx += len(self._df._index.series_refs_ordered) - if idx < 0 or idx >= len(self._df._index.series_refs_ordered): + idx += len(self._df._index.series) + if idx < 0 or idx >= len(self._df._index.series): raise IndexError(f"Series index {item} out of range") - series_ref = self._df._index.series_refs_ordered[idx] + series_ref = self._df._index.series[idx] elif isinstance(item, str): series_ref = self._df._resolve_series_name(item) else: @@ -533,7 +528,9 @@ def _query_aligned( groups = defaultdict(list) for col_idx, series_ref in enumerate(series_refs): device_idx, field_idx = series_ref - min_time_dev, max_time_dev = self._df._index.device_bounds[device_idx] + min_time_dev, max_time_dev = self._df._index.device_time_bounds[ + device_idx + ] if ( max_time_dev is None or max_time_dev < start_time @@ -543,7 +540,7 @@ def _query_aligned( _, table_entry, _ = self._df._get_series_components(series_ref) field_name = table_entry.field_columns[field_idx] - for reader, device_id, reader_field_idx in self._df._index.series_ref_map[ + for reader, device_id, reader_field_idx in self._df._index.series_shards[ series_ref ]: groups[(id(reader), device_id)].append( @@ -594,7 +591,7 @@ def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): self._paths = _expand_paths(paths) self._show_progress = show_progress self._readers: Dict[str, object] = {} - self._index = _LogicalIndex() + self._index = _DataFrameCatalog() self._is_view = False self._root = None self._closed = False @@ -612,19 +609,18 @@ def _from_subset( obj._show_progress = parent._show_progress obj._readers = parent._readers # Reuse the parent's full mapping but restrict the membership scope to - # the requested subset, otherwise resolving a series name outside the - # subset would unexpectedly succeed via the shared dict. + # the requested subset. 
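Aside: the subset-view restriction described in that comment is just dict restriction, which is also why out-of-subset lookups fail fast. A minimal sketch with stand-in refs:

    parent_shards = {("dev0", "t"): ["shard1"], ("dev1", "t"): ["shard1"]}
    subset_refs = [("dev0", "t")]
    subset_shards = {ref: parent_shards[ref] for ref in subset_refs}

    assert ("dev0", "t") in subset_shards      # resolvable inside the view
    assert ("dev1", "t") not in subset_shards  # hidden by the view
    # The shard lists themselves are shared with the parent, not copied:
    assert subset_shards[("dev0", "t")] is parent_shards[("dev0", "t")]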
         subset_refs = list(series_refs)
-        parent_map = parent._index.series_ref_map
-        subset_map = {ref: parent_map[ref] for ref in subset_refs}
-        obj._index = _LogicalIndex(
+        parent_shards = parent._index.series_shards
+        subset_shards = {ref: parent_shards[ref] for ref in subset_refs}
+        obj._index = _DataFrameCatalog(
             model=parent._index.model,
             table_entries=parent._index.table_entries,
-            device_order=parent._index.device_order,
-            device_index_by_key=parent._index.device_index_by_key,
-            device_bounds=parent._index.device_bounds,
-            series_refs_ordered=subset_refs,
-            series_ref_map=subset_map,
+            devices=parent._index.devices,
+            device_index=parent._index.device_index,
+            device_time_bounds=parent._index.device_time_bounds,
+            series=subset_refs,
+            series_shards=subset_shards,
         )
         obj._closed = False
         return obj
@@ -645,7 +641,7 @@ def _load_metadata(self):
         else:
             self._load_metadata_serial(TsFileSeriesReader)
 
-        if not self._index.series_refs_ordered:
+        if not self._index.series:
             raise ValueError("No valid time series found in the provided TsFile files")
 
     def _show_loading_progress(self, done: int, total: int, total_series: int = None):
@@ -716,7 +712,7 @@ def _get_series_components(
         self, series_ref: SeriesRefKey
     ) -> Tuple[DeviceKey, TableEntry, int]:
         device_idx, field_idx = series_ref
-        device_key = self._index.device_order[device_idx]
+        device_key = self._index.devices[device_idx]
         return device_key, self._index.table_entries[device_key[0]], field_idx
 
     def _build_series_name(self, series_ref: SeriesRefKey) -> str:
@@ -747,7 +743,7 @@ def _resolve_series_name(self, series_name: str) -> SeriesRefKey:
             raise KeyError(_series_lookup_hint(series_name)) from exc
 
         tag_parts = parts[1:-1]
-        direct_device_idx = self._index.device_index_by_key.get(
+        direct_device_idx = self._index.device_index.get(
             (table_name, tuple(tag_parts))
         )
@@ -775,16 +771,15 @@
         device_idx = candidate_indices[0]
 
         series_ref = (device_idx, field_idx)
-        if series_ref not in self._index.series_ref_map:
+        if series_ref not in self._index.series_shards:
             raise KeyError(_series_lookup_hint(series_name))
         return series_ref
 
     def _build_series_info(self, series_ref: SeriesRefKey) -> dict:
         device_idx, field_idx = series_ref
         device_key, table_entry, _ = self._get_series_components(series_ref)
-        # Aggregate per-shard timeline stats lazily; storing this for every
-        # series at load time burns ~10 MB / 30k devices.
-        field_stats = _build_field_stats(self._index.series_ref_map[series_ref])
+        # Aggregate per-shard timeline stats lazily on demand for this series.
+        field_stats = _build_field_stats(self._index.series_shards[series_ref])
         # Pad short tag tuples (tree-model devices whose path is shorter than
         # the synthetic table's max depth) with None so positional access by
         # `_col_i` index always lands on a defined cell.
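Aside: the None-padding mentioned at the end of the hunk above is worth seeing in isolation. pad_tags is a hypothetical helper name used only for this sketch.

    def pad_tags(tag_values, tag_count):
        # Tree devices shorter than the synthetic table's max depth get
        # None-padded tuples so a positional _col_i lookup always lands
        # on a defined cell.
        return tuple(tag_values) + (None,) * (tag_count - len(tag_values))

    # root.ln.wf01.wt01 -> 3 segments after the root; root.sensor -> 1
    assert pad_tags(("ln", "wf01", "wt01"), 3) == ("ln", "wf01", "wt01")
    assert pad_tags(("sensor",), 3) == ("sensor", None, None)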
@@ -805,7 +800,7 @@ def _build_series_info(self, series_ref: SeriesRefKey) -> dict: } def __len__(self) -> int: - return len(self._index.series_refs_ordered) + return len(self._index.series) @property def model(self) -> str: @@ -815,7 +810,7 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]: if not path_prefix: return [ self._build_series_name(series_ref) - for series_ref in self._index.series_refs_ordered + for series_ref in self._index.series ] try: @@ -824,7 +819,7 @@ def list_timeseries(self, path_prefix: str = "") -> List[str]: return [] matched = [] - for series_ref in self._index.series_refs_ordered: + for series_ref in self._index.series: device_key, table_entry, field_idx = self._get_series_components(series_ref) components = build_logical_series_components( table_entry.table_name, @@ -882,7 +877,7 @@ def list_timeseries_metadata(self, path_prefix: str = ""): index = [name for name, _ in rows] data = [row for _, row in rows] - df = pd.DataFrame(data, index=pd.Index(index, name="series")) + df = pd.DataFrame(data, index=index) # Stable, predictable column order: leading bookkeeping, then tags. leading = ["field", "start_time", "end_time", "count"] @@ -901,14 +896,14 @@ def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries: series_name = self._build_series_name(series_ref) return Timeseries( series_name, - self._index.series_ref_map[series_ref], - _build_runtime_series_stats(self._index.series_ref_map[series_ref]), + self._index.series_shards[series_ref], + _build_runtime_series_stats(self._index.series_shards[series_ref]), self._assert_open, lambda: _merge_field_timestamps( - series_name, self._index.series_ref_map[series_ref] + series_name, self._index.series_shards[series_ref] ), lambda offset, limit: _read_field_by_position( - series_name, self._index.series_ref_map[series_ref], offset, limit + series_name, self._index.series_shards[series_ref], offset, limit ), ) @@ -918,7 +913,7 @@ def __getitem__(self, key): if isinstance(key, pd.Series) and key.dtype == bool: selected = [ - self._index.series_refs_ordered[idx] for idx in key.index[key] + self._index.series[idx] for idx in key.index[key] ] return TsFileDataFrame._from_subset(self, selected) except ImportError: @@ -927,12 +922,12 @@ def __getitem__(self, key): if isinstance(key, (int, np.integer)): idx = int(key) if idx < 0: - idx += len(self._index.series_refs_ordered) - if idx < 0 or idx >= len(self._index.series_refs_ordered): + idx += len(self._index.series) + if idx < 0 or idx >= len(self._index.series): raise IndexError( - f"Index {idx} out of range [0, {len(self._index.series_refs_ordered)})" + f"Index {idx} out of range [0, {len(self._index.series)})" ) - return self._get_timeseries(self._index.series_refs_ordered[idx]) + return self._get_timeseries(self._index.series[idx]) if isinstance(key, str): try: @@ -950,7 +945,7 @@ def __getitem__(self, key): import pandas as pd values = [] - for series_ref in self._index.series_refs_ordered: + for series_ref in self._index.series: info = self._build_series_info(series_ref) if key == "table": values.append(info["table_name"]) @@ -970,8 +965,8 @@ def __getitem__(self, key): return TsFileDataFrame._from_subset( self, [ - self._index.series_refs_ordered[idx] - for idx in range(*key.indices(len(self._index.series_refs_ordered))) + self._index.series[idx] + for idx in range(*key.indices(len(self._index.series))) ], ) @@ -984,12 +979,12 @@ def __getitem__(self, key): ) idx = int(item) if idx < 0: - idx += len(self._index.series_refs_ordered) - if idx < 0 or 
idx >= len(self._index.series_refs_ordered): + idx += len(self._index.series) + if idx < 0 or idx >= len(self._index.series): raise IndexError( - f"Index {item} out of range [0, {len(self._index.series_refs_ordered)})" + f"Index {item} out of range [0, {len(self._index.series)})" ) - selected.append(self._index.series_refs_ordered[idx]) + selected.append(self._index.series[idx]) return TsFileDataFrame._from_subset(self, selected) raise TypeError(f"Unsupported key type: {type(key)}") @@ -1000,7 +995,7 @@ def loc(self): def _collect_tag_columns(self) -> List[str]: seen = {} - for table_name, _ in self._index.device_order: + for table_name, _ in self._index.devices: for column in self._index.table_entries[table_name].tag_columns: seen.setdefault(column, True) return list(seen.keys()) @@ -1019,7 +1014,7 @@ def _preview_indices( def _format_table(self, indices=None, max_rows: int = 20) -> str: if indices is None: - indices = list(range(len(self._index.series_refs_ordered))) + indices = list(range(len(self._index.series))) else: indices = list(indices) @@ -1029,7 +1024,7 @@ def _format_table(self, indices=None, max_rows: int = 20) -> str: is_tree = self._index.model == MODEL_TREE rows = [] for idx in preview_indices: - series_ref = self._index.series_refs_ordered[idx] + series_ref = self._index.series[idx] info = self._build_series_info(series_ref) row = { "index": idx, @@ -1053,12 +1048,12 @@ def _format_table(self, indices=None, max_rows: int = 20) -> str: ) def _repr_header(self) -> str: - total = len(self._index.series_refs_ordered) + total = len(self._index.series) model_marker = self._index.model if self._is_view: return ( f"TsFileDataFrame({model_marker} model, {total} time series, " - f"subset of {len(self._root._index.series_refs_ordered)})\n" + f"subset of {len(self._root._index.series)})\n" ) return ( f"TsFileDataFrame({model_marker} model, {total} time series, " diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 52104d638..95c6cace6 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -30,13 +30,7 @@ class SeriesStats(NamedTuple): - """Per-series statistics stored in ``MetadataCatalog.series_stats_by_ref``. - - A 6-field ``NamedTuple`` is ~5x smaller than the equivalent ``dict`` and - drops attribute access cost to a tuple slot read. Only real series with - ``length > 0`` are stored; the catalog never holds schema-only phantoms, - so every field below is always populated with a concrete value. - """ + """Statistics for a single time series.""" length: int min_time: int From 4170a645480080d68a743c98641945a55acf40f3 Mon Sep 17 00:00:00 2001 From: Young-Leo <562593859@qq.com> Date: Thu, 14 May 2026 23:02:23 +0800 Subject: [PATCH 6/7] polish reader.py tree-mode docstrings Slim docstrings on _cache_metadata_tree_model, _read_series_by_row_tree and _read_arrow_tree to the one-line essentials. --- python/tsfile/dataset/reader.py | 55 +++++++-------------------------- 1 file changed, 11 insertions(+), 44 deletions(-) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index c3b8a5f18..2fcd18579 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -228,21 +228,15 @@ def _cache_metadata_table_model(self): sys.stderr.flush() def _cache_metadata_tree_model(self): - """Build the in-memory catalog from native tree-model device metadata. - - Tree TsFiles do not declare table schemas. 
We synthesize one virtual - table whose name is the common root segment (typically ``"root"``); - each remaining device path segment maps to a synthetic tag column - ``_col_1, _col_2, ..., _col_{N_max}`` (1-based). The set of declared - fields is the union of all measurements across devices, so a single - ``TableEntry`` covers the whole file. Per-device measurement ownership - is preserved by populating ``series_stats_by_ref`` only for the - ``(device_id, field_idx)`` pairs that actually exist on disk. - - Note: the cwrapper-exposed ``segments`` tuple on each device is not the - fully tokenized path — for tree-mode devices it is just - ``(table_name, last_segment)``. The fully tokenized path is the dict - key (``device path``) so we split that ourselves. + """Build the in-memory catalog by synthesizing one virtual table. + + Tree TsFiles have no schema, so we materialize a single + ``TableEntry``: table name = the shared root segment, tag columns + = ``_col_1..._col_{N_max}`` (one per remaining path segment), + fields = union of measurements across devices. Per-device + ownership is preserved by registering only the + ``(device_id, field_idx)`` pairs actually written on disk in + ``series_stats_by_ref``. """ metadata_groups = self._reader.get_timeseries_metadata(None) if not metadata_groups: @@ -535,21 +529,7 @@ def read_series_by_row( def _read_series_by_row_tree( self, device_path: str, field_name: str, offset: int, limit: int ) -> Tuple[np.ndarray, np.ndarray]: - """Tree-model row read: scan the on-tree result and apply offset/limit. - - Routed through ``query_table_on_tree`` because the cwrapper's direct - ``query_tree_by_row`` path currently rejects multi-segment device - paths in the installed binary (E_DEVICE_NOT_EXIST). On-tree scan + - client-side device filter + post-filter offset/limit is the most - reliable available implementation. - - Workaround note: successive ``query_table_on_tree`` invocations on - the same reader instance leak duplicate ``col_*`` columns into the - result schema (a known cwrapper bug). The genuine device-path cells - are always the *trailing* ``expected_path_len`` ``col_*`` cells of - every row, so we slice from the end instead of trusting column - names. - """ + """Tree-model row read: scan on-tree result, filter device, apply offset/limit.""" target_path_segments = device_path.split(".") # +1 because cwrapper prepends the root as an extra col_i cell. expected_path_len = max( @@ -728,20 +708,7 @@ def _read_arrow_tree( start_time: int, end_time: int, ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: - """Tree-model time-range read for one device (multi-field). - - Routed through ``query_table_on_tree`` because the cwrapper's direct - ``query_timeseries`` (a.k.a. ``tsfile_reader_query_paths``) path - currently rejects multi-segment device paths in the installed binary. - On-tree scan + client-side device filter is the most reliable - available implementation; performance is proportional to the number - of devices in the file (not just the target device) so callers reading - many disjoint devices on a wide file will see linear cost. - - Workaround note: successive ``query_table_on_tree`` invocations leak - duplicate ``col_*`` columns into the result schema (cwrapper bug), - so we take only the trailing ``expected_path_len`` ``col_*`` cells. 
- """ + """Tree-model time-range read for one device (multi-field).""" field_columns = list(field_columns) if not field_columns: return ( From 5dabf081d5f7be1afce5f947a927feae60e13901 Mon Sep 17 00:00:00 2001 From: Young-Leo <562593859@qq.com> Date: Fri, 15 May 2026 12:58:34 +0800 Subject: [PATCH 7/7] fix(python/dataset): dedup repeated series specifiers in tsdf.loc tsdf.loc[..., [name, idx]] where both refs resolve to the same logical series previously appended the read fragment once per spec entry, producing duplicated timestamps and NaN-padded rows downstream. Append once per series in _query_aligned and add a regression test. --- python/tests/test_tsfile_dataset.py | 48 +++++++++++++++++++++++++++++ python/tsfile/dataset/dataframe.py | 11 +++++-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index b150103a5..2f7623ae9 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -307,6 +307,54 @@ def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_p np.testing.assert_array_equal(aligned.values, np.array([[21.5, 52.0]])) +def test_dataset_loc_dedups_repeated_series_specifiers(tmp_path): + path = tmp_path / "weather.tsfile" + _write_weather_file(path, 0) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + humidity = "weather.device_a.humidity" + humidity_idx = tsdf.list_timeseries().index(humidity) + + # 1) name + matching idx pointing at the same series. + aligned_two_dup = tsdf.loc[0:2, [humidity, humidity_idx]] + assert aligned_two_dup.shape == (3, 2) + np.testing.assert_array_equal( + aligned_two_dup.timestamps, np.array([0, 1, 2], dtype=np.int64) + ) + np.testing.assert_array_equal( + aligned_two_dup.values, + np.array([[50.0, 50.0], [52.0, 52.0], [55.0, 55.0]]), + ) + + # 2) same name twice -- single-group, single-key dedup path. + aligned_name_twice = tsdf.loc[0:2, [humidity, humidity]] + assert aligned_name_twice.shape == (3, 2) + np.testing.assert_array_equal( + aligned_name_twice.values, + np.array([[50.0, 50.0], [52.0, 52.0], [55.0, 55.0]]), + ) + + # 3) duplicate among other distinct series must not regress the + # historically-passing case either. 
+        aligned_mixed = tsdf.loc[
+            0:2, [humidity, "weather.device_a.temperature", humidity_idx]
+        ]
+        assert aligned_mixed.shape == (3, 3)
+        np.testing.assert_array_equal(
+            aligned_mixed.timestamps, np.array([0, 1, 2], dtype=np.int64)
+        )
+        np.testing.assert_array_equal(
+            aligned_mixed.values,
+            np.array(
+                [
+                    [50.0, 20.0, 50.0],
+                    [52.0, 21.5, 52.0],
+                    [55.0, 23.0, 55.0],
+                ]
+            ),
+        )
+
+
 def test_dataset_loc_supports_open_ended_ranges_and_negative_series_index(tmp_path):
     path = tmp_path / "weather.tsfile"
     _write_weather_file(path, 100)
diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py
index 7c8e6555b..81b377ebd 100644
--- a/python/tsfile/dataset/dataframe.py
+++ b/python/tsfile/dataset/dataframe.py
@@ -563,10 +563,15 @@ def _query_aligned(
             ts_arr, field_vals = reader.read_device_fields_by_time_range(
                 device_id, field_indices, start_time, end_time
             )
+            if len(ts_arr) == 0:
+                continue
+            appended_series = set()
             for _, _, field_name, series_name, _, _ in entries:
-                if len(ts_arr) > 0:
-                    series_time_parts[series_name].append(ts_arr)
-                    series_value_parts[series_name].append(field_vals[field_name])
+                if series_name in appended_series:
+                    continue
+                appended_series.add(series_name)
+                series_time_parts[series_name].append(ts_arr)
+                series_value_parts[series_name].append(field_vals[field_name])
 
         series_data = {}
         for name in series_names:
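Aside: stripped of the query plumbing, the dedup fix above is a per-(reader, device) seen-set. Stand-in data below; in the real code the set is rebuilt for every group, so distinct devices still contribute their own fragments.

    from collections import defaultdict

    # Three loc specifiers, two of which resolved to the same logical series.
    entries = ["humidity", "humidity", "temperature"]
    series_value_parts = defaultdict(list)

    appended_series = set()
    for series_name in entries:
        if series_name in appended_series:
            continue  # duplicate specifier: the fragment is already queued
        appended_series.add(series_name)
        series_value_parts[series_name].append([50.0, 52.0, 55.0])

    assert len(series_value_parts["humidity"]) == 1  # one fragment, not two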