diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 0afa16666e..0c1c24d382 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -28,8 +28,7 @@
     Literal,
 )
 
-from cachetools import LRUCache, cached
-from cachetools.keys import hashkey
+from cachetools import LRUCache
 from pydantic_core import to_json
 
 from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
@@ -892,15 +891,53 @@ def __hash__(self) -> int:
         return hash(self.manifest_path)
 
 
-# Global cache for manifest lists
-_manifest_cache: LRUCache[Any, tuple[ManifestFile, ...]] = LRUCache(maxsize=128)
+# Global cache for ManifestFile objects, keyed by manifest_path.
+# This deduplicates ManifestFile objects across manifest lists, which commonly
+# share manifests after append operations.
+_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=512)
+
+# Lock for thread-safe cache access
+_manifest_cache_lock = threading.RLock()
 
 
-@cached(cache=_manifest_cache, key=lambda io, manifest_list: hashkey(manifest_list), lock=threading.RLock())
 def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
-    """Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
+    """Read manifests from a manifest list, deduplicating ManifestFile objects via the cache.
+
+    Caches individual ManifestFile objects by manifest_path. This is memory-efficient
+    because consecutive manifest lists typically share most of their manifests:
+
+        ManifestList1: [ManifestFile1]
+        ManifestList2: [ManifestFile1, ManifestFile2]
+        ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3]
+
+    With per-ManifestFile caching, each ManifestFile is stored once and reused.
+
+    Note: The manifest list file is re-read on each call. This is intentional: it
+    keeps the implementation simple and avoids the O(N²) memory growth of caching
+    overlapping manifest list tuples. Re-reading is cheap since manifest lists
+    are small metadata files.
+
+    Args:
+        io: FileIO instance for reading the manifest list.
+        manifest_list: Path to the manifest list file.
+
+    Returns:
+        A tuple of ManifestFile objects.
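+
+    Example (illustrative; assumes ``io`` and a previously written manifest list
+    at a hypothetical path):
+
+        first = _manifests(io, "s3://bucket/metadata/snap-1.avro")
+        second = _manifests(io, "s3://bucket/metadata/snap-1.avro")
+        assert all(a is b for a, b in zip(first, second))  # same cached objects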
+    """
     file = io.new_input(manifest_list)
-    return tuple(read_manifest_list(file))
+    manifest_files = list(read_manifest_list(file))
+
+    result = []
+    with _manifest_cache_lock:
+        for manifest_file in manifest_files:
+            manifest_path = manifest_file.manifest_path
+            if manifest_path in _manifest_cache:
+                result.append(_manifest_cache[manifest_path])
+            else:
+                _manifest_cache[manifest_path] = manifest_file
+                result.append(manifest_file)
+
+    return tuple(result)
 
 
 def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py
new file mode 100644
index 0000000000..82454c8574
--- /dev/null
+++ b/tests/benchmark/test_memory_benchmark.py
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Memory benchmarks for manifest cache efficiency.
+
+These benchmarks reproduce the manifest cache memory issue described in:
+https://github.com/apache/iceberg-python/issues/2325
+
+The issue: when caching manifest lists as tuples, overlapping ManifestFile objects
+are duplicated across cache entries, causing O(N²) memory growth instead of O(N).
+
+Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark
+"""
+
+import gc
+import tracemalloc
+from datetime import datetime, timezone
+
+import pyarrow as pa
+import pytest
+
+from pyiceberg.catalog.memory import InMemoryCatalog
+from pyiceberg.manifest import _manifest_cache
+
+
+def generate_test_dataframe() -> pa.Table:
+    """Generate a PyArrow table for testing, similar to the issue's example."""
+    n_rows = 100  # Smaller for faster tests; increase for more realistic benchmarks
+
+    return pa.table(
+        {
+            "event_type": ["playback"] * n_rows,
+            "event_origin": ["origin1"] * n_rows,
+            "event_send_at": [datetime.now(timezone.utc)] * n_rows,
+            "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
+            "id": list(range(n_rows)),
+            "reference_id": [f"ref-{i}" for i in range(n_rows)],
+        }
+    )
+
+
+@pytest.fixture
+def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
+    """Create an in-memory catalog for memory testing."""
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
+    catalog.create_namespace("default")
+    return catalog
+
+
+@pytest.fixture(autouse=True)
+def clear_caches() -> None:
+    """Clear caches before each test."""
+    _manifest_cache.clear()
+    gc.collect()
+
+
+@pytest.mark.benchmark
+def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
+    """Benchmark memory growth of manifest cache during repeated appends.
+
+    This test reproduces the issue from GitHub #2325 where each append creates
+    a new manifest list entry in the cache, causing memory to grow.
+
+    With the old caching strategy (tuple per manifest list), memory grew as O(N²).
+    With the new strategy (individual ManifestFile objects), memory grows as O(N).
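+
+    Back-of-the-envelope arithmetic (assuming a hypothetical ~1 KB per cached
+    ManifestFile): after N appends the old cache held
+    1 + 2 + ... + N = N * (N + 1) / 2 objects (~1.3 MB for N = 50), while the
+    per-ManifestFile cache holds N objects (~50 KB for N = 50).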
+ """ + df = generate_test_dataframe() + table = memory_catalog.create_table("default.memory_test", schema=df.schema) + + tracemalloc.start() + + num_iterations = 50 + memory_samples: list[tuple[int, int, int]] = [] # (iteration, current_memory, cache_size) + + print("\n--- Manifest Cache Memory Growth Benchmark ---") + print(f"Running {num_iterations} append operations...") + + for i in range(num_iterations): + table.append(df) + + # Sample memory at intervals + if (i + 1) % 10 == 0: + current, _ = tracemalloc.get_traced_memory() + cache_size = len(_manifest_cache) + + memory_samples.append((i + 1, current, cache_size)) + print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}") + + tracemalloc.stop() + + # Analyze memory growth + if len(memory_samples) >= 2: + first_memory = memory_samples[0][1] + last_memory = memory_samples[-1][1] + memory_growth = last_memory - first_memory + growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0]) + + print("\nResults:") + print(f" Initial memory: {first_memory / 1024:.1f} KB") + print(f" Final memory: {last_memory / 1024:.1f} KB") + print(f" Total growth: {memory_growth / 1024:.1f} KB") + print(f" Growth per iteration: {growth_per_iteration:.1f} bytes") + print(f" Final cache size: {memory_samples[-1][2]} entries") + + # With efficient caching, growth should be roughly linear (O(N)) + # rather than quadratic (O(N²)) as it was before + # Memory growth includes ManifestFile objects, metadata, and other overhead + # We expect about 5-10 KB per iteration for typical workloads + # The key improvement is that growth is O(N) not O(N²) + # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration + max_memory_growth_per_iteration_bytes = 15000 + assert growth_per_iteration < max_memory_growth_per_iteration_bytes, ( + f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. " + "This may indicate the O(N²) cache inefficiency is present." + ) + + +@pytest.mark.benchmark +def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None: + """Test that clearing the cache allows memory to be reclaimed. + + This test verifies that when we clear the manifest cache, the associated + memory can be garbage collected. 
+ """ + df = generate_test_dataframe() + table = memory_catalog.create_table("default.gc_test", schema=df.schema) + + tracemalloc.start() + + print("\n--- Memory After GC Benchmark ---") + + # Phase 1: Fill the cache + print("Phase 1: Filling cache with 20 appends...") + for _ in range(20): + table.append(df) + + gc.collect() + before_clear_memory, _ = tracemalloc.get_traced_memory() + cache_size_before = len(_manifest_cache) + print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB") + print(f" Cache size: {cache_size_before}") + + # Phase 2: Clear cache and GC + print("\nPhase 2: Clearing cache and running GC...") + _manifest_cache.clear() + gc.collect() + gc.collect() # Multiple GC passes for thorough cleanup + + after_clear_memory, _ = tracemalloc.get_traced_memory() + print(f" Memory after clear: {after_clear_memory / 1024:.1f} KB") + print(f" Memory reclaimed: {(before_clear_memory - after_clear_memory) / 1024:.1f} KB") + + tracemalloc.stop() + + memory_reclaimed = before_clear_memory - after_clear_memory + print("\nResults:") + print(f" Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB") + + # Verify that clearing the cache actually freed some memory + # Note: This may be flaky in some environments due to GC behavior + assert memory_reclaimed >= 0, "Memory should not increase after clearing cache" + + +@pytest.mark.benchmark +def test_manifest_cache_deduplication_efficiency() -> None: + """Benchmark the efficiency of the per-ManifestFile caching strategy. + + This test verifies that when multiple manifest lists share the same + ManifestFile objects, they are properly deduplicated in the cache. + """ + from tempfile import TemporaryDirectory + + from pyiceberg.io.pyarrow import PyArrowFileIO + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestEntry, + ManifestEntryStatus, + _manifests, + write_manifest, + write_manifest_list, + ) + from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC + from pyiceberg.schema import Schema + from pyiceberg.typedef import Record + from pyiceberg.types import IntegerType, NestedField + + io = PyArrowFileIO() + + print("\n--- Manifest Cache Deduplication Benchmark ---") + + with TemporaryDirectory() as tmp_dir: + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create N manifest files + num_manifests = 20 + manifest_files = [] + + print(f"Creating {num_manifests} manifest files...") + for i in range(num_manifests): + manifest_path = f"{tmp_dir}/manifest_{i}.avro" + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest_path), + snapshot_id=i + 1, + avro_compression="null", + ) as writer: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data_{i}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=i + 1, + data_file=data_file, + ) + ) + manifest_files.append(writer.to_manifest_file()) + + # Create multiple manifest lists with overlapping manifest files + # List i contains manifest files 0 through i + num_lists = 10 + print(f"Creating {num_lists} manifest lists with overlapping manifests...") + + _manifest_cache.clear() + + for i in range(num_lists): + list_path = f"{tmp_dir}/manifest-list_{i}.avro" + manifests_to_include = manifest_files[: i + 
+
+        # Analyze cache efficiency
+        cache_entries = len(_manifest_cache)
+        # List i contains manifests 0..i, so only the first num_lists manifests are actually used
+        manifests_actually_used = num_lists
+
+        print("\nResults:")
+        print(f"  Manifest lists created: {num_lists}")
+        print(f"  Manifest files created: {num_manifests}")
+        print(f"  Manifest files actually used: {manifests_actually_used}")
+        print(f"  Cache entries: {cache_entries}")
+
+        # With efficient per-ManifestFile caching, we should have exactly
+        # manifests_actually_used entries (one per unique manifest path)
+        print(f"\n  Expected cache entries (efficient): {manifests_actually_used}")
+        print(f"  Actual cache entries: {cache_entries}")
+
+        # The cache should be efficient - one entry per unique manifest path
+        assert cache_entries == manifests_actually_used, (
+            f"Cache has {cache_entries} entries, expected exactly {manifests_actually_used}. "
+            "The cache may not be deduplicating properly."
+        )
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index d12019c9e2..c7e1b07639 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -16,7 +16,6 @@
 # under the License.
 # pylint: disable=redefined-outer-name,arguments-renamed,fixme
 from tempfile import TemporaryDirectory
-from unittest.mock import patch
 
 import fastavro
 import pytest
@@ -29,10 +28,12 @@
     DataFileContent,
     FileFormat,
     ManifestContent,
+    ManifestEntry,
     ManifestEntryStatus,
     ManifestFile,
     PartitionFieldSummary,
     _manifest_cache,
+    _manifests,
     read_manifest_list,
     write_manifest,
     write_manifest_list,
@@ -314,27 +315,33 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
 
 
 def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
-    with patch("pyiceberg.manifest.read_manifest_list") as mocked_read_manifest_list:
-        io = load_file_io()
+    """Test that ManifestFile objects are cached and reused across multiple reads.
 
-        snapshot = Snapshot(
-            snapshot_id=25,
-            parent_snapshot_id=19,
-            timestamp_ms=1602638573590,
-            manifest_list=generated_manifest_file_file_v2,
-            summary=Summary(Operation.APPEND),
-            schema_id=3,
-        )
+    The cache now stores individual ManifestFile objects by their manifest_path,
+    rather than caching entire manifest list tuples. This is more memory-efficient
+    when multiple manifest lists share overlapping ManifestFile objects.
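+
+    Note that the manifest list file itself is re-read on each call; only the
+    ManifestFile instances are deduplicated. Illustratively:
+
+        a = snapshot.manifests(io)
+        b = snapshot.manifests(io)
+        # a == b, and a[i] is b[i] for each i, even though the file was read twice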
+ """ + io = load_file_io() - # Access the manifests property multiple times to test caching - manifests_first_call = snapshot.manifests(io) - manifests_second_call = snapshot.manifests(io) + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + timestamp_ms=1602638573590, + manifest_list=generated_manifest_file_file_v2, + summary=Summary(Operation.APPEND), + schema_id=3, + ) - # Ensure that read_manifest_list was called only once - mocked_read_manifest_list.assert_called_once() + # Access the manifests property multiple times + manifests_first_call = snapshot.manifests(io) + manifests_second_call = snapshot.manifests(io) - # Ensure that the same manifest list is returned - assert manifests_first_call == manifests_second_call + # Ensure that the same manifest list content is returned + assert manifests_first_call == manifests_second_call + + # Verify that ManifestFile objects are the same instances (cached) + for mf1, mf2 in zip(manifests_first_call, manifests_second_call, strict=True): + assert mf1 is mf2, "ManifestFile objects should be the same cached instance" def test_write_empty_manifest() -> None: @@ -629,3 +636,261 @@ def test_file_format_case_insensitive(raw_file_format: str, expected_file_format else: with pytest.raises(ValueError): _ = FileFormat(raw_file_format) + + +def test_manifest_cache_deduplicates_manifest_files() -> None: + """Test that the manifest cache deduplicates ManifestFile objects across manifest lists. + + This test verifies the fix for https://github.com/apache/iceberg-python/issues/2325 + + The issue was that when caching manifest lists by their path, overlapping ManifestFile + objects were duplicated. For example: + - ManifestList1: (ManifestFile1) + - ManifestList2: (ManifestFile1, ManifestFile2) + - ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3) + + With the old approach, ManifestFile1 was stored 3 times in the cache. + With the new approach, ManifestFile objects are cached individually by their + manifest_path, so ManifestFile1 is stored only once and reused. 
+ """ + io = PyArrowFileIO() + + with TemporaryDirectory() as tmp_dir: + # Create three manifest files to simulate manifests created during appends + manifest1_path = f"{tmp_dir}/manifest1.avro" + manifest2_path = f"{tmp_dir}/manifest2.avro" + manifest3_path = f"{tmp_dir}/manifest3.avro" + + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create manifest file 1 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest1_path), + snapshot_id=1, + avro_compression="zstandard", + ) as writer: + data_file1 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data1.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=data_file1, + ) + ) + manifest_file1 = writer.to_manifest_file() + + # Create manifest file 2 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest2_path), + snapshot_id=2, + avro_compression="zstandard", + ) as writer: + data_file2 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data2.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=200, + file_size_in_bytes=2000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=2, + data_file=data_file2, + ) + ) + manifest_file2 = writer.to_manifest_file() + + # Create manifest file 3 + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest3_path), + snapshot_id=3, + avro_compression="zstandard", + ) as writer: + data_file3 = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data3.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=300, + file_size_in_bytes=3000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=3, + data_file=data_file3, + ) + ) + manifest_file3 = writer.to_manifest_file() + + # Create manifest list 1: contains only manifest1 + manifest_list1_path = f"{tmp_dir}/manifest-list1.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list1_path), + snapshot_id=1, + parent_snapshot_id=None, + sequence_number=1, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1]) + + # Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1) + manifest_list2_path = f"{tmp_dir}/manifest-list2.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list2_path), + snapshot_id=2, + parent_snapshot_id=1, + sequence_number=2, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1, manifest_file2]) + + # Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2) + manifest_list3_path = f"{tmp_dir}/manifest-list3.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(manifest_list3_path), + snapshot_id=3, + parent_snapshot_id=2, + sequence_number=3, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3]) + + # Read all three manifest lists + manifests1 = _manifests(io, manifest_list1_path) + 
+        manifests2 = _manifests(io, manifest_list2_path)
+        manifests3 = _manifests(io, manifest_list3_path)
+
+        # Verify the manifest lists have the expected sizes
+        assert len(manifests1) == 1
+        assert len(manifests2) == 2
+        assert len(manifests3) == 3
+
+        # Verify that ManifestFile objects with the same manifest_path are the same
+        # object (identity). This is the key assertion: if caching works correctly,
+        # the same ManifestFile object is reused instead of creating duplicates.
+
+        # manifest_file1 appears in all three lists - should be the same object
+        assert manifests1[0] is manifests2[0], "ManifestFile1 should be the same object instance across manifest lists"
+        assert manifests2[0] is manifests3[0], "ManifestFile1 should be the same object instance across manifest lists"
+
+        # manifest_file2 appears in lists 2 and 3 - should be the same object
+        assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists"
+
+        # Verify cache size - there should be only 3 unique ManifestFile objects
+        # instead of 1 + 2 + 3 = 6 objects as with the old approach
+        assert len(_manifest_cache) == 3, (
+            f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
+        )
+
+
+def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
+    """Test that the manifest cache remains efficient with many overlapping manifest lists.
+
+    This simulates the scenario from GitHub issue #2325 where many appends create
+    manifest lists that increasingly overlap.
+    """
+    io = PyArrowFileIO()
+    _manifest_cache.clear()  # Start from an empty cache so the exact-size assertion below holds
+
+    with TemporaryDirectory() as tmp_dir:
+        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
+        spec = UNPARTITIONED_PARTITION_SPEC
+
+        num_manifests = 10
+        manifest_files = []
+
+        # Create N manifest files
+        for i in range(num_manifests):
+            manifest_path = f"{tmp_dir}/manifest{i}.avro"
+            with write_manifest(
+                format_version=2,
+                spec=spec,
+                schema=schema,
+                output_file=io.new_output(manifest_path),
+                snapshot_id=i + 1,
+                avro_compression="zstandard",
+            ) as writer:
+                data_file = DataFile.from_args(
+                    content=DataFileContent.DATA,
+                    file_path=f"{tmp_dir}/data{i}.parquet",
+                    file_format=FileFormat.PARQUET,
+                    partition=Record(),
+                    record_count=100 * (i + 1),
+                    file_size_in_bytes=1000 * (i + 1),
+                )
+                writer.add_entry(
+                    ManifestEntry.from_args(
+                        status=ManifestEntryStatus.ADDED,
+                        snapshot_id=i + 1,
+                        data_file=data_file,
+                    )
+                )
+            manifest_files.append(writer.to_manifest_file())
+
+        # Create N manifest lists, each containing an increasing number of manifests:
+        # list[i] contains manifests[0 : i + 1]
+        manifest_list_paths = []
+        for i in range(num_manifests):
+            list_path = f"{tmp_dir}/manifest-list{i}.avro"
+            with write_manifest_list(
+                format_version=2,
+                output_file=io.new_output(list_path),
+                snapshot_id=i + 1,
+                parent_snapshot_id=i if i > 0 else None,
+                sequence_number=i + 1,
+                avro_compression="zstandard",
+            ) as list_writer:
+                list_writer.add_manifests(manifest_files[: i + 1])
+            manifest_list_paths.append(list_path)
+
+        # Read all manifest lists
+        all_results = []
+        for path in manifest_list_paths:
+            result = _manifests(io, path)
+            all_results.append(result)
+
+        # With the old cache approach, we would have kept
+        # 1 + 2 + 3 + ... + N = N * (N + 1) / 2 ManifestFile objects in memory.
+        # With the new approach, we should have exactly N objects.
+
+        # Verify the cache has exactly N unique entries
+        assert len(_manifest_cache) == num_manifests, (
+            f"Cache should contain exactly {num_manifests} ManifestFile objects, "
+            f"but has {len(_manifest_cache)}. "
+            f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
+        )
+
+        # Verify object identity - all references to the same manifest should be the same object
+        for i in range(num_manifests):
+            # Collect all references to this manifest across all results
+            references = []
+            for j, result in enumerate(all_results):
+                if j >= i:  # This manifest should be in lists from i onwards
+                    references.append(result[i])
+
+            # All references should be the same object
+            if len(references) > 1:
+                for ref in references[1:]:
+                    assert ref is references[0], f"All references to manifest {i} should be the same object instance"
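+
+        # Re-reading an already-cached manifest list must not grow the cache:
+        # every manifest path is already present, so the cached objects are reused.
+        _ = _manifests(io, manifest_list_paths[-1])
+        assert len(_manifest_cache) == num_manifests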