diff --git a/efemel/pipeline.py b/efemel/pipeline.py new file mode 100644 index 0000000..bef2eb3 --- /dev/null +++ b/efemel/pipeline.py @@ -0,0 +1,256 @@ +""" +Pipeline module for functional data processing. + +This module provides a Pipeline class that enables functional programming patterns +for data transformation and processing. It allows chaining operations like map, filter, +reduce, and more in a fluent interface style. + +Example: + >>> pipeline = Pipeline([1, 2, 3, 4, 5]) + >>> result = pipeline.filter(lambda x: x % 2 == 0).map(lambda x: x * 2).to_list() + >>> print(result) # [4, 8] +""" + +import functools +from collections.abc import Callable, Generator, Iterable +from typing import Any, Self, TypeVar, cast + +T = TypeVar("T") # Type variable for the elements in the pipeline +U = TypeVar("U") # Type variable for transformed elements +V = TypeVar("V") # Type variable for additional transformations + + +class Pipeline[T]: + """ + A functional pipeline for data processing with method chaining. + + The Pipeline class wraps an iterable and provides a fluent interface for + applying transformations, filters, and reductions in a functional programming style. + + Type Parameters: + T: The type of elements in the pipeline + + Attributes: + generator: The underlying iterable that provides the data source + + Example: + >>> data = [1, 2, 3, 4, 5] + >>> pipeline = Pipeline(data) + >>> result = (pipeline + ... .filter(lambda x: x > 2) + ... .map(lambda x: x ** 2) + ... .to_list()) + >>> print(result) # [9, 16, 25] + """ + + generator: Iterable[T] + + def __init__(self, source: Iterable[T]): + """ + Initialize a new Pipeline with the given data source. + + Args: + source: An iterable that provides the data for the pipeline + """ + self.generator = source + + def __next__(self) -> T: + """ + Get the next item from the pipeline. + + Returns: + The next item in the pipeline + + Raises: + StopIteration: When there are no more items + """ + return next(iter(self.generator)) + + def __iter__(self) -> Generator[T, None, None]: + """ + Return an iterator over the pipeline elements. + + Returns: + A generator that yields the pipeline elements + """ + yield from self.generator + + def to_list(self) -> list[T]: + """ + Convert the pipeline to a list. + + Returns: + A list containing all elements from the pipeline + + Example: + >>> Pipeline([1, 2, 3]).to_list() + [1, 2, 3] + """ + return list(self) + + def first(self) -> T: + """ + Get the first element from the pipeline. + + Returns: + The first element in the pipeline + + Raises: + StopIteration: If the pipeline is empty + + Example: + >>> Pipeline([1, 2, 3]).first() + 1 + """ + return next(iter(self.generator)) + + def filter( + self, + predicate: Callable[[T], bool], + ) -> "Pipeline[T]": + """ + Filter pipeline elements based on a predicate function. + + Args: + predicate: A function that takes an element and returns True to keep it + + Returns: + A new pipeline containing only elements that satisfy the predicate + + Example: + >>> Pipeline([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).to_list() + [2, 4] + """ + return Pipeline(item for item in self if predicate(item)) + + def map( + self, + function: Callable[[T], U], + ) -> "Pipeline[U]": + """ + Transform each element in the pipeline using the given function. + + Args: + function: A function that transforms each element + + Returns: + A new pipeline with transformed elements + + Example: + >>> Pipeline([1, 2, 3]).map(lambda x: x * 2).to_list() + [2, 4, 6] + """ + return Pipeline( + map( + function, + self.generator, + ) + ) + + def reduce(self, function: Callable[[U, T], U], initial: U) -> "Pipeline[U]": + """ + Reduce the pipeline to a single value using the given function. + + Args: + function: A function that takes an accumulator and current element + initial: The initial value for the accumulator + + Returns: + A new pipeline containing the single reduced value + + Example: + >>> Pipeline([1, 2, 3, 4]).reduce(lambda acc, x: acc + x, 0).first() + 10 + """ + return Pipeline([functools.reduce(cast(Callable[[T, U], U], function), self, initial)]) + + def tap(self, function: Callable[[T], Any]) -> Self: + """ + Execute a side effect for each element without modifying the pipeline. + + Args: + function: A function to execute for each element (side effect) + + Returns: + The same pipeline (for method chaining) + + Example: + >>> Pipeline([1, 2, 3]).tap(print).map(lambda x: x * 2).to_list() + 1 + 2 + 3 + [2, 4, 6] + """ + + def f(x: T) -> T: + function(x) + return x + + return type(self)(self.map(f)) + + def each(self, function: Callable[[T], Any]) -> None: + """ + Execute a function for each element in the pipeline (terminal operation). + + Args: + function: A function to execute for each element + + Example: + >>> Pipeline([1, 2, 3]).each(print) + 1 + 2 + 3 + """ + for item in self.generator: + function(item) + + def passthrough(self) -> Self: + """ + Return the pipeline unchanged (identity operation). + + Returns: + The same pipeline instance + + Example: + >>> pipeline = Pipeline([1, 2, 3]) + >>> same = pipeline.passthrough() + >>> pipeline is same + True + """ + return self + + def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]": + """ + Apply a sequence of functions to the pipeline. + + Args: + *functions: Functions that transform the pipeline + + Returns: + The pipeline after applying all functions + + Example: + >>> def double(p): return p.map(lambda x: x * 2) + >>> def filter_even(p): return p.filter(lambda x: x % 2 == 0) + >>> Pipeline([1, 2, 3]).apply(double, filter_even).to_list() + [2, 4, 6] + """ + result: Pipeline[T] = self + + for function in functions: + result = function(result) + + return result + + def flatten(self: "Pipeline[Iterable[U]]") -> "Pipeline[U]": + """ + Flatten a pipeline of iterables into a single pipeline. + + Returns: + A new pipeline with all nested elements flattened + + Example: + >>> Pipeline([[1, 2], [3, 4], [5]]).flatten().to_list() + [1, 2, 3, 4, 5] + """ + return Pipeline(x_i for x in self for x_i in x) diff --git a/example_before_hooks.py b/example_before_hooks.py deleted file mode 100644 index 8f695f1..0000000 --- a/example_before_hooks.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -Example hooks file demonstrating before hook functionality. -Functions that start with 'before_' are automatically registered as before hooks. -""" - - -def before_output_filename(context): - """Before hook that runs first - validates input.""" - input_path = context["input_file_path"] - print(f"BEFORE: Validating input file: {input_path}") - - # Example validation - if not input_path.exists(): - raise FileNotFoundError(f"Input file does not exist: {input_path}") - - print("BEFORE: Input file validation passed") - - -def output_filename(context): - """Regular hook that runs after before hooks.""" - print(f"REGULAR: Processing output filename for {context['input_file_path']}") - - # Example: add a prefix to the output filename - output_path = context["output_file_path"] - new_name = f"processed_{output_path.name}" - context["output_file_path"] = output_path.with_name(new_name) - - print(f"REGULAR: Added prefix -> {context['output_file_path']}") - - -def before_output_filename_logging(context): - """Another before hook - logs the start of processing.""" - print(f"BEFORE: Starting to process {context['input_file_path']}") - print(f"BEFORE: Original output path: {context['output_file_path']}") diff --git a/test_class_hooks.py b/test_class_hooks.py deleted file mode 100644 index 2fe0a08..0000000 --- a/test_class_hooks.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -Example usage of the new class-based hooks manager. -""" - -from efemel.hooks_manager import HooksManager - -# Create a new instance -manager = HooksManager() - - -# Example hooks -def before_test(context): - print(f"BEFORE: {context['message']}") - context["step"] = 1 - - -def regular_test(context): - print(f"REGULAR: {context['message']}, step: {context['step']}") - context["step"] = 2 - - -def another_test(context): - print(f"ANOTHER: {context['message']}, step: {context['step']}") - context["result"] = "processed" - - -# Register hooks using the new method names -manager.add_before("test", before_test) -manager.add("test", regular_test) -manager.add("test", another_test) - -# Test the hooks -context = {"message": "Hello World"} -result_context, result_value = manager.call("test", context, ["result"]) - -print(f"Final context: {context}") -print(f"Returned result: {result_value}") -print(f"Hook count: {manager.get_count('test')}") -print(f"All hooks: {manager.list()}") diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..4253f73 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,561 @@ +""" +Test suite for the Pipeline class. + +This module contains comprehensive tests for the Pipeline class functionality +including all methods, edge cases, and error conditions. +""" + +import pytest + +from efemel.pipeline import Pipeline + + +class TestPipelineBasics: + """Test basic Pipeline functionality.""" + + def test_pipeline_initialization(self): + """Test Pipeline initialization with various iterables.""" + # Test with list + pipeline = Pipeline([1, 2, 3, 4, 5]) + assert isinstance(pipeline, Pipeline) + assert pipeline.generator == [1, 2, 3, 4, 5] + + # Test with tuple + pipeline = Pipeline((1, 2, 3)) + assert list(pipeline) == [1, 2, 3] + + # Test with generator + def gen(): + yield from range(1, 4) + + pipeline = Pipeline(gen()) + assert list(pipeline) == [1, 2, 3] + + # Test with empty list + pipeline = Pipeline([]) + assert list(pipeline) == [] + + def test_pipeline_iteration(self): + """Test Pipeline iteration behavior.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Test iteration + result = [] + for item in pipeline: + result.append(item) + + assert result == [1, 2, 3, 4, 5] + + def test_to_list(self): + """Test Pipeline.to_list() method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + result = pipeline.to_list() + + assert result == [1, 2, 3, 4, 5] + assert isinstance(result, list) + + # Test with empty pipeline + empty_pipeline = Pipeline([]) + assert empty_pipeline.to_list() == [] + + def test_first(self): + """Test Pipeline.first() method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + assert pipeline.first() == 1 + + # Test with string + str_pipeline = Pipeline(["hello", "world"]) + assert str_pipeline.first() == "hello" + + def test_first_empty_pipeline(self): + """Test Pipeline.first() with empty pipeline.""" + empty_pipeline = Pipeline([]) + with pytest.raises(StopIteration): + empty_pipeline.first() + + +class TestPipelineFiltering: + """Test Pipeline filtering functionality.""" + + def test_filter_basic(self): + """Test basic filtering operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter even numbers + even_pipeline = pipeline.filter(lambda x: x % 2 == 0) + assert even_pipeline.to_list() == [2, 4] + + # Filter numbers greater than 3 + gt3_pipeline = pipeline.filter(lambda x: x > 3) + assert gt3_pipeline.to_list() == [4, 5] + + def test_filter_with_strings(self): + """Test filtering with string data.""" + pipeline = Pipeline(["hello", "world", "python", "test"]) + + # Filter strings longer than 4 characters + long_strings = pipeline.filter(lambda s: len(s) > 4) + assert long_strings.to_list() == ["hello", "world", "python"] + + # Filter strings starting with 'p' + p_strings = pipeline.filter(lambda s: s.startswith("p")) + assert p_strings.to_list() == ["python"] + + def test_filter_empty_result(self): + """Test filter that results in empty pipeline.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter that matches nothing + empty_result = pipeline.filter(lambda x: x > 10) + assert empty_result.to_list() == [] + + def test_filter_all_pass(self): + """Test filter where all items pass.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter that passes everything + all_pass = pipeline.filter(lambda x: True) + assert all_pass.to_list() == [1, 2, 3, 4, 5] + + def test_filter_chaining(self): + """Test chaining multiple filters.""" + pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + + # Chain filters: even numbers and greater than 5 + result = pipeline.filter(lambda x: x % 2 == 0).filter(lambda x: x > 5) + assert result.to_list() == [6, 8, 10] + + +class TestPipelineMapping: + """Test Pipeline mapping functionality.""" + + def test_map_basic(self): + """Test basic mapping operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Double each number + doubled = pipeline.map(lambda x: x * 2) + assert doubled.to_list() == [2, 4, 6, 8, 10] + + # Square each number + squared = pipeline.map(lambda x: x**2) + assert squared.to_list() == [1, 4, 9, 16, 25] + + def test_map_type_transformation(self): + """Test mapping with type transformation.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Convert numbers to strings + str_pipeline = pipeline.map(str) + assert str_pipeline.to_list() == ["1", "2", "3", "4", "5"] + + # Convert to boolean (non-zero is True) + bool_pipeline = pipeline.map(bool) + assert bool_pipeline.to_list() == [True, True, True, True, True] + + def test_map_with_strings(self): + """Test mapping with string data.""" + pipeline = Pipeline(["hello", "world", "python"]) + + # Convert to uppercase + upper_pipeline = pipeline.map(str.upper) + assert upper_pipeline.to_list() == ["HELLO", "WORLD", "PYTHON"] + + # Get string lengths + len_pipeline = pipeline.map(len) + assert len_pipeline.to_list() == [5, 5, 6] + + def test_map_chaining(self): + """Test chaining multiple map operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Chain maps: double, then add 1 + result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1) + assert result.to_list() == [3, 5, 7, 9, 11] + + def test_map_empty_pipeline(self): + """Test mapping on empty pipeline.""" + empty_pipeline = Pipeline([]) + + # Map should return empty result + result = empty_pipeline.map(lambda x: x * 2) + assert result.to_list() == [] + + +class TestPipelineMapFilter: + """Test Pipeline map and filter combinations.""" + + def test_map_then_filter(self): + """Test mapping then filtering.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Double numbers, then filter even results + result = pipeline.map(lambda x: x * 2).filter(lambda x: x % 2 == 0) + assert result.to_list() == [2, 4, 6, 8, 10] + + def test_filter_then_map(self): + """Test filtering then mapping.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter even numbers, then double them + result = pipeline.filter(lambda x: x % 2 == 0).map(lambda x: x * 2) + assert result.to_list() == [4, 8] + + def test_complex_map_filter_chain(self): + """Test complex chaining of map and filter operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + + # Complex chain: filter odd, multiply by 3, filter > 10 + result = ( + pipeline.filter(lambda x: x % 2 == 1) # [1, 3, 5, 7, 9] + .map(lambda x: x * 3) # [3, 9, 15, 21, 27] + .filter(lambda x: x > 10) + ) # [15, 21, 27] + + assert result.to_list() == [15, 21, 27] + + +class TestPipelineReduce: + """Test Pipeline reduce functionality.""" + + def test_reduce_basic(self): + """Test basic reduce operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Sum all numbers + sum_result = pipeline.reduce(lambda acc, x: acc + x, 0) + assert sum_result.first() == 15 + + # Multiply all numbers + product_result = pipeline.reduce(lambda acc, x: acc * x, 1) + assert product_result.first() == 120 + + def test_reduce_with_strings(self): + """Test reduce with string data.""" + pipeline = Pipeline(["hello", "world", "python"]) + + # Concatenate strings + concat_result = pipeline.reduce(lambda acc, x: acc + " " + x, "") + assert concat_result.first() == " hello world python" + + # Join with commas + join_result = pipeline.reduce(lambda acc, x: acc + "," + x if acc else x, "") + assert join_result.first() == "hello,world,python" + + def test_reduce_empty_pipeline(self): + """Test reduce on empty pipeline.""" + empty_pipeline = Pipeline([]) + + # Reduce should return initial value + result = empty_pipeline.reduce(lambda acc, x: acc + x, 10) + assert result.first() == 10 + + def test_reduce_single_item(self): + """Test reduce with single item.""" + single_pipeline = Pipeline([42]) + + # Should combine initial value with single item + result = single_pipeline.reduce(lambda acc, x: acc + x, 10) + assert result.first() == 52 + + +class TestPipelineTap: + """Test Pipeline tap functionality.""" + + def test_tap_basic(self): + """Test basic tap operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # Tap to collect side effects + result = pipeline.tap(side_effects.append) + result_list = result.to_list() + + assert result_list == [1, 2, 3, 4, 5] + assert side_effects == [1, 2, 3, 4, 5] + + def test_tap_with_print(self): + """Test tap with print (no assertion needed, just verify it works).""" + pipeline = Pipeline([1, 2, 3]) + + # This should not raise any exceptions + result = pipeline.tap(lambda x: None) # Mock print + assert result.to_list() == [1, 2, 3] + + def test_tap_chaining(self): + """Test tap in a chain of operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # Tap in middle of chain + result = pipeline.map(lambda x: x * 2).tap(side_effects.append).filter(lambda x: x > 5) + + result_list = result.to_list() + + assert result_list == [6, 8, 10] + assert side_effects == [2, 4, 6, 8, 10] + + def test_tap_doesnt_modify_pipeline(self): + """Test that tap doesn't modify the pipeline data.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Tap with function that would modify if it could + result = pipeline.tap(lambda x: x * 1000) + + # Data should be unchanged + assert result.to_list() == [1, 2, 3, 4, 5] + + +class TestPipelineEach: + """Test Pipeline each functionality.""" + + def test_each_basic(self): + """Test basic each operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # Each should execute function for each item + pipeline.each(side_effects.append) + + assert side_effects == [1, 2, 3, 4, 5] + + def test_each_returns_none(self): + """Test that each returns None.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + result = pipeline.each(lambda x: None) + assert result is None + + def test_each_with_empty_pipeline(self): + """Test each with empty pipeline.""" + empty_pipeline = Pipeline([]) + side_effects = [] + + # Should not execute function + empty_pipeline.each(side_effects.append) + + assert side_effects == [] + + def test_each_terminal_operation(self): + """Test that each is a terminal operation.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # After each, pipeline should be consumed + pipeline.each(side_effects.append) + + assert side_effects == [1, 2, 3, 4, 5] + + +class TestPipelineUtility: + """Test Pipeline utility methods.""" + + def test_passthrough(self): + """Test passthrough method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Passthrough should return the same pipeline + result = pipeline.passthrough() + assert result is pipeline + + # Data should be unchanged + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_apply_single_function(self): + """Test apply with single function.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + def double_pipeline(p): + return p.map(lambda x: x * 2) + + result = pipeline.apply(double_pipeline) + assert result.to_list() == [2, 4, 6, 8, 10] + + def test_apply_multiple_functions(self): + """Test apply with multiple functions.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + def double_pipeline(p): + return p.map(lambda x: x * 2) + + def filter_even(p): + return p.filter(lambda x: x % 2 == 0) + + result = pipeline.apply(double_pipeline, filter_even) + assert result.to_list() == [2, 4, 6, 8, 10] + + def test_apply_no_functions(self): + """Test apply with no functions.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Should return the same pipeline + result = pipeline.apply() + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_flatten_basic(self): + """Test basic flatten operation.""" + pipeline = Pipeline([[1, 2], [3, 4], [5]]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_flatten_with_strings(self): + """Test flatten with string iterables.""" + pipeline = Pipeline(["hello", "world"]) + + result = pipeline.flatten() + assert result.to_list() == ["h", "e", "l", "l", "o", "w", "o", "r", "l", "d"] + + def test_flatten_empty_lists(self): + """Test flatten with empty lists.""" + pipeline = Pipeline([[], [1, 2], [], [3]]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3] + + def test_flatten_nested_tuples(self): + """Test flatten with nested tuples.""" + pipeline = Pipeline([(1, 2), (3, 4, 5), (6,)]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3, 4, 5, 6] + + +class TestPipelineEdgeCases: + """Test Pipeline edge cases and error conditions.""" + + def test_pipeline_with_none_values(self): + """Test pipeline with None values.""" + pipeline = Pipeline([1, None, 3, None, 5]) + + # Should handle None values properly + result = pipeline.to_list() + assert result == [1, None, 3, None, 5] + + # Filter out None values + no_none = pipeline.filter(lambda x: x is not None) + assert no_none.to_list() == [1, 3, 5] + + def test_pipeline_with_mixed_types(self): + """Test pipeline with mixed data types.""" + pipeline = Pipeline([1, "hello", 3.14, True, None]) + + # Should handle mixed types + result = pipeline.to_list() + assert result == [1, "hello", 3.14, True, None] + + # Filter by type (excluding boolean which is a subclass of int) + numbers = pipeline.filter(lambda x: isinstance(x, int | float) and not isinstance(x, bool) and x is not None) + assert numbers.to_list() == [1, 3.14] + + def test_pipeline_reuse(self): + """Test that pipelines can be reused.""" + original_data = [1, 2, 3, 4, 5] + pipeline = Pipeline(original_data) + + # First use + result1 = pipeline.map(lambda x: x * 2).to_list() + assert result1 == [2, 4, 6, 8, 10] + + # Create new pipeline from same data + pipeline2 = Pipeline(original_data) + result2 = pipeline2.filter(lambda x: x % 2 == 0).to_list() + assert result2 == [2, 4] + + def test_pipeline_method_chaining_returns_new_instances(self): + """Test that pipeline methods return new instances.""" + original = Pipeline([1, 2, 3, 4, 5]) + + # Methods should return new instances (except passthrough) + mapped = original.map(lambda x: x * 2) + filtered = original.filter(lambda x: x % 2 == 0) + + assert mapped is not original + assert filtered is not original + assert mapped is not filtered + + def test_pipeline_with_generator_exhaustion(self): + """Test pipeline behavior with generator exhaustion.""" + + def number_generator(): + yield from range(3) + + pipeline = Pipeline(number_generator()) + + # First consumption + result1 = pipeline.to_list() + assert result1 == [0, 1, 2] + + # Generator should be exhausted now + # This behavior depends on the implementation + + +class TestPipelineIntegration: + """Integration tests for Pipeline class.""" + + def test_data_processing_pipeline(self): + """Test a realistic data processing pipeline.""" + # Simulate processing a list of user data + users = [ + {"name": "Alice", "age": 30, "active": True}, + {"name": "Bob", "age": 25, "active": False}, + {"name": "Charlie", "age": 35, "active": True}, + {"name": "Diana", "age": 28, "active": True}, + {"name": "Eve", "age": 22, "active": False}, + ] + + pipeline = Pipeline(users) + + # Process: filter active users, extract names, convert to uppercase + result = pipeline.filter(lambda user: user["active"]).map(lambda user: user["name"]).map(str.upper) + + assert result.to_list() == ["ALICE", "CHARLIE", "DIANA"] + + def test_number_processing_pipeline(self): + """Test a number processing pipeline.""" + numbers = range(1, 21) # 1 to 20 + + pipeline = Pipeline(numbers) + + # Process: filter even numbers, square them, filter > 50, sum + result = ( + pipeline.filter(lambda x: x % 2 == 0) # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] + .map(lambda x: x**2) # [4, 16, 36, 64, 100, 144, 196, 256, 324, 400] + .filter(lambda x: x > 50) # [64, 100, 144, 196, 256, 324, 400] + .reduce(lambda acc, x: acc + x, 0) + ) # 1484 + + assert result.first() == 1484 + + def test_text_processing_pipeline(self): + """Test a text processing pipeline.""" + text = "Hello world! This is a test. Python is amazing." + words = text.split() + + pipeline = Pipeline(words) + + # Process: filter words > 3 chars, remove punctuation, lowercase, get unique + result = ( + pipeline.filter(lambda word: len(word) > 3) + .map(lambda word: word.strip(".,!")) + .map(str.lower) + .filter(lambda word: word not in ["this"]) + ) # Simple "unique" filter + + expected = ["hello", "world", "test", "python", "amazing"] + assert result.to_list() == expected + + def test_nested_data_processing(self): + """Test processing nested data structures.""" + data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] + + pipeline = Pipeline(data) + + # Flatten, filter odd numbers, square them + result = ( + pipeline.flatten() # [1, 2, 3, 4, 5, 6, 7, 8, 9] + .filter(lambda x: x % 2 == 1) # [1, 3, 5, 7, 9] + .map(lambda x: x**2) + ) # [1, 9, 25, 49, 81] + + assert result.to_list() == [1, 9, 25, 49, 81]