From cab382380bfe2bcf7be10e2e5169657ff5c0ba4d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 14:27:34 -0500 Subject: [PATCH 01/12] cache manifest, not tuple --- pyiceberg/manifest.py | 47 +++++- tests/utils/test_manifest.py | 311 +++++++++++++++++++++++++++++++++-- 2 files changed, 333 insertions(+), 25 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 0afa16666e..f8f7985868 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -28,8 +28,7 @@ Literal, ) -from cachetools import LRUCache, cached -from cachetools.keys import hashkey +from cachetools import LRUCache from pydantic_core import to_json from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec @@ -892,15 +891,49 @@ def __hash__(self) -> int: return hash(self.manifest_path) -# Global cache for manifest lists -_manifest_cache: LRUCache[Any, tuple[ManifestFile, ...]] = LRUCache(maxsize=128) +# Global cache for individual ManifestFile objects, keyed by manifest_path. +# This avoids duplicating ManifestFile objects when multiple manifest lists +# share the same manifests (which is common after appends). +_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=512) + +# Lock for thread-safe cache access +_manifest_cache_lock = threading.RLock() -@cached(cache=_manifest_cache, key=lambda io, manifest_list: hashkey(manifest_list), lock=threading.RLock()) def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: - """Read and cache manifests from the given manifest list, returning a tuple to prevent modification.""" + """Read manifests from the given manifest list, caching individual ManifestFile objects. + + Unlike caching entire manifest lists, this approach caches individual ManifestFile + objects by their manifest_path. This is more memory-efficient because: + - ManifestList1 contains: (ManifestFile1) + - ManifestList2 contains: (ManifestFile1, ManifestFile2) + - ManifestList3 contains: (ManifestFile1, ManifestFile2, ManifestFile3) + + With per-ManifestFile caching, ManifestFile1 is stored only once and reused, + instead of being duplicated in each manifest list's cached tuple. + + Args: + io: The FileIO to read the manifest list. + manifest_list: The path to the manifest list file. + + Returns: + A tuple of ManifestFile objects (tuple to prevent modification). + """ file = io.new_input(manifest_list) - return tuple(read_manifest_list(file)) + result = [] + + for manifest_file in read_manifest_list(file): + manifest_path = manifest_file.manifest_path + with _manifest_cache_lock: + if manifest_path in _manifest_cache: + # Reuse the cached ManifestFile object + result.append(_manifest_cache[manifest_path]) + else: + # Cache and use this ManifestFile + _manifest_cache[manifest_path] = manifest_file + result.append(manifest_file) + + return tuple(result) def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index d12019c9e2..302c368394 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,7 +16,6 @@ # under the License. 
# pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory -from unittest.mock import patch import fastavro import pytest @@ -29,10 +28,12 @@ DataFileContent, FileFormat, ManifestContent, + ManifestEntry, ManifestEntryStatus, ManifestFile, PartitionFieldSummary, _manifest_cache, + _manifests, read_manifest_list, write_manifest, write_manifest_list, @@ -314,27 +315,36 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None: def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None: - with patch("pyiceberg.manifest.read_manifest_list") as mocked_read_manifest_list: - io = load_file_io() + """Test that ManifestFile objects are cached and reused across multiple reads. - snapshot = Snapshot( - snapshot_id=25, - parent_snapshot_id=19, - timestamp_ms=1602638573590, - manifest_list=generated_manifest_file_file_v2, - summary=Summary(Operation.APPEND), - schema_id=3, - ) + The cache now stores individual ManifestFile objects by their manifest_path, + rather than caching entire manifest list tuples. This is more memory-efficient + when multiple manifest lists share overlapping ManifestFile objects. + """ + io = load_file_io() + + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + timestamp_ms=1602638573590, + manifest_list=generated_manifest_file_file_v2, + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + # Clear cache to ensure clean state + _manifest_cache.clear() - # Access the manifests property multiple times to test caching - manifests_first_call = snapshot.manifests(io) - manifests_second_call = snapshot.manifests(io) + # Access the manifests property multiple times + manifests_first_call = snapshot.manifests(io) + manifests_second_call = snapshot.manifests(io) - # Ensure that read_manifest_list was called only once - mocked_read_manifest_list.assert_called_once() + # Ensure that the same manifest list content is returned + assert manifests_first_call == manifests_second_call - # Ensure that the same manifest list is returned - assert manifests_first_call == manifests_second_call + # Verify that ManifestFile objects are the same instances (cached) + for mf1, mf2 in zip(manifests_first_call, manifests_second_call, strict=True): + assert mf1 is mf2, "ManifestFile objects should be the same cached instance" def test_write_empty_manifest() -> None: @@ -629,3 +639,268 @@ def test_file_format_case_insensitive(raw_file_format: str, expected_file_format else: with pytest.raises(ValueError): _ = FileFormat(raw_file_format) + + +def test_manifest_cache_deduplicates_manifest_files() -> None: + """Test that the manifest cache deduplicates ManifestFile objects across manifest lists. + + This test verifies the fix for https://github.com/apache/iceberg-python/issues/2325 + + The issue was that when caching manifest lists by their path, overlapping ManifestFile + objects were duplicated. For example: + - ManifestList1: (ManifestFile1) + - ManifestList2: (ManifestFile1, ManifestFile2) + - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3) + + With the old approach, ManifestFile1 was stored 3 times in the cache. + With the new approach, ManifestFile objects are cached individually by their + manifest_path, so ManifestFile1 is stored only once and reused. 
+ """ + io = PyArrowFileIO() + + with TemporaryDirectory() as tmp_dir: + # Create three manifest files to simulate manifests created during appends + manifest1_path = f"{tmp_dir}/manifest1.avro" + manifest2_path = f"{tmp_dir}/manifest2.avro" + manifest3_path = f"{tmp_dir}/manifest3.avro" + + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create manifest file 1 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest1_path), + snapshot_id=1, + avro_compression="zstandard", + ) as writer: + data_file1 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data1.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=data_file1, + ) + ) + manifest_file1 = writer.to_manifest_file() + + # Create manifest file 2 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest2_path), + snapshot_id=2, + avro_compression="zstandard", + ) as writer: + data_file2 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data2.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=200, + file_size_in_bytes=2000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=2, + data_file=data_file2, + ) + ) + manifest_file2 = writer.to_manifest_file() + + # Create manifest file 3 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest3_path), + snapshot_id=3, + avro_compression="zstandard", + ) as writer: + data_file3 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data3.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=300, + file_size_in_bytes=3000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=3, + data_file=data_file3, + ) + ) + manifest_file3 = writer.to_manifest_file() + + # Create manifest list 1: contains only manifest1 + manifest_list1_path = f"{tmp_dir}/manifest-list1.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list1_path), + snapshot_id=1, + parent_snapshot_id=None, + sequence_number=1, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1]) + + # Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1) + manifest_list2_path = f"{tmp_dir}/manifest-list2.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list2_path), + snapshot_id=2, + parent_snapshot_id=1, + sequence_number=2, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1, manifest_file2]) + + # Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2) + manifest_list3_path = f"{tmp_dir}/manifest-list3.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list3_path), + snapshot_id=3, + parent_snapshot_id=2, + sequence_number=3, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3]) + + # Clear the cache before testing + _manifest_cache.clear() + + # Read all three 
manifest lists + manifests1 = _manifests(io, manifest_list1_path) + manifests2 = _manifests(io, manifest_list2_path) + manifests3 = _manifests(io, manifest_list3_path) + + # Verify the manifest files have the expected paths + assert len(manifests1) == 1 + assert len(manifests2) == 2 + assert len(manifests3) == 3 + + # Verify that ManifestFile objects with the same manifest_path are the same object (identity) + # This is the key assertion - if caching works correctly, the same ManifestFile + # object should be reused instead of creating duplicates + + # manifest_file1 appears in all three lists - should be the same object + assert manifests1[0] is manifests2[0], "ManifestFile1 should be the same object instance across manifest lists" + assert manifests2[0] is manifests3[0], "ManifestFile1 should be the same object instance across manifest lists" + + # manifest_file2 appears in lists 2 and 3 - should be the same object + assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists" + + # Verify cache size - should only have 3 unique ManifestFile objects + # not 1 + 2 + 3 = 6 objects as with the old approach + assert len(_manifest_cache) == 3, ( + f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" + ) + + +def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: + """Test that the manifest cache remains efficient with many overlapping manifest lists. + + This simulates the scenario from GitHub issue #2325 where many appends create + manifest lists that increasingly overlap. + """ + io = PyArrowFileIO() + + with TemporaryDirectory() as tmp_dir: + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + num_manifests = 10 + manifest_files = [] + + # Create N manifest files + for i in range(num_manifests): + manifest_path = f"{tmp_dir}/manifest{i}.avro" + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest_path), + snapshot_id=i + 1, + avro_compression="zstandard", + ) as writer: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data{i}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100 * (i + 1), + file_size_in_bytes=1000 * (i + 1), + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=i + 1, + data_file=data_file, + ) + ) + manifest_files.append(writer.to_manifest_file()) + + # Create N manifest lists, each containing an increasing number of manifests + # list[i] contains manifests[0:i+1] + manifest_list_paths = [] + for i in range(num_manifests): + list_path = f"{tmp_dir}/manifest-list{i}.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(list_path), + snapshot_id=i + 1, + parent_snapshot_id=i if i > 0 else None, + sequence_number=i + 1, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests(manifest_files[: i + 1]) + manifest_list_paths.append(list_path) + + # Clear the cache + _manifest_cache.clear() + + # Read all manifest lists + all_results = [] + for path in manifest_list_paths: + result = _manifests(io, path) + all_results.append(result) + + # With the old cache approach, we would have: + # 1 + 2 + 3 + ... 
+ N = N*(N+1)/2 ManifestFile objects in memory + # With the new approach, we should have exactly N objects + + # Verify cache has exactly N unique entries + assert len(_manifest_cache) == num_manifests, ( + f"Cache should contain exactly {num_manifests} ManifestFile objects, " + f"but has {len(_manifest_cache)}. " + f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects." + ) + + # Verify object identity - all references to the same manifest should be the same object + for i in range(num_manifests): + manifest_path = manifest_files[i].manifest_path + # Find all references to this manifest across all results + references = [] + for j, result in enumerate(all_results): + if j >= i: # This manifest should be in lists from i onwards + references.append(result[i]) + + # All references should be the same object + if len(references) > 1: + for ref in references[1:]: + assert ref is references[0], f"All references to manifest {i} should be the same object instance" From 0f2bf0dde8c1c8134c964b3b589b9a873e51dcb8 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:22:15 -0500 Subject: [PATCH 02/12] thx drew --- pyiceberg/manifest.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index f8f7985868..14d3b0c05e 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -922,9 +922,9 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: file = io.new_input(manifest_list) result = [] - for manifest_file in read_manifest_list(file): - manifest_path = manifest_file.manifest_path - with _manifest_cache_lock: + with _manifest_cache_lock: + for manifest_file in read_manifest_list(file): + manifest_path = manifest_file.manifest_path if manifest_path in _manifest_cache: # Reuse the cached ManifestFile object result.append(_manifest_cache[manifest_path]) From fa2863f9dc7758e10bd40a10de561d3559c74b48 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:28:27 -0500 Subject: [PATCH 03/12] typo --- tests/utils/test_manifest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 302c368394..558ff0fd8e 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -806,7 +806,7 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists" # Verify cache size - should only have 3 unique ManifestFile objects - # not 1 + 2 + 3 = 6 objects as with the old approach + # instead of 1 + 2 + 3 = 6 objects as with the old approach assert len(_manifest_cache) == 3, ( f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" ) From a5b7544e737be4149b1b3217811a4e20ce4778d3 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:29:09 -0500 Subject: [PATCH 04/12] add memory benchmark --- tests/benchmark/test_memory_benchmark.py | 280 +++++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 tests/benchmark/test_memory_benchmark.py diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py new file mode 100644 index 0000000000..65f2b3d822 --- /dev/null +++ b/tests/benchmark/test_memory_benchmark.py @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Memory benchmarks for manifest cache efficiency. + +These benchmarks reproduce the manifest cache memory issue described in: +https://github.com/apache/iceberg-python/issues/2325 + +The issue: When caching manifest lists as tuples, overlapping ManifestFile objects +are duplicated across cache entries, causing O(N²) memory growth instead of O(N). + +Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark +""" + +import gc +import tracemalloc +from datetime import datetime, timezone + +import pyarrow as pa +import pytest + +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.manifest import _manifest_cache + + +def generate_test_dataframe() -> pa.Table: + """Generate a PyArrow table for testing, similar to the issue's example.""" + n_rows = 100 # Smaller for faster tests, increase for more realistic benchmarks + + return pa.table( + { + "event_type": ["playback"] * n_rows, + "event_origin": ["origin1"] * n_rows, + "event_send_at": [datetime.now(timezone.utc)] * n_rows, + "event_saved_at": [datetime.now(timezone.utc)] * n_rows, + "id": list(range(n_rows)), + "reference_id": [f"ref-{i}" for i in range(n_rows)], + } + ) + + +@pytest.fixture +def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog: + """Create an in-memory catalog for memory testing.""" + warehouse_path = str(tmp_path_factory.mktemp("warehouse")) + catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}") + catalog.create_namespace("default") + return catalog + + +@pytest.fixture(autouse=True) +def clear_caches() -> None: + """Clear caches before each test.""" + _manifest_cache.clear() + gc.collect() + + +@pytest.mark.benchmark +def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None: + """Benchmark memory growth of manifest cache during repeated appends. + + This test reproduces the issue from GitHub #2325 where each append creates + a new manifest list entry in the cache, causing memory to grow. + + With the old caching strategy (tuple per manifest list), memory grew as O(N²). + With the new strategy (individual ManifestFile objects), memory grows as O(N). 
+ """ + df = generate_test_dataframe() + table = memory_catalog.create_table("default.memory_test", schema=df.schema) + + tracemalloc.start() + + num_iterations = 50 + memory_samples: list[tuple[int, int, int]] = [] # (iteration, current_memory, cache_size) + + print("\n--- Manifest Cache Memory Growth Benchmark ---") + print(f"Running {num_iterations} append operations...") + + for i in range(num_iterations): + table.append(df) + + # Sample memory at intervals + if (i + 1) % 10 == 0: + current, _ = tracemalloc.get_traced_memory() + cache_size = len(_manifest_cache) + + memory_samples.append((i + 1, current, cache_size)) + print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}") + + tracemalloc.stop() + + # Analyze memory growth + if len(memory_samples) >= 2: + first_memory = memory_samples[0][1] + last_memory = memory_samples[-1][1] + memory_growth = last_memory - first_memory + growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0]) + + print("\nResults:") + print(f" Initial memory: {first_memory / 1024:.1f} KB") + print(f" Final memory: {last_memory / 1024:.1f} KB") + print(f" Total growth: {memory_growth / 1024:.1f} KB") + print(f" Growth per iteration: {growth_per_iteration:.1f} bytes") + print(f" Final cache size: {memory_samples[-1][2]} entries") + + # With efficient caching, growth should be roughly linear (O(N)) + # rather than quadratic (O(N²)) as it was before + # Memory growth includes ManifestFile objects, metadata, and other overhead + # We expect about 5-10 KB per iteration for typical workloads + # The key improvement is that growth is O(N) not O(N²) + assert growth_per_iteration < 15000, ( + f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. " + "This may indicate the O(N²) cache inefficiency is present." + ) + + +@pytest.mark.benchmark +def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None: + """Test that clearing the cache allows memory to be reclaimed. + + This test verifies that when we clear the manifest cache, the associated + memory can be garbage collected. + """ + df = generate_test_dataframe() + table = memory_catalog.create_table("default.gc_test", schema=df.schema) + + tracemalloc.start() + + print("\n--- Memory After GC Benchmark ---") + + # Phase 1: Fill the cache + print("Phase 1: Filling cache with 20 appends...") + for _ in range(20): + table.append(df) + + gc.collect() + before_clear_memory, _ = tracemalloc.get_traced_memory() + cache_size_before = len(_manifest_cache) + print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB") + print(f" Cache size: {cache_size_before}") + + # Phase 2: Clear cache and GC + print("\nPhase 2: Clearing cache and running GC...") + _manifest_cache.clear() + gc.collect() + gc.collect() # Multiple GC passes for thorough cleanup + + after_clear_memory, _ = tracemalloc.get_traced_memory() + print(f" Memory after clear: {after_clear_memory / 1024:.1f} KB") + print(f" Memory reclaimed: {(before_clear_memory - after_clear_memory) / 1024:.1f} KB") + + tracemalloc.stop() + + memory_reclaimed = before_clear_memory - after_clear_memory + print("\nResults:") + print(f" Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB") + + +@pytest.mark.benchmark +def test_manifest_cache_deduplication_efficiency() -> None: + """Benchmark the efficiency of the per-ManifestFile caching strategy. 
+ + This test verifies that when multiple manifest lists share the same + ManifestFile objects, they are properly deduplicated in the cache. + """ + from tempfile import TemporaryDirectory + + from pyiceberg.io.pyarrow import PyArrowFileIO + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestEntry, + ManifestEntryStatus, + read_manifest_list, + write_manifest, + write_manifest_list, + ) + from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC + from pyiceberg.schema import Schema + from pyiceberg.typedef import Record + from pyiceberg.types import IntegerType, NestedField + + io = PyArrowFileIO() + + print("\n--- Manifest Cache Deduplication Benchmark ---") + + with TemporaryDirectory() as tmp_dir: + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create N manifest files + num_manifests = 20 + manifest_files = [] + + print(f"Creating {num_manifests} manifest files...") + for i in range(num_manifests): + manifest_path = f"{tmp_dir}/manifest_{i}.avro" + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest_path), + snapshot_id=i + 1, + avro_compression="null", + ) as writer: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data_{i}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=i + 1, + data_file=data_file, + ) + ) + manifest_files.append(writer.to_manifest_file()) + + # Create multiple manifest lists with overlapping manifest files + # List i contains manifest files 0 through i + num_lists = 10 + print(f"Creating {num_lists} manifest lists with overlapping manifests...") + + _manifest_cache.clear() + + for i in range(num_lists): + list_path = f"{tmp_dir}/manifest-list_{i}.avro" + manifests_to_include = manifest_files[: i + 1] + + with write_manifest_list( + format_version=2, + output_file=io.new_output(list_path), + snapshot_id=i + 1, + parent_snapshot_id=i if i > 0 else None, + sequence_number=i + 1, + avro_compression="null", + ) as list_writer: + list_writer.add_manifests(manifests_to_include) + + # Read the manifest list (this populates the cache) + input_file = io.new_input(list_path) + list(read_manifest_list(input_file)) + + # Analyze cache efficiency + cache_entries = len(_manifest_cache) + + print("\nResults:") + print(f" Manifest lists created: {num_lists}") + print(f" Total unique manifest files: {num_manifests}") + print(f" Cache entries: {cache_entries}") + + # With efficient per-ManifestFile caching, we should have at most + # num_manifests entries (one per unique manifest path), not + # sum(1..num_lists) entries as with the old strategy + print(f"\n Expected cache entries (efficient): <= {num_manifests}") + print(f" Actual cache entries: {cache_entries}") + + # The cache should be efficient - one entry per unique manifest path + assert cache_entries <= num_manifests + num_lists, ( + f"Cache has {cache_entries} entries, expected at most {num_manifests + num_lists}. " + "The cache may not be deduplicating properly." 
+ ) From 3c32b5d85899e823f9c2fd8d0036ffd5532c5fbd Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:36:39 -0500 Subject: [PATCH 05/12] dont lock during io --- pyiceberg/manifest.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 14d3b0c05e..8ee5dcac2a 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -919,11 +919,14 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: Returns: A tuple of ManifestFile objects (tuple to prevent modification). """ + # Read manifest list outside the lock to avoid blocking other threads during I/O file = io.new_input(manifest_list) - result = [] + manifest_files = list(read_manifest_list(file)) + # Only hold the lock while updating the cache + result = [] with _manifest_cache_lock: - for manifest_file in read_manifest_list(file): + for manifest_file in manifest_files: manifest_path = manifest_file.manifest_path if manifest_path in _manifest_cache: # Reuse the cached ManifestFile object From c2fbb9c22237c97a9a7d6ce339ae46452c6c37af Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:44:38 -0500 Subject: [PATCH 06/12] fix benchmark to use cache --- tests/benchmark/test_memory_benchmark.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index 65f2b3d822..73923af267 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -185,7 +185,7 @@ def test_manifest_cache_deduplication_efficiency() -> None: FileFormat, ManifestEntry, ManifestEntryStatus, - read_manifest_list, + _manifests, write_manifest, write_manifest_list, ) @@ -255,9 +255,8 @@ def test_manifest_cache_deduplication_efficiency() -> None: ) as list_writer: list_writer.add_manifests(manifests_to_include) - # Read the manifest list (this populates the cache) - input_file = io.new_input(list_path) - list(read_manifest_list(input_file)) + # Read the manifest list using _manifests (this populates the cache) + _manifests(io, list_path) # Analyze cache efficiency cache_entries = len(_manifest_cache) From 76c71aa0164ef25e74b371d18a4b7f08edac15a6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 15:48:39 -0500 Subject: [PATCH 07/12] fix benchmark --- tests/benchmark/test_memory_benchmark.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index 73923af267..0be2f9fbc8 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -260,20 +260,22 @@ def test_manifest_cache_deduplication_efficiency() -> None: # Analyze cache efficiency cache_entries = len(_manifest_cache) + # List i contains manifests 0..i, so only the first num_lists manifests are actually used + manifests_actually_used = num_lists print("\nResults:") print(f" Manifest lists created: {num_lists}") - print(f" Total unique manifest files: {num_manifests}") + print(f" Manifest files created: {num_manifests}") + print(f" Manifest files actually used: {manifests_actually_used}") print(f" Cache entries: {cache_entries}") - # With efficient per-ManifestFile caching, we should have at most - # num_manifests entries (one per unique manifest path), not - # sum(1..num_lists) entries as with the old strategy - print(f"\n Expected cache entries (efficient): <= {num_manifests}") + # With 
efficient per-ManifestFile caching, we should have exactly + # manifests_actually_used entries (one per unique manifest path) + print(f"\n Expected cache entries (efficient): {manifests_actually_used}") print(f" Actual cache entries: {cache_entries}") # The cache should be efficient - one entry per unique manifest path - assert cache_entries <= num_manifests + num_lists, ( - f"Cache has {cache_entries} entries, expected at most {num_manifests + num_lists}. " + assert cache_entries == manifests_actually_used, ( + f"Cache has {cache_entries} entries, expected exactly {manifests_actually_used}. " "The cache may not be deduplicating properly." ) From d92accf58e8df5c143f70b9feecb24630e26b712 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 16:14:14 -0500 Subject: [PATCH 08/12] update docs --- pyiceberg/manifest.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 8ee5dcac2a..0c1c24d382 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -891,9 +891,9 @@ def __hash__(self) -> int: return hash(self.manifest_path) -# Global cache for individual ManifestFile objects, keyed by manifest_path. -# This avoids duplicating ManifestFile objects when multiple manifest lists -# share the same manifests (which is common after appends). +# Global cache for ManifestFile objects, keyed by manifest_path. +# This deduplicates ManifestFile objects across manifest lists, which commonly +# share manifests after append operations. _manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=512) # Lock for thread-safe cache access @@ -901,38 +901,39 @@ def __hash__(self) -> int: def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: - """Read manifests from the given manifest list, caching individual ManifestFile objects. + """Read manifests from a manifest list, deduplicating ManifestFile objects via cache. - Unlike caching entire manifest lists, this approach caches individual ManifestFile - objects by their manifest_path. This is more memory-efficient because: - - ManifestList1 contains: (ManifestFile1) - - ManifestList2 contains: (ManifestFile1, ManifestFile2) - - ManifestList3 contains: (ManifestFile1, ManifestFile2, ManifestFile3) + Caches individual ManifestFile objects by manifest_path. This is memory-efficient + because consecutive manifest lists typically share most of their manifests: - With per-ManifestFile caching, ManifestFile1 is stored only once and reused, - instead of being duplicated in each manifest list's cached tuple. + ManifestList1: [ManifestFile1] + ManifestList2: [ManifestFile1, ManifestFile2] + ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3] + + With per-ManifestFile caching, each ManifestFile is stored once and reused. + + Note: The manifest list file is re-read on each call. This is intentional to + keep the implementation simple and avoid O(N²) memory growth from caching + overlapping manifest list tuples. Re-reading is cheap since manifest lists + are small metadata files. Args: - io: The FileIO to read the manifest list. - manifest_list: The path to the manifest list file. + io: FileIO instance for reading the manifest list. + manifest_list: Path to the manifest list file. Returns: - A tuple of ManifestFile objects (tuple to prevent modification). + A tuple of ManifestFile objects. 
""" - # Read manifest list outside the lock to avoid blocking other threads during I/O file = io.new_input(manifest_list) manifest_files = list(read_manifest_list(file)) - # Only hold the lock while updating the cache result = [] with _manifest_cache_lock: for manifest_file in manifest_files: manifest_path = manifest_file.manifest_path if manifest_path in _manifest_cache: - # Reuse the cached ManifestFile object result.append(_manifest_cache[manifest_path]) else: - # Cache and use this ManifestFile _manifest_cache[manifest_path] = manifest_file result.append(manifest_file) From 1721483ff9fcc8e5223cbd7fa3f064e70dc3a59c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 13:22:17 -0800 Subject: [PATCH 09/12] Update tests/utils/test_manifest.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/utils/test_manifest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 558ff0fd8e..8c532bf531 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -893,7 +893,6 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: # Verify object identity - all references to the same manifest should be the same object for i in range(num_manifests): - manifest_path = manifest_files[i].manifest_path # Find all references to this manifest across all results references = [] for j, result in enumerate(all_results): From 1f5861b4c0796c0182de49a02e002d0b03caea01 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 16:24:29 -0500 Subject: [PATCH 10/12] fix test --- tests/utils/test_manifest.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 8c532bf531..c7e1b07639 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -332,9 +332,6 @@ def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None: schema_id=3, ) - # Clear cache to ensure clean state - _manifest_cache.clear() - # Access the manifests property multiple times manifests_first_call = snapshot.manifests(io) manifests_second_call = snapshot.manifests(io) @@ -781,9 +778,6 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: ) as list_writer: list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3]) - # Clear the cache before testing - _manifest_cache.clear() - # Read all three manifest lists manifests1 = _manifests(io, manifest_list1_path) manifests2 = _manifests(io, manifest_list2_path) @@ -871,9 +865,6 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: list_writer.add_manifests(manifest_files[: i + 1]) manifest_list_paths.append(list_path) - # Clear the cache - _manifest_cache.clear() - # Read all manifest lists all_results = [] for path in manifest_list_paths: From 50a366cc4ef4c8913d74943ae61ea60053fd7b54 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 25 Jan 2026 16:26:07 -0500 Subject: [PATCH 11/12] more docs --- tests/benchmark/test_memory_benchmark.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index 0be2f9fbc8..adbe28ef09 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -121,6 +121,7 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None: # Memory growth includes ManifestFile objects, metadata, and other overhead # We expect about 5-10 KB per 
iteration for typical workloads # The key improvement is that growth is O(N) not O(N²) + # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration assert growth_per_iteration < 15000, ( f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. " "This may indicate the O(N²) cache inefficiency is present." @@ -168,6 +169,10 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> print("\nResults:") print(f" Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB") + # Verify that clearing the cache actually freed some memory + # Note: This may be flaky in some environments due to GC behavior + assert memory_reclaimed >= 0, "Memory should not increase after clearing cache" + @pytest.mark.benchmark def test_manifest_cache_deduplication_efficiency() -> None: From 462e975656747bd5aa589597cde12a1d65af7564 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 26 Jan 2026 17:37:22 -0500 Subject: [PATCH 12/12] feedback --- tests/benchmark/test_memory_benchmark.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index adbe28ef09..82454c8574 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -122,7 +122,8 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None: # We expect about 5-10 KB per iteration for typical workloads # The key improvement is that growth is O(N) not O(N²) # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration - assert growth_per_iteration < 15000, ( + max_memory_growth_per_iteration_bytes = 15000 + assert growth_per_iteration < max_memory_growth_per_iteration_bytes, ( f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. " "This may indicate the O(N²) cache inefficiency is present." )
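
For reviewers who want the pattern in isolation: patches 01, 05, and 08 converge on a path-keyed LRU cache that deduplicates ManifestFile objects across manifest lists, with file I/O kept outside the lock. The sketch below is a minimal standalone model of that pattern, not pyiceberg code: FakeManifest, read_list, and manifests are hypothetical stand-ins for ManifestFile, read_manifest_list, and _manifests, while the LRUCache and RLock usage mirrors the patches above (it assumes cachetools is installed, which pyiceberg already depends on).

    import threading
    from dataclasses import dataclass

    from cachetools import LRUCache


    @dataclass(frozen=True)
    class FakeManifest:
        """Hypothetical stand-in for pyiceberg's ManifestFile."""

        manifest_path: str


    # One cache entry per unique manifest path, mirroring _manifest_cache.
    _cache: LRUCache[str, FakeManifest] = LRUCache(maxsize=512)
    _lock = threading.RLock()


    def read_list(paths: list[str]) -> list[FakeManifest]:
        """Stand-in for read_manifest_list: every call builds fresh objects."""
        return [FakeManifest(p) for p in paths]


    def manifests(paths: list[str]) -> tuple[FakeManifest, ...]:
        fresh = read_list(paths)  # the "I/O" happens before taking the lock (patch 05)
        result = []
        with _lock:  # lock held only while reading/updating the cache
            for mf in fresh:
                cached = _cache.get(mf.manifest_path)
                if cached is None:  # first sighting: this instance becomes canonical
                    _cache[mf.manifest_path] = mf
                    cached = mf
                result.append(cached)  # later lists reuse the canonical instance
        return tuple(result)


    first = manifests(["m1"])
    second = manifests(["m1", "m2"])
    assert first[0] is second[0]  # m1 is the same object across both lists
    assert len(_cache) == 2  # one entry per unique path, not 1 + 2 = 3

The two asserts restate the identity and cache-size tests above at toy scale: with the old tuple-per-manifest-list cache, N overlapping lists pin 1 + 2 + ... + N = N*(N+1)/2 ManifestFile references (1,275 for N = 50), while the per-path cache pins exactly N.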