diff --git a/efemel/cli.py b/efemel/cli.py index 385bcbe..4cbf191 100644 --- a/efemel/cli.py +++ b/efemel/cli.py @@ -1,7 +1,8 @@ +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed import os -import shutil -from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +import shutil import click diff --git a/efemel/hooks_manager.py b/efemel/hooks_manager.py index 3006a6f..bd7adfd 100644 --- a/efemel/hooks_manager.py +++ b/efemel/hooks_manager.py @@ -1,8 +1,8 @@ # hooks_manager.py +from collections.abc import Callable import importlib.util import os import sys -from collections.abc import Callable from typing import Any diff --git a/efemel/pipeline.py b/efemel/pipeline.py index cb22bd2..34492d5 100644 --- a/efemel/pipeline.py +++ b/efemel/pipeline.py @@ -7,14 +7,20 @@ """ from collections import deque -from collections.abc import Callable, Generator, Iterable -from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait +from collections.abc import Callable +from collections.abc import Generator +from collections.abc import Iterable +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait from itertools import chain -from typing import Any, Self, TypeVar +from typing import Any +from typing import Self +from typing import TypeVar +from typing import overload T = TypeVar("T") # Type variable for the elements in the pipeline U = TypeVar("U") # Type variable for transformed elements -V = TypeVar("V") # Type variable for additional transformations class Pipeline[T]: @@ -49,6 +55,8 @@ def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> None: else: self.generator = self._chunked(source, chunk_size) + self.chunk_size = chunk_size + @staticmethod def _chunked(iterable: Iterable[T], size: int) -> Generator[list[T], None, None]: """Break an iterable into chunks of specified size.""" @@ -62,10 +70,11 @@ def _chunked(iterable: Iterable[T], size: int) -> Generator[list[T], None, None] yield chunk @classmethod - def _from_chunks(cls, chunks: Iterable[list[T]]) -> "Pipeline[T]": + def _from_chunks(cls, chunks: Iterable[list[T]], chunk_size: int = 1000) -> "Pipeline[T]": """Create a pipeline directly from an iterable of chunks.""" p = cls([]) p.generator = (chunk for chunk in chunks) + p.chunk_size = chunk_size return p @classmethod @@ -111,7 +120,7 @@ def filter(self, predicate: Callable[[T], bool]) -> "Pipeline[T]": def filter_chunk(chunk: list[T]) -> list[T]: return [x for x in chunk if predicate(x)] - return Pipeline._from_chunks(filter_chunk(chunk) for chunk in self.generator) + return Pipeline._from_chunks((filter_chunk(chunk) for chunk in self.generator), self.chunk_size) def map(self, function: Callable[[T], U]) -> "Pipeline[U]": """Transform elements using a function, applied per chunk.""" @@ -119,7 +128,7 @@ def map(self, function: Callable[[T], U]) -> "Pipeline[U]": def map_chunk(chunk: list[T]) -> list[U]: return [function(x) for x in chunk] - return Pipeline._from_chunks(map_chunk(chunk) for chunk in self.generator) + return Pipeline._from_chunks((map_chunk(chunk) for chunk in self.generator), self.chunk_size) def reduce(self, function: Callable[[U, T], U], initial: U) -> "Pipeline[U]": """Reduce elements to a single value using the given function.""" @@ -137,7 +146,7 @@ def tap_chunk(chunk: list[T]) -> list[T]: function(item) return chunk - return Pipeline._from_chunks(tap_chunk(chunk) for chunk in self.generator) + return Pipeline._from_chunks((tap_chunk(chunk) for chunk in self.generator), self.chunk_size) def each(self, function: Callable[[T], Any]) -> None: """Apply function to each element (terminal operation).""" @@ -162,13 +171,42 @@ def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]": result = function(result) return result - def flatten(self: "Pipeline[Iterable[U]]") -> "Pipeline[U]": - """Flatten pipeline of iterables into single pipeline.""" + @overload + def flatten(self: "Pipeline[list[U]]") -> "Pipeline[U]": ... + + @overload + def flatten(self: "Pipeline[tuple[U, ...]]") -> "Pipeline[U]": ... + + @overload + def flatten(self: "Pipeline[set[U]]") -> "Pipeline[U]": ... + + def flatten( + self: "Pipeline[list[U]] | Pipeline[tuple[U, ...]] | Pipeline[set[U]]", + ) -> "Pipeline[Any]": + """Flatten iterable chunks into a single pipeline of elements. + + This method flattens each chunk of iterables and maintains the chunked + structure to avoid memory issues with large datasets. After flattening, + the data is re-chunked to maintain the original chunk_size. + + Example: + [[1, 2], [3, 4]] -> [1, 2, 3, 4] + [(1, 2), (3, 4)] -> [1, 2, 3, 4] + + Note: + We need to overload this method to handle different iterable types because + using Iterable[U] does not preserve the type information for the flattened elements. + It returns Pipeline[Any] instead of Pipeline[U], which is incorrect. + """ - def flatten_chunk(chunk: list[Iterable[U]]) -> list[U]: - return [item for iterable in chunk for item in iterable] + def flatten_generator() -> Generator[Any, None, None]: + """Generator that yields individual flattened items.""" + for chunk in self.generator: + for iterable in chunk: + yield from iterable - return Pipeline._from_chunks(flatten_chunk(chunk) for chunk in self.generator) + # Re-chunk the flattened stream to maintain consistent chunk size + return Pipeline._from_chunks(self._chunked(flatten_generator(), self.chunk_size), self.chunk_size) def concurrent( self, @@ -256,7 +294,7 @@ def unordered_generator() -> Generator[list[U], None, None]: continue gen = ordered_generator() if ordered else unordered_generator() - return Pipeline._from_chunks(gen) + return Pipeline._from_chunks(gen, self.chunk_size) @classmethod def chain(cls, *pipelines: "Pipeline[T]") -> "Pipeline[T]": @@ -274,4 +312,6 @@ def chain_generator(): for pipeline in pipelines: yield from pipeline.generator - return cls._from_chunks(chain_generator()) + # Use chunk_size from the first pipeline, or default if no pipelines + chunk_size = pipelines[0].chunk_size if pipelines else 1000 + return cls._from_chunks(chain_generator(), chunk_size) diff --git a/efemel/process.py b/efemel/process.py index aa26d74..ce6b20f 100644 --- a/efemel/process.py +++ b/efemel/process.py @@ -1,7 +1,7 @@ -import importlib.util -import sys from importlib.abc import MetaPathFinder +import importlib.util from pathlib import Path +import sys class EnvironmentModuleFinder(MetaPathFinder): diff --git a/efemel/readers/local.py b/efemel/readers/local.py index 3f29b4d..eef4cc7 100644 --- a/efemel/readers/local.py +++ b/efemel/readers/local.py @@ -1,5 +1,5 @@ -import os from glob import glob +import os from pathlib import Path diff --git a/pyproject.toml b/pyproject.toml index 6a0017a..ee38b38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,15 @@ indent-style = "space" skip-magic-trailing-comma = false line-ending = "auto" +[tool.ruff.lint.isort] +force-single-line = true +order-by-type = true +known-first-party = ["efemel"] +combine-as-imports = false +force-sort-within-sections = true +case-sensitive = false +split-on-trailing-comma = false + [tool.pytest.ini_options] testpaths = ["tests"] python_files = ["test_*.py", "*_test.py"] diff --git a/tests/test_cli.py b/tests/test_cli.py index f4e348e..909e2df 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,9 +1,9 @@ import json -import shutil from pathlib import Path +import shutil -import pytest from click.testing import CliRunner +import pytest from efemel.cli import cli diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 7e85607..cb6baf9 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -402,13 +402,6 @@ def test_flatten_basic(self): result = pipeline.flatten() assert result.to_list() == [1, 2, 3, 4, 5] - def test_flatten_with_strings(self): - """Test flatten with string iterables.""" - pipeline = Pipeline(["hello", "world"]) - - result = pipeline.flatten() - assert result.to_list() == ["h", "e", "l", "l", "o", "w", "o", "r", "l", "d"] - def test_flatten_empty_lists(self): """Test flatten with empty lists.""" pipeline = Pipeline([[], [1, 2], [], [3]]) @@ -423,6 +416,72 @@ def test_flatten_nested_tuples(self): result = pipeline.flatten() assert result.to_list() == [1, 2, 3, 4, 5, 6] + def test_flatten_chunked_processing(self): + """Test that flatten maintains chunked processing and doesn't consume entire pipeline.""" + + # Create a large dataset that would cause OOM if consumed all at once + def generate_nested_data(): + for i in range(1000): + yield [i * 2, i * 2 + 1] + + # Use small chunk size to verify chunked processing + pipeline = Pipeline(generate_nested_data(), chunk_size=10) + result = pipeline.flatten() + + # Verify first few elements without consuming the entire pipeline + first_10 = [] + count = 0 + for item in result: + first_10.append(item) + count += 1 + if count >= 10: + break + + assert first_10 == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + def test_flatten_preserves_chunk_structure(self): + """Test that flatten preserves chunked structure.""" + # Create pipeline with known chunk structure + data = [[1, 2], [3, 4], [5, 6], [7, 8]] + pipeline = Pipeline(data, chunk_size=2) # 2 sublists per chunk + + result = pipeline.flatten() + + # Verify the result is correct + assert result.to_list() == [1, 2, 3, 4, 5, 6, 7, 8] + + def test_flatten_maintains_chunk_size(self): + """Test that flatten maintains the original chunk_size setting.""" + # Create test data with varying sizes of sublists + data = [[1, 2], [3, 4, 5], [6], [7, 8, 9, 10]] + chunk_size = 3 + pipeline = Pipeline(data, chunk_size=chunk_size) + + # Flatten the pipeline + flattened = pipeline.flatten() + + # Verify chunk_size is preserved + assert flattened.chunk_size == chunk_size + + # Collect chunks to verify structure + chunks = [] + for chunk in flattened.generator: + chunks.append(chunk) + + # Verify all chunks except the last have the expected size + for i, chunk in enumerate(chunks[:-1]): + assert len(chunk) == chunk_size, f"Chunk {i} has size {len(chunk)}, expected {chunk_size}" + + # Last chunk can be smaller than chunk_size + if chunks: + assert len(chunks[-1]) <= chunk_size, f"Last chunk has size {len(chunks[-1])}, should be <= {chunk_size}" + + # Verify the final result is correct + result = [] + for chunk in chunks: + result.extend(chunk) + assert result == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + class TestPipelineEdgeCases: """Test Pipeline edge cases and error conditions."""