Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions efemel/cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import shutil

import click

Expand Down
2 changes: 1 addition & 1 deletion efemel/hooks_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# hooks_manager.py
from collections.abc import Callable
import importlib.util
import os
import sys
from collections.abc import Callable
from typing import Any


Expand Down
70 changes: 55 additions & 15 deletions efemel/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
"""

from collections import deque
from collections.abc import Callable, Generator, Iterable
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterable
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from itertools import chain
from typing import Any, Self, TypeVar
from typing import Any
from typing import Self
from typing import TypeVar
from typing import overload

T = TypeVar("T") # Type variable for the elements in the pipeline
U = TypeVar("U") # Type variable for transformed elements
V = TypeVar("V") # Type variable for additional transformations


class Pipeline[T]:
Expand Down Expand Up @@ -49,6 +55,8 @@ def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> None:
else:
self.generator = self._chunked(source, chunk_size)

self.chunk_size = chunk_size

@staticmethod
def _chunked(iterable: Iterable[T], size: int) -> Generator[list[T], None, None]:
"""Break an iterable into chunks of specified size."""
Expand All @@ -62,10 +70,11 @@ def _chunked(iterable: Iterable[T], size: int) -> Generator[list[T], None, None]
yield chunk

@classmethod
def _from_chunks(cls, chunks: Iterable[list[T]], chunk_size: int = 1000) -> "Pipeline[T]":
    """Build a pipeline that yields the given chunks unchanged.

    Skips the normal chunking of a flat iterable; *chunk_size* is recorded
    so that downstream operations can re-chunk consistently.
    """
    pipeline = cls([])
    pipeline.generator = (batch for batch in chunks)
    pipeline.chunk_size = chunk_size
    return pipeline

@classmethod
def filter(self, predicate: Callable[[T], bool]) -> "Pipeline[T]":
    """Return a pipeline keeping only elements where predicate(x) is truthy.

    Filtering is applied lazily, one chunk at a time.
    """

    def keep(batch: list[T]) -> list[T]:
        return [element for element in batch if predicate(element)]

    return Pipeline._from_chunks(map(keep, self.generator), self.chunk_size)

def map(self, function: Callable[[T], U]) -> "Pipeline[U]":
    """Apply *function* to every element, producing a new lazy pipeline."""

    def transform(batch: list[T]) -> list[U]:
        return [function(item) for item in batch]

    transformed = (transform(batch) for batch in self.generator)
    return Pipeline._from_chunks(transformed, self.chunk_size)

def reduce(self, function: Callable[[U, T], U], initial: U) -> "Pipeline[U]":
"""Reduce elements to a single value using the given function."""
Expand All @@ -137,7 +146,7 @@ def tap_chunk(chunk: list[T]) -> list[T]:
function(item)
return chunk

return Pipeline._from_chunks(tap_chunk(chunk) for chunk in self.generator)
return Pipeline._from_chunks((tap_chunk(chunk) for chunk in self.generator), self.chunk_size)

def each(self, function: Callable[[T], Any]) -> None:
"""Apply function to each element (terminal operation)."""
Expand All @@ -162,13 +171,42 @@ def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]":
result = function(result)
return result

def flatten(self: "Pipeline[Iterable[U]]") -> "Pipeline[U]":
"""Flatten pipeline of iterables into single pipeline."""
@overload
def flatten(self: "Pipeline[list[U]]") -> "Pipeline[U]": ...

@overload
def flatten(self: "Pipeline[tuple[U, ...]]") -> "Pipeline[U]": ...

@overload
def flatten(self: "Pipeline[set[U]]") -> "Pipeline[U]": ...

def flatten(
    self: "Pipeline[list[U]] | Pipeline[tuple[U, ...]] | Pipeline[set[U]]",
) -> "Pipeline[Any]":
    """Expand each contained iterable into a single stream of elements.

    The flattened stream is lazily re-chunked to this pipeline's
    chunk_size, so no step ever materializes the whole dataset.

    Example:
        [[1, 2], [3, 4]] -> [1, 2, 3, 4]
        [(1, 2), (3, 4)] -> [1, 2, 3, 4]

    Note:
        The overloads cover the concrete container types because a generic
        Iterable[U] annotation erases the element type and would make the
        checker infer Pipeline[Any] for every caller.
    """

    def emit_items() -> Generator[Any, None, None]:
        # Walk chunks -> inner iterables -> individual items, one at a time.
        for batch in self.generator:
            for inner in batch:
                yield from inner

    # Re-chunk the flat stream so the configured chunk size is preserved.
    rechunked = self._chunked(emit_items(), self.chunk_size)
    return Pipeline._from_chunks(rechunked, self.chunk_size)

def concurrent(
self,
Expand Down Expand Up @@ -256,7 +294,7 @@ def unordered_generator() -> Generator[list[U], None, None]:
continue

gen = ordered_generator() if ordered else unordered_generator()
return Pipeline._from_chunks(gen)
return Pipeline._from_chunks(gen, self.chunk_size)

@classmethod
def chain(cls, *pipelines: "Pipeline[T]") -> "Pipeline[T]":
Expand All @@ -274,4 +312,6 @@ def chain_generator():
for pipeline in pipelines:
yield from pipeline.generator

return cls._from_chunks(chain_generator())
# Use chunk_size from the first pipeline, or default if no pipelines
chunk_size = pipelines[0].chunk_size if pipelines else 1000
return cls._from_chunks(chain_generator(), chunk_size)
4 changes: 2 additions & 2 deletions efemel/process.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib.util
import sys
from importlib.abc import MetaPathFinder
import importlib.util
from pathlib import Path
import sys


class EnvironmentModuleFinder(MetaPathFinder):
Expand Down
2 changes: 1 addition & 1 deletion efemel/readers/local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from glob import glob
import os
from pathlib import Path


Expand Down
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

[tool.ruff.lint.isort]
force-single-line = true
order-by-type = true
known-first-party = ["efemel"]
combine-as-imports = false
force-sort-within-sections = true
case-sensitive = false
split-on-trailing-comma = false

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import shutil
from pathlib import Path
import shutil

import pytest
from click.testing import CliRunner
import pytest

from efemel.cli import cli

Expand Down
73 changes: 66 additions & 7 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,6 @@ def test_flatten_basic(self):
result = pipeline.flatten()
assert result.to_list() == [1, 2, 3, 4, 5]

def test_flatten_with_strings(self):
"""Test flatten with string iterables."""
pipeline = Pipeline(["hello", "world"])

result = pipeline.flatten()
assert result.to_list() == ["h", "e", "l", "l", "o", "w", "o", "r", "l", "d"]

def test_flatten_empty_lists(self):
"""Test flatten with empty lists."""
pipeline = Pipeline([[], [1, 2], [], [3]])
Expand All @@ -423,6 +416,72 @@ def test_flatten_nested_tuples(self):
result = pipeline.flatten()
assert result.to_list() == [1, 2, 3, 4, 5, 6]

def test_flatten_chunked_processing(self):
    """Flatten should stream lazily instead of consuming the whole source."""

    def nested_pairs():
        # 1000 two-element lists; large enough that eager consumption
        # of the whole stream would defeat chunked processing.
        for i in range(1000):
            yield [i * 2, i * 2 + 1]

    # A small chunk size exercises the chunked path explicitly.
    flattened = Pipeline(nested_pairs(), chunk_size=10).flatten()

    # Take only the first ten items, leaving the rest unconsumed.
    taken = []
    for value in flattened:
        taken.append(value)
        if len(taken) >= 10:
            break

    assert taken == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def test_flatten_preserves_chunk_structure(self):
    """Flattening chunked input yields every element, in order."""
    nested = [[1, 2], [3, 4], [5, 6], [7, 8]]
    # chunk_size=2 means two sublists per chunk.
    flattened = Pipeline(nested, chunk_size=2).flatten()

    assert flattened.to_list() == [1, 2, 3, 4, 5, 6, 7, 8]

def test_flatten_maintains_chunk_size(self):
    """flatten() keeps the configured chunk_size and re-chunks to it."""
    # Sublists of varying length force re-chunking across boundaries.
    nested = [[1, 2], [3, 4, 5], [6], [7, 8, 9, 10]]
    size = 3
    flattened = Pipeline(nested, chunk_size=size).flatten()

    # The setting itself must carry over to the new pipeline.
    assert flattened.chunk_size == size

    chunks = list(flattened.generator)

    # Every chunk except the final one must be exactly `size` long.
    for i, chunk in enumerate(chunks[:-1]):
        assert len(chunk) == size, f"Chunk {i} has size {len(chunk)}, expected {size}"

    # The trailing chunk may be short, but never longer than `size`.
    if chunks:
        assert len(chunks[-1]) <= size, f"Last chunk has size {len(chunks[-1])}, should be <= {size}"

    # Reassembling the chunks must reproduce the flattened data.
    flat = []
    for chunk in chunks:
        flat.extend(chunk)
    assert flat == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


class TestPipelineEdgeCases:
"""Test Pipeline edge cases and error conditions."""
Expand Down
Loading