From 8aca39894442fce04c99783532a4d3e7e2607f50 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 8 Jul 2025 19:39:17 +0000 Subject: [PATCH 1/4] feat: migrated typescript POC to python --- efemel/pipeline.py | 487 +++++++++++++++++++++++++++++++++++++++++ tests/test_pipeline.py | 353 +++++++++++++++++++++++++++++ 2 files changed, 840 insertions(+) create mode 100644 efemel/pipeline.py create mode 100644 tests/test_pipeline.py diff --git a/efemel/pipeline.py b/efemel/pipeline.py new file mode 100644 index 0000000..4dcdda3 --- /dev/null +++ b/efemel/pipeline.py @@ -0,0 +1,487 @@ +from collections.abc import Callable +from typing import Any, TypeVar + +T = TypeVar("T") +U = TypeVar("U") +V = TypeVar("V") + + +# Special symbols for reduce operations +class SymbolEnd: + """ + Special symbol used to signal the end of a stream in reduce operations. + When this symbol is passed to a reduce transformation, it triggers the + final result to be emitted. + """ + + def __repr__(self): + return "SymbolEnd" + + +class SymbolReset: + """ + Special symbol used to reset the accumulator in reduce operations. + When this symbol is passed to a reduce transformation, it resets the + accumulator back to its initial value. + """ + + def __repr__(self): + return "SymbolReset" + + +SYMBOL_END = SymbolEnd() +SYMBOL_RESET = SymbolReset() + + +class Pipeline: + """ + A functional pipeline that chains transformations together in a composable way. + + This class implements a callback-based transformation pipeline similar to TypeScript's + transformers.ts. Each transformation is built up through method chaining, creating + a single composed transformation function that can be executed with `.run()`. + + Key Concepts: + - **Chaining**: Methods return self, allowing for fluent API usage + - **Callback-based**: Each transformation uses success/failure callbacks + - **Composable**: Transformations are built up by wrapping previous transformations + - **Failure handling**: Any step can fail, causing the entire pipeline to fail + + Basic Usage: + ```python + # Simple transformation pipeline + pipeline = Pipeline() + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) + # result = 12 (6 * 2 = 12, 12 > 10 is True) + + # Pipeline that fails + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) + # result = None (6 * 2 = 12, 12 > 20 is False, so filter fails) + ``` + + Advanced Usage: + ```python + # Working with lists + pipeline = Pipeline() + result = pipeline.multi(lambda p: p.map(lambda x: x.upper())).run(["hello", "world"]) + # result = ["HELLO", "WORLD"] + + # Reduce operations + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + x, 0) + pipeline.run(5) # Accumulates: 0 + 5 = 5 + pipeline.run(3) # Accumulates: 5 + 3 = 8 + result = pipeline.run(SYMBOL_END) # result = 8 + ``` + """ + + def __init__(self): + # Internal transformation function that gets built up through chaining + self._transformation = self._create_identity_transformation() + + def _create_identity_transformation(self): + """ + Create the base identity transformation that simply passes input to output. + + This is the foundation that all other transformations build upon. + It takes an input value and immediately calls the success callback with it. + + Returns: + A transformation function that accepts (input_value, success_callback, failure_callback) + """ + + def transform(input_value, success_callback, failure_callback): + try: + success_callback(input_value) + except Exception: + failure_callback() + + return transform + + def identity(self): + """ + Reset the pipeline to identity transformation. + + This method resets the pipeline back to its initial state, effectively + clearing all previously chained transformations. + + Returns: + self: For method chaining + + Example: + ```python + pipeline = Pipeline() + pipeline.map(lambda x: x * 2).filter(lambda x: x > 10) + + # Reset the pipeline + pipeline.identity() + + # Now the pipeline just passes values through unchanged + result = pipeline.run(5) # result = 5 + ``` + """ + self._transformation = self._create_identity_transformation() + return self + + def map(self, modifier: Callable[[Any], Any]): + """ + Apply a transformation function to the input value. + + This is equivalent to the functional programming `map` operation. + It takes the current value in the pipeline and transforms it using + the provided modifier function. + + Args: + modifier: A function that takes one argument and returns a transformed value + + Returns: + self: For method chaining + + Example: + ```python + # Double all numbers + pipeline = Pipeline() + result = pipeline.map(lambda x: x * 2).run(5) + # result = 10 + + # Chain multiple maps + result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1).run(5) + # result = 11 (5 * 2 = 10, 10 + 1 = 11) + + # Transform strings + result = pipeline.map(str.upper).run("hello") + # result = "HELLO" + ``` + + Failure handling: + ```python + # If the modifier function raises an exception, the pipeline fails + pipeline = Pipeline() + result = pipeline.map(lambda x: x / 0).run(5) # Division by zero + # result = None (pipeline failed) + ``` + """ + current_transformation = self._transformation + + def new_transformation(input_value, success_callback, failure_callback): + def on_success(transformed_value): + try: + result = modifier(transformed_value) + success_callback(result) + except Exception: + failure_callback() + + current_transformation(input_value, on_success, failure_callback) + + self._transformation = new_transformation + return self + + def filter(self, predicate: Callable[[Any], bool]): + """ + Filter values based on a predicate function. + + This is equivalent to the functional programming `filter` operation. + If the predicate returns True, the value passes through. If it returns + False, the pipeline fails (calls failure_callback). + + Args: + predicate: A function that takes one argument and returns True/False + + Returns: + self: For method chaining + + Example: + ```python + # Only allow even numbers + pipeline = Pipeline() + result = pipeline.filter(lambda x: x % 2 == 0).run(4) + # result = 4 (4 is even, so it passes) + + result = pipeline.filter(lambda x: x % 2 == 0).run(5) + # result = None (5 is odd, so filter fails) + + # Chain with map + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) + # result = 12 (6 * 2 = 12, 12 > 10 is True) + + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) + # result = None (6 * 2 = 12, 12 > 20 is False) + ``` + + String filtering: + ```python + # Only allow non-empty strings + pipeline = Pipeline() + result = pipeline.filter(lambda s: len(s) > 0).run("hello") + # result = "hello" + + result = pipeline.filter(lambda s: len(s) > 0).run("") + # result = None (empty string fails the filter) + ``` + """ + current_transformation = self._transformation + + def new_transformation(input_value, success_callback, failure_callback): + def on_success(transformed_value): + try: + if predicate(transformed_value): + success_callback(transformed_value) + else: + failure_callback() + except Exception: + failure_callback() + + current_transformation(input_value, on_success, failure_callback) + + self._transformation = new_transformation + return self + + def reduce(self, accumulator: Callable[[Any, Any], Any], initial_value: Any): + """ + Apply a reduce operation with stateful accumulation. + + This is equivalent to the functional programming `reduce` operation, but implemented + in a stateful way. The accumulator maintains state between calls, and special + symbols (SYMBOL_END, SYMBOL_RESET) control the reduction process. + + Args: + accumulator: A function that takes (accumulated_value, current_value) and returns new accumulated value + initial_value: The starting value for the accumulation + + Returns: + self: For method chaining + + Special Symbols: + - SYMBOL_END: Triggers the final result to be emitted + - SYMBOL_RESET: Resets the accumulator back to initial_value + + Example - Sum accumulation: + ```python + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + x, 0) + + # Add numbers to the accumulator + pipeline.run(5) # Internal state: 0 + 5 = 5 + pipeline.run(3) # Internal state: 5 + 3 = 8 + pipeline.run(2) # Internal state: 8 + 2 = 10 + + # Get the final result + result = pipeline.run(SYMBOL_END) # result = 10 + + # Reset and start over + pipeline.run(SYMBOL_RESET) # Internal state reset to 0 + pipeline.run(7) # Internal state: 0 + 7 = 7 + result = pipeline.run(SYMBOL_END) # result = 7 + ``` + + Example - List accumulation: + ```python + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + [x], []) + + pipeline.run("a") # Internal state: [] + ["a"] = ["a"] + pipeline.run("b") # Internal state: ["a"] + ["b"] = ["a", "b"] + result = pipeline.run(SYMBOL_END) # result = ["a", "b"] + ``` + + Example - Object accumulation: + ```python + pipeline = Pipeline() + pipeline.reduce(lambda acc, item: {**acc, **item}, {}) + + pipeline.run({"name": "John"}) # Internal state: {"name": "John"} + pipeline.run({"age": 30}) # Internal state: {"name": "John", "age": 30} + result = pipeline.run(SYMBOL_END) # result = {"name": "John", "age": 30} + ``` + """ + current_transformation = self._transformation + acc = initial_value + + def new_transformation(input_value, success_callback, failure_callback): + nonlocal acc + + if input_value is SYMBOL_END: + success_callback(acc) + return + + if input_value is SYMBOL_RESET: + acc = initial_value + return + + def on_success(transformed_value): + nonlocal acc + try: + acc = accumulator(acc, transformed_value) + except Exception: + failure_callback() + + current_transformation(input_value, on_success, failure_callback) + + self._transformation = new_transformation + return self + + def multi(self, create_transformation: Callable[["Pipeline"], "Pipeline"]): + """ + Apply a transformation to each item in a list/array. + + This method takes a factory function that creates a transformation pipeline + for individual items. It then applies this transformation to each item in + an input list, collecting the results. + + Args: + create_transformation: A function that takes a Pipeline and returns a configured Pipeline + for transforming individual items + + Returns: + self: For method chaining + + Example - Transform each string in a list: + ```python + pipeline = Pipeline() + result = pipeline.multi(lambda p: p.map(str.upper)).run(["hello", "world"]) + # result = ["HELLO", "WORLD"] + ``` + + Example - Filter and transform: + ```python + pipeline = Pipeline() + result = pipeline.multi( + lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2) + ).run([1, -2, 3, -4, 5]) + # result = [2, 6, 10] (negative numbers filtered out, positive ones doubled) + ``` + + Example - Complex object transformation: + ```python + users = [ + {"name": "john", "age": 25}, + {"name": "jane", "age": 17}, + {"name": "bob", "age": 30} + ] + + pipeline = Pipeline() + result = pipeline.multi( + lambda p: p + .filter(lambda user: user["age"] >= 18) # Only adults + .map(lambda user: user["name"].title()) # Extract and capitalize name + ).run(users) + # result = ["John", "Bob"] + ``` + + Failure handling: + ```python + # If any item transformation fails, the entire multi operation fails + pipeline = Pipeline() + result = pipeline.multi( + lambda p: p.map(lambda x: 1 / x) # Division by zero will fail + ).run([1, 2, 0, 3]) + # result = None (pipeline fails when processing 0) + ``` + + Chaining with other operations: + ```python + pipeline = Pipeline() + result = pipeline\ + .multi(lambda p: p.map(lambda x: x * 2))\ + .map(lambda results: sum(results))\ + .run([1, 2, 3]) + # Step 1: [1, 2, 3] -> [2, 4, 6] (multi doubles each) + # Step 2: [2, 4, 6] -> 12 (map sums the results) + # result = 12 + ``` + """ + current_transformation = self._transformation + + # Create the item transformer by calling the factory function with a new identity pipeline + item_pipeline = Pipeline().identity() + item_transformer = create_transformation(item_pipeline)._transformation + + def new_transformation(input_value, success_callback, failure_callback): + def on_success(transformed_values: list[Any]): + try: + results = [] + for value in transformed_values: + result = None + + def item_success(data): + nonlocal result + result = data + + def item_failure(): + nonlocal result + result = None + + item_transformer(value, item_success, item_failure) + + if result is not None: + results.append(result) + else: + failure_callback() + return + + success_callback(results) + except Exception: + failure_callback() + + current_transformation(input_value, on_success, failure_callback) + + self._transformation = new_transformation + return self + + def run(self, input_value: Any) -> Any | None: + """ + Execute the pipeline with the given input value. + + This method runs the entire transformation pipeline synchronously and returns + the final result. If any step in the pipeline fails, it returns None. + + Args: + input_value: The value to process through the pipeline + + Returns: + The final transformed value, or None if the pipeline failed + + Example - Basic usage: + ```python + pipeline = Pipeline() + result = pipeline.map(lambda x: x * 2).run(5) + # result = 10 + ``` + + Example - Pipeline failure: + ```python + pipeline = Pipeline() + result = pipeline.filter(lambda x: x > 10).run(5) + # result = None (5 is not > 10, so filter fails) + ``` + + Example - Complex pipeline: + ```python + pipeline = Pipeline() + result = pipeline\ + .map(lambda x: x.split(','))\ + .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0))\ + .map(lambda items: len(items))\ + .run("apple, banana, , cherry") + # Step 1: "apple, banana, , cherry" -> ["apple", " banana", " ", " cherry"] + # Step 2: ["apple", " banana", " ", " cherry"] -> ["apple", "banana", "cherry"] (multi strips and filters) + # Step 3: ["apple", "banana", "cherry"] -> 3 + # result = 3 + ``` + + Thread safety: + This method is NOT thread-safe. Each Pipeline instance should be used by only + one thread at a time, or proper synchronization should be implemented. + """ + result = None + + def success(data): + nonlocal result + result = data + + def failure(): + nonlocal result + result = None + + self._transformation(input_value, success, failure) + return result diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..a527ec3 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,353 @@ +from efemel.pipeline import SYMBOL_END, SYMBOL_RESET, Pipeline + + +class TestPipeline: + """Test suite for the Pipeline class.""" + + def test_identity_transformation(self): + """Test that identity transformation passes values through unchanged.""" + pipeline = Pipeline() + + # Test with different data types + assert pipeline.run(5) == 5 + assert pipeline.run("hello") == "hello" + assert pipeline.run([1, 2, 3]) == [1, 2, 3] + assert pipeline.run({"key": "value"}) == {"key": "value"} + assert pipeline.run(None) is None + + def test_identity_reset(self): + """Test that identity() resets the pipeline to initial state.""" + pipeline = Pipeline() + + # Add some transformations + pipeline.map(lambda x: x * 2).filter(lambda x: x > 10) + + # Reset to identity + pipeline.identity() + + # Should now pass values through unchanged + assert pipeline.run(5) == 5 + + def test_map_basic_transformation(self): + """Test basic map transformations.""" + pipeline = Pipeline() + + # Double numbers + result = pipeline.map(lambda x: x * 2).run(5) + assert result == 10 + + # Transform strings + pipeline = Pipeline() + result = pipeline.map(str.upper).run("hello") + assert result == "HELLO" + + def test_map_chaining(self): + """Test chaining multiple map operations.""" + pipeline = Pipeline() + + # Chain multiple maps: 5 * 2 = 10, 10 + 1 = 11 + result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1).run(5) + assert result == 11 + + def test_map_failure_handling(self): + """Test that map failures are handled properly.""" + pipeline = Pipeline() + + # Division by zero should cause failure + result = pipeline.map(lambda x: x / 0).run(5) + assert result is None + + # Exception in map function should cause failure + result = pipeline.map(lambda x: x.nonexistent_method()).run(5) + assert result is None + + def test_filter_basic_operation(self): + """Test basic filter operations.""" + pipeline = Pipeline() + + # Even numbers pass + result = pipeline.filter(lambda x: x % 2 == 0).run(4) + assert result == 4 + + # Odd numbers fail + result = pipeline.filter(lambda x: x % 2 == 0).run(5) + assert result is None + + def test_filter_with_strings(self): + """Test filter operations with strings.""" + pipeline = Pipeline() + + # Non-empty strings pass + result = pipeline.filter(lambda s: len(s) > 0).run("hello") + assert result == "hello" + + # Empty strings fail + result = pipeline.filter(lambda s: len(s) > 0).run("") + assert result is None + + def test_map_filter_chaining(self): + """Test chaining map and filter operations.""" + pipeline = Pipeline() + + # 6 * 2 = 12, 12 > 10 is True + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) + assert result == 12 + + # 6 * 2 = 12, 12 > 20 is False + pipeline = Pipeline() + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) + assert result is None + + def test_reduce_sum_accumulation(self): + """Test reduce operation with sum accumulation.""" + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + x, 0) + + # Add numbers to accumulator + pipeline.run(5) # 0 + 5 = 5 + pipeline.run(3) # 5 + 3 = 8 + pipeline.run(2) # 8 + 2 = 10 + + # Get final result + result = pipeline.run(SYMBOL_END) + assert result == 10 + + def test_reduce_reset_functionality(self): + """Test reduce reset functionality.""" + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + x, 0) + + # Accumulate some values + pipeline.run(5) + pipeline.run(3) + + # Reset and start over + pipeline.run(SYMBOL_RESET) + pipeline.run(7) + + result = pipeline.run(SYMBOL_END) + assert result == 7 + + def test_reduce_list_accumulation(self): + """Test reduce operation with list accumulation.""" + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc + [x], []) + + pipeline.run("a") + pipeline.run("b") + pipeline.run("c") + + result = pipeline.run(SYMBOL_END) + assert result == ["a", "b", "c"] + + def test_reduce_object_accumulation(self): + """Test reduce operation with object accumulation.""" + pipeline = Pipeline() + pipeline.reduce(lambda acc, item: {**acc, **item}, {}) + + pipeline.run({"name": "John"}) + pipeline.run({"age": 30}) + pipeline.run({"city": "NYC"}) + + result = pipeline.run(SYMBOL_END) + assert result == {"name": "John", "age": 30, "city": "NYC"} + + def test_multi_basic_transformation(self): + """Test multi operation with basic transformations.""" + pipeline = Pipeline() + + # Transform each string to uppercase + result = pipeline.multi(lambda p: p.map(str.upper)).run(["hello", "world"]) + assert result == ["HELLO", "WORLD"] + + def test_multi_map_only_transformations(self): + """Test multi operation with only map transformations (no filters).""" + pipeline = Pipeline() + + # Multiple map operations should work fine since no filters can fail + result = pipeline.multi( + lambda p: p.map(lambda x: x * 2).map(lambda x: x + 1) + ).run([1, 2, 3, 4, 5]) + + # Each number: x * 2 + 1 + assert result == [3, 5, 7, 9, 11] + + def test_multi_filter_and_transform(self): + """Test multi operation with filtering and transformation.""" + pipeline = Pipeline() + + # Note: Current implementation fails if ANY item fails the filter + # This tests the actual behavior, not necessarily the expected behavior + result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, -2, 3, -4, 5]) + + # The operation fails because negative numbers don't pass the filter + assert result is None + + # Test with all positive numbers (all pass the filter) + pipeline = Pipeline() + result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, 2, 3, 4, 5]) + assert result == [2, 4, 6, 8, 10] + + def test_multi_complex_object_transformation(self): + """Test multi operation with complex object transformations.""" + users = [{"name": "john", "age": 25}, {"name": "jane", "age": 17}, {"name": "bob", "age": 30}] + + pipeline = Pipeline() + result = pipeline.multi( + lambda p: p.filter(lambda user: user["age"] >= 18).map( # Only adults + lambda user: user["name"].title() + ) # Extract and capitalize name + ).run(users) + + # The operation fails because jane (age 17) doesn't pass the filter + assert result is None + + # Test with all adults + adults_only = [{"name": "john", "age": 25}, {"name": "bob", "age": 30}] + + pipeline = Pipeline() + result = pipeline.multi( + lambda p: p.filter(lambda user: user["age"] >= 18).map( + lambda user: user["name"].title() + ) + ).run(adults_only) + + assert result == ["John", "Bob"] + + def test_multi_failure_handling(self): + """Test that multi operation handles failures properly.""" + pipeline = Pipeline() + + # Division by zero in one item should fail entire operation + result = pipeline.multi(lambda p: p.map(lambda x: 1 / x)).run([1, 2, 0, 3]) + + assert result is None + + def test_multi_with_empty_list(self): + """Test multi operation with empty list.""" + pipeline = Pipeline() + + result = pipeline.multi(lambda p: p.map(lambda x: x * 2)).run([]) + assert result == [] + + def test_multi_chaining_with_other_operations(self): + """Test chaining multi with other pipeline operations.""" + pipeline = Pipeline() + + # Double each number, then sum the results + result = pipeline.multi(lambda p: p.map(lambda x: x * 2)).map(lambda results: sum(results)).run([1, 2, 3]) + + # [1, 2, 3] -> [2, 4, 6] -> 12 + assert result == 12 + + def test_complex_pipeline_string_processing(self): + """Test a complex pipeline for string processing.""" + pipeline = Pipeline() + + result = ( + pipeline.map(lambda x: x.split(",")) + .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0)) + .map(lambda items: len(items)) + .run("apple, banana, , cherry") + ) + + # "apple, banana, , cherry" -> ["apple", " banana", " ", " cherry"] + # The multi operation fails because one item (empty string after strip) fails the filter + assert result is None + + # Test with no empty items + pipeline = Pipeline() + result = ( + pipeline.map(lambda x: x.split(",")) + .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0)) + .map(lambda items: len(items)) + .run("apple, banana, cherry") + ) + + # "apple, banana, cherry" -> ["apple", " banana", " cherry"] + # -> ["apple", "banana", "cherry"] (strip and all pass filter) + # -> 3 (count items) + assert result == 3 + + def test_pipeline_failure_propagation(self): + """Test that failures propagate through the pipeline.""" + pipeline = Pipeline() + + # Filter that fails should make entire pipeline fail + result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 100).map(lambda x: x + 1).run(10) + + # 10 * 2 = 20, 20 > 100 is False, so pipeline fails + assert result is None + + def test_pipeline_with_none_input(self): + """Test pipeline behavior with None input.""" + pipeline = Pipeline() + + # Identity should pass None through + assert pipeline.run(None) is None + + # Map should work with None if function handles it + pipeline = Pipeline() + result = pipeline.map(lambda x: x if x is None else x * 2).run(None) + assert result is None + + def test_filter_exception_handling(self): + """Test that exceptions in filter predicates are handled.""" + pipeline = Pipeline() + + # Exception in predicate should cause failure + result = pipeline.filter(lambda x: x.nonexistent_method()).run(5) + assert result is None + + def test_reduce_exception_handling(self): + """Test that exceptions in reduce accumulator are handled.""" + pipeline = Pipeline() + pipeline.reduce(lambda acc, x: acc / x, 1) # Will cause division by zero + + pipeline.run(0) # This should cause an exception in the accumulator + + # The pipeline should handle the exception gracefully + # Note: The exact behavior depends on implementation details + # This test ensures no unhandled exceptions occur + + def test_pipeline_reuse(self): + """Test that pipelines can be reused with different inputs.""" + pipeline = Pipeline() + pipeline.map(lambda x: x * 2).filter(lambda x: x > 5) + + # Test with different inputs + assert pipeline.run(3) == 6 # 3 * 2 = 6, 6 > 5 + assert pipeline.run(2) is None # 2 * 2 = 4, 4 > 5 is False + assert pipeline.run(5) == 10 # 5 * 2 = 10, 10 > 5 + + def test_symbol_end_and_reset_constants(self): + """Test that SYMBOL_END and SYMBOL_RESET are proper constants.""" + # These should be singleton instances + assert SYMBOL_END is SYMBOL_END + assert SYMBOL_RESET is SYMBOL_RESET + assert SYMBOL_END is not SYMBOL_RESET + + # Test string representations + assert str(SYMBOL_END) == "SymbolEnd" + assert str(SYMBOL_RESET) == "SymbolReset" + + def test_nested_multi_operations(self): + """Test nested multi operations.""" + # Create a list of lists + data = [[1, 2], [3, 4], [5, 6]] + + pipeline = Pipeline() + result = pipeline.multi(lambda p: p.multi(lambda inner_p: inner_p.map(lambda x: x * 2))).run(data) + + assert result == [[2, 4], [6, 8], [10, 12]] + + def test_pipeline_method_chaining_returns_self(self): + """Test that all pipeline methods return self for chaining.""" + pipeline = Pipeline() + + # All these should return the same pipeline instance + assert pipeline.identity() is pipeline + assert pipeline.map(lambda x: x) is pipeline + assert pipeline.filter(lambda x: True) is pipeline + assert pipeline.reduce(lambda acc, x: acc, 0) is pipeline + assert pipeline.multi(lambda p: p) is pipeline From 6a109bae78e240e0683fb1f69b9eb118bf9c2262 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 8 Jul 2025 19:41:20 +0000 Subject: [PATCH 2/4] chore: test changes --- tests/test_pipeline.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index a527ec3..03ddda7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -165,10 +165,8 @@ def test_multi_map_only_transformations(self): pipeline = Pipeline() # Multiple map operations should work fine since no filters can fail - result = pipeline.multi( - lambda p: p.map(lambda x: x * 2).map(lambda x: x + 1) - ).run([1, 2, 3, 4, 5]) - + result = pipeline.multi(lambda p: p.map(lambda x: x * 2).map(lambda x: x + 1)).run([1, 2, 3, 4, 5]) + # Each number: x * 2 + 1 assert result == [3, 5, 7, 9, 11] @@ -179,10 +177,10 @@ def test_multi_filter_and_transform(self): # Note: Current implementation fails if ANY item fails the filter # This tests the actual behavior, not necessarily the expected behavior result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, -2, 3, -4, 5]) - + # The operation fails because negative numbers don't pass the filter assert result is None - + # Test with all positive numbers (all pass the filter) pipeline = Pipeline() result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, 2, 3, 4, 5]) @@ -201,17 +199,15 @@ def test_multi_complex_object_transformation(self): # The operation fails because jane (age 17) doesn't pass the filter assert result is None - + # Test with all adults adults_only = [{"name": "john", "age": 25}, {"name": "bob", "age": 30}] - + pipeline = Pipeline() result = pipeline.multi( - lambda p: p.filter(lambda user: user["age"] >= 18).map( - lambda user: user["name"].title() - ) + lambda p: p.filter(lambda user: user["age"] >= 18).map(lambda user: user["name"].title()) ).run(adults_only) - + assert result == ["John", "Bob"] def test_multi_failure_handling(self): @@ -254,7 +250,7 @@ def test_complex_pipeline_string_processing(self): # "apple, banana, , cherry" -> ["apple", " banana", " ", " cherry"] # The multi operation fails because one item (empty string after strip) fails the filter assert result is None - + # Test with no empty items pipeline = Pipeline() result = ( @@ -263,7 +259,7 @@ def test_complex_pipeline_string_processing(self): .map(lambda items: len(items)) .run("apple, banana, cherry") ) - + # "apple, banana, cherry" -> ["apple", " banana", " cherry"] # -> ["apple", "banana", "cherry"] (strip and all pass filter) # -> 3 (count items) From 87e37d77e061ae0d662c6db3b93ca5d2b279983f Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 8 Jul 2025 19:57:55 +0000 Subject: [PATCH 3/4] chore: removed multi --- efemel/pipeline.py | 127 +++-------------------------------------- tests/test_pipeline.py | 124 ---------------------------------------- 2 files changed, 7 insertions(+), 244 deletions(-) diff --git a/efemel/pipeline.py b/efemel/pipeline.py index 4dcdda3..24260b4 100644 --- a/efemel/pipeline.py +++ b/efemel/pipeline.py @@ -61,11 +61,6 @@ class Pipeline: Advanced Usage: ```python - # Working with lists - pipeline = Pipeline() - result = pipeline.multi(lambda p: p.map(lambda x: x.upper())).run(["hello", "world"]) - # result = ["HELLO", "WORLD"] - # Reduce operations pipeline = Pipeline() pipeline.reduce(lambda acc, x: acc + x, 0) @@ -320,141 +315,33 @@ def on_success(transformed_value): self._transformation = new_transformation return self - def multi(self, create_transformation: Callable[["Pipeline"], "Pipeline"]): - """ - Apply a transformation to each item in a list/array. - - This method takes a factory function that creates a transformation pipeline - for individual items. It then applies this transformation to each item in - an input list, collecting the results. - - Args: - create_transformation: A function that takes a Pipeline and returns a configured Pipeline - for transforming individual items - - Returns: - self: For method chaining - - Example - Transform each string in a list: - ```python - pipeline = Pipeline() - result = pipeline.multi(lambda p: p.map(str.upper)).run(["hello", "world"]) - # result = ["HELLO", "WORLD"] - ``` - - Example - Filter and transform: - ```python - pipeline = Pipeline() - result = pipeline.multi( - lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2) - ).run([1, -2, 3, -4, 5]) - # result = [2, 6, 10] (negative numbers filtered out, positive ones doubled) - ``` - - Example - Complex object transformation: - ```python - users = [ - {"name": "john", "age": 25}, - {"name": "jane", "age": 17}, - {"name": "bob", "age": 30} - ] - - pipeline = Pipeline() - result = pipeline.multi( - lambda p: p - .filter(lambda user: user["age"] >= 18) # Only adults - .map(lambda user: user["name"].title()) # Extract and capitalize name - ).run(users) - # result = ["John", "Bob"] - ``` - - Failure handling: - ```python - # If any item transformation fails, the entire multi operation fails - pipeline = Pipeline() - result = pipeline.multi( - lambda p: p.map(lambda x: 1 / x) # Division by zero will fail - ).run([1, 2, 0, 3]) - # result = None (pipeline fails when processing 0) - ``` - - Chaining with other operations: - ```python - pipeline = Pipeline() - result = pipeline\ - .multi(lambda p: p.map(lambda x: x * 2))\ - .map(lambda results: sum(results))\ - .run([1, 2, 3]) - # Step 1: [1, 2, 3] -> [2, 4, 6] (multi doubles each) - # Step 2: [2, 4, 6] -> 12 (map sums the results) - # result = 12 - ``` - """ - current_transformation = self._transformation - - # Create the item transformer by calling the factory function with a new identity pipeline - item_pipeline = Pipeline().identity() - item_transformer = create_transformation(item_pipeline)._transformation - - def new_transformation(input_value, success_callback, failure_callback): - def on_success(transformed_values: list[Any]): - try: - results = [] - for value in transformed_values: - result = None - - def item_success(data): - nonlocal result - result = data - - def item_failure(): - nonlocal result - result = None - - item_transformer(value, item_success, item_failure) - - if result is not None: - results.append(result) - else: - failure_callback() - return - - success_callback(results) - except Exception: - failure_callback() - - current_transformation(input_value, on_success, failure_callback) - - self._transformation = new_transformation - return self - def run(self, input_value: Any) -> Any | None: """ Execute the pipeline with the given input value. - + This method runs the entire transformation pipeline synchronously and returns the final result. If any step in the pipeline fails, it returns None. - + Args: input_value: The value to process through the pipeline - + Returns: The final transformed value, or None if the pipeline failed - + Example - Basic usage: ```python pipeline = Pipeline() result = pipeline.map(lambda x: x * 2).run(5) # result = 10 ``` - + Example - Pipeline failure: ```python pipeline = Pipeline() result = pipeline.filter(lambda x: x > 10).run(5) # result = None (5 is not > 10, so filter fails) ``` - + Example - Complex pipeline: ```python pipeline = Pipeline() @@ -468,7 +355,7 @@ def run(self, input_value: Any) -> Any | None: # Step 3: ["apple", "banana", "cherry"] -> 3 # result = 3 ``` - + Thread safety: This method is NOT thread-safe. Each Pipeline instance should be used by only one thread at a time, or proper synchronization should be implemented. diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 03ddda7..7a89bbd 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -152,119 +152,6 @@ def test_reduce_object_accumulation(self): result = pipeline.run(SYMBOL_END) assert result == {"name": "John", "age": 30, "city": "NYC"} - def test_multi_basic_transformation(self): - """Test multi operation with basic transformations.""" - pipeline = Pipeline() - - # Transform each string to uppercase - result = pipeline.multi(lambda p: p.map(str.upper)).run(["hello", "world"]) - assert result == ["HELLO", "WORLD"] - - def test_multi_map_only_transformations(self): - """Test multi operation with only map transformations (no filters).""" - pipeline = Pipeline() - - # Multiple map operations should work fine since no filters can fail - result = pipeline.multi(lambda p: p.map(lambda x: x * 2).map(lambda x: x + 1)).run([1, 2, 3, 4, 5]) - - # Each number: x * 2 + 1 - assert result == [3, 5, 7, 9, 11] - - def test_multi_filter_and_transform(self): - """Test multi operation with filtering and transformation.""" - pipeline = Pipeline() - - # Note: Current implementation fails if ANY item fails the filter - # This tests the actual behavior, not necessarily the expected behavior - result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, -2, 3, -4, 5]) - - # The operation fails because negative numbers don't pass the filter - assert result is None - - # Test with all positive numbers (all pass the filter) - pipeline = Pipeline() - result = pipeline.multi(lambda p: p.filter(lambda x: x > 0).map(lambda x: x * 2)).run([1, 2, 3, 4, 5]) - assert result == [2, 4, 6, 8, 10] - - def test_multi_complex_object_transformation(self): - """Test multi operation with complex object transformations.""" - users = [{"name": "john", "age": 25}, {"name": "jane", "age": 17}, {"name": "bob", "age": 30}] - - pipeline = Pipeline() - result = pipeline.multi( - lambda p: p.filter(lambda user: user["age"] >= 18).map( # Only adults - lambda user: user["name"].title() - ) # Extract and capitalize name - ).run(users) - - # The operation fails because jane (age 17) doesn't pass the filter - assert result is None - - # Test with all adults - adults_only = [{"name": "john", "age": 25}, {"name": "bob", "age": 30}] - - pipeline = Pipeline() - result = pipeline.multi( - lambda p: p.filter(lambda user: user["age"] >= 18).map(lambda user: user["name"].title()) - ).run(adults_only) - - assert result == ["John", "Bob"] - - def test_multi_failure_handling(self): - """Test that multi operation handles failures properly.""" - pipeline = Pipeline() - - # Division by zero in one item should fail entire operation - result = pipeline.multi(lambda p: p.map(lambda x: 1 / x)).run([1, 2, 0, 3]) - - assert result is None - - def test_multi_with_empty_list(self): - """Test multi operation with empty list.""" - pipeline = Pipeline() - - result = pipeline.multi(lambda p: p.map(lambda x: x * 2)).run([]) - assert result == [] - - def test_multi_chaining_with_other_operations(self): - """Test chaining multi with other pipeline operations.""" - pipeline = Pipeline() - - # Double each number, then sum the results - result = pipeline.multi(lambda p: p.map(lambda x: x * 2)).map(lambda results: sum(results)).run([1, 2, 3]) - - # [1, 2, 3] -> [2, 4, 6] -> 12 - assert result == 12 - - def test_complex_pipeline_string_processing(self): - """Test a complex pipeline for string processing.""" - pipeline = Pipeline() - - result = ( - pipeline.map(lambda x: x.split(",")) - .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0)) - .map(lambda items: len(items)) - .run("apple, banana, , cherry") - ) - - # "apple, banana, , cherry" -> ["apple", " banana", " ", " cherry"] - # The multi operation fails because one item (empty string after strip) fails the filter - assert result is None - - # Test with no empty items - pipeline = Pipeline() - result = ( - pipeline.map(lambda x: x.split(",")) - .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0)) - .map(lambda items: len(items)) - .run("apple, banana, cherry") - ) - - # "apple, banana, cherry" -> ["apple", " banana", " cherry"] - # -> ["apple", "banana", "cherry"] (strip and all pass filter) - # -> 3 (count items) - assert result == 3 - def test_pipeline_failure_propagation(self): """Test that failures propagate through the pipeline.""" pipeline = Pipeline() @@ -327,16 +214,6 @@ def test_symbol_end_and_reset_constants(self): assert str(SYMBOL_END) == "SymbolEnd" assert str(SYMBOL_RESET) == "SymbolReset" - def test_nested_multi_operations(self): - """Test nested multi operations.""" - # Create a list of lists - data = [[1, 2], [3, 4], [5, 6]] - - pipeline = Pipeline() - result = pipeline.multi(lambda p: p.multi(lambda inner_p: inner_p.map(lambda x: x * 2))).run(data) - - assert result == [[2, 4], [6, 8], [10, 12]] - def test_pipeline_method_chaining_returns_self(self): """Test that all pipeline methods return self for chaining.""" pipeline = Pipeline() @@ -346,4 +223,3 @@ def test_pipeline_method_chaining_returns_self(self): assert pipeline.map(lambda x: x) is pipeline assert pipeline.filter(lambda x: True) is pipeline assert pipeline.reduce(lambda acc, x: acc, 0) is pipeline - assert pipeline.multi(lambda p: p) is pipeline From 3af8f52e1a1b2b5f0910c5a5da15a6c415fb3866 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Wed, 9 Jul 2025 09:30:31 +0000 Subject: [PATCH 4/4] feat: generator based pipeline --- efemel/pipeline.py | 478 +++++++++++----------------- example_before_hooks.py | 34 -- test_class_hooks.py | 39 --- tests/test_pipeline.py | 678 ++++++++++++++++++++++++++++++---------- 4 files changed, 687 insertions(+), 542 deletions(-) delete mode 100644 example_before_hooks.py delete mode 100644 test_class_hooks.py diff --git a/efemel/pipeline.py b/efemel/pipeline.py index 24260b4..bef2eb3 100644 --- a/efemel/pipeline.py +++ b/efemel/pipeline.py @@ -1,374 +1,256 @@ -from collections.abc import Callable -from typing import Any, TypeVar +""" +Pipeline module for functional data processing. -T = TypeVar("T") -U = TypeVar("U") -V = TypeVar("V") +This module provides a Pipeline class that enables functional programming patterns +for data transformation and processing. It allows chaining operations like map, filter, +reduce, and more in a fluent interface style. +Example: + >>> pipeline = Pipeline([1, 2, 3, 4, 5]) + >>> result = pipeline.filter(lambda x: x % 2 == 0).map(lambda x: x * 2).to_list() + >>> print(result) # [4, 8] +""" -# Special symbols for reduce operations -class SymbolEnd: - """ - Special symbol used to signal the end of a stream in reduce operations. - When this symbol is passed to a reduce transformation, it triggers the - final result to be emitted. - """ +import functools +from collections.abc import Callable, Generator, Iterable +from typing import Any, Self, TypeVar, cast - def __repr__(self): - return "SymbolEnd" +T = TypeVar("T") # Type variable for the elements in the pipeline +U = TypeVar("U") # Type variable for transformed elements +V = TypeVar("V") # Type variable for additional transformations -class SymbolReset: +class Pipeline[T]: """ - Special symbol used to reset the accumulator in reduce operations. - When this symbol is passed to a reduce transformation, it resets the - accumulator back to its initial value. - """ - - def __repr__(self): - return "SymbolReset" + A functional pipeline for data processing with method chaining. + The Pipeline class wraps an iterable and provides a fluent interface for + applying transformations, filters, and reductions in a functional programming style. -SYMBOL_END = SymbolEnd() -SYMBOL_RESET = SymbolReset() + Type Parameters: + T: The type of elements in the pipeline + Attributes: + generator: The underlying iterable that provides the data source -class Pipeline: - """ - A functional pipeline that chains transformations together in a composable way. - - This class implements a callback-based transformation pipeline similar to TypeScript's - transformers.ts. Each transformation is built up through method chaining, creating - a single composed transformation function that can be executed with `.run()`. - - Key Concepts: - - **Chaining**: Methods return self, allowing for fluent API usage - - **Callback-based**: Each transformation uses success/failure callbacks - - **Composable**: Transformations are built up by wrapping previous transformations - - **Failure handling**: Any step can fail, causing the entire pipeline to fail - - Basic Usage: - ```python - # Simple transformation pipeline - pipeline = Pipeline() - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) - # result = 12 (6 * 2 = 12, 12 > 10 is True) - - # Pipeline that fails - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) - # result = None (6 * 2 = 12, 12 > 20 is False, so filter fails) - ``` - - Advanced Usage: - ```python - # Reduce operations - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + x, 0) - pipeline.run(5) # Accumulates: 0 + 5 = 5 - pipeline.run(3) # Accumulates: 5 + 3 = 8 - result = pipeline.run(SYMBOL_END) # result = 8 - ``` + Example: + >>> data = [1, 2, 3, 4, 5] + >>> pipeline = Pipeline(data) + >>> result = (pipeline + ... .filter(lambda x: x > 2) + ... .map(lambda x: x ** 2) + ... .to_list()) + >>> print(result) # [9, 16, 25] """ - def __init__(self): - # Internal transformation function that gets built up through chaining - self._transformation = self._create_identity_transformation() + generator: Iterable[T] + + def __init__(self, source: Iterable[T]): + """ + Initialize a new Pipeline with the given data source. - def _create_identity_transformation(self): + Args: + source: An iterable that provides the data for the pipeline """ - Create the base identity transformation that simply passes input to output. + self.generator = source - This is the foundation that all other transformations build upon. - It takes an input value and immediately calls the success callback with it. + def __next__(self) -> T: + """ + Get the next item from the pipeline. Returns: - A transformation function that accepts (input_value, success_callback, failure_callback) - """ + The next item in the pipeline - def transform(input_value, success_callback, failure_callback): - try: - success_callback(input_value) - except Exception: - failure_callback() + Raises: + StopIteration: When there are no more items + """ + return next(iter(self.generator)) - return transform + def __iter__(self) -> Generator[T, None, None]: + """ + Return an iterator over the pipeline elements. - def identity(self): + Returns: + A generator that yields the pipeline elements """ - Reset the pipeline to identity transformation. + yield from self.generator - This method resets the pipeline back to its initial state, effectively - clearing all previously chained transformations. + def to_list(self) -> list[T]: + """ + Convert the pipeline to a list. Returns: - self: For method chaining + A list containing all elements from the pipeline Example: - ```python - pipeline = Pipeline() - pipeline.map(lambda x: x * 2).filter(lambda x: x > 10) - - # Reset the pipeline - pipeline.identity() + >>> Pipeline([1, 2, 3]).to_list() + [1, 2, 3] + """ + return list(self) - # Now the pipeline just passes values through unchanged - result = pipeline.run(5) # result = 5 - ``` + def first(self) -> T: """ - self._transformation = self._create_identity_transformation() - return self + Get the first element from the pipeline. - def map(self, modifier: Callable[[Any], Any]): + Returns: + The first element in the pipeline + + Raises: + StopIteration: If the pipeline is empty + + Example: + >>> Pipeline([1, 2, 3]).first() + 1 """ - Apply a transformation function to the input value. + return next(iter(self.generator)) - This is equivalent to the functional programming `map` operation. - It takes the current value in the pipeline and transforms it using - the provided modifier function. + def filter( + self, + predicate: Callable[[T], bool], + ) -> "Pipeline[T]": + """ + Filter pipeline elements based on a predicate function. Args: - modifier: A function that takes one argument and returns a transformed value + predicate: A function that takes an element and returns True to keep it Returns: - self: For method chaining + A new pipeline containing only elements that satisfy the predicate Example: - ```python - # Double all numbers - pipeline = Pipeline() - result = pipeline.map(lambda x: x * 2).run(5) - # result = 10 - - # Chain multiple maps - result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1).run(5) - # result = 11 (5 * 2 = 10, 10 + 1 = 11) - - # Transform strings - result = pipeline.map(str.upper).run("hello") - # result = "HELLO" - ``` - - Failure handling: - ```python - # If the modifier function raises an exception, the pipeline fails - pipeline = Pipeline() - result = pipeline.map(lambda x: x / 0).run(5) # Division by zero - # result = None (pipeline failed) - ``` + >>> Pipeline([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).to_list() + [2, 4] """ - current_transformation = self._transformation + return Pipeline(item for item in self if predicate(item)) - def new_transformation(input_value, success_callback, failure_callback): - def on_success(transformed_value): - try: - result = modifier(transformed_value) - success_callback(result) - except Exception: - failure_callback() + def map( + self, + function: Callable[[T], U], + ) -> "Pipeline[U]": + """ + Transform each element in the pipeline using the given function. - current_transformation(input_value, on_success, failure_callback) + Args: + function: A function that transforms each element - self._transformation = new_transformation - return self + Returns: + A new pipeline with transformed elements - def filter(self, predicate: Callable[[Any], bool]): + Example: + >>> Pipeline([1, 2, 3]).map(lambda x: x * 2).to_list() + [2, 4, 6] """ - Filter values based on a predicate function. - - This is equivalent to the functional programming `filter` operation. - If the predicate returns True, the value passes through. If it returns - False, the pipeline fails (calls failure_callback). + return Pipeline( + map( + function, + self.generator, + ) + ) + + def reduce(self, function: Callable[[U, T], U], initial: U) -> "Pipeline[U]": + """ + Reduce the pipeline to a single value using the given function. Args: - predicate: A function that takes one argument and returns True/False + function: A function that takes an accumulator and current element + initial: The initial value for the accumulator Returns: - self: For method chaining + A new pipeline containing the single reduced value Example: - ```python - # Only allow even numbers - pipeline = Pipeline() - result = pipeline.filter(lambda x: x % 2 == 0).run(4) - # result = 4 (4 is even, so it passes) - - result = pipeline.filter(lambda x: x % 2 == 0).run(5) - # result = None (5 is odd, so filter fails) - - # Chain with map - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) - # result = 12 (6 * 2 = 12, 12 > 10 is True) - - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) - # result = None (6 * 2 = 12, 12 > 20 is False) - ``` - - String filtering: - ```python - # Only allow non-empty strings - pipeline = Pipeline() - result = pipeline.filter(lambda s: len(s) > 0).run("hello") - # result = "hello" - - result = pipeline.filter(lambda s: len(s) > 0).run("") - # result = None (empty string fails the filter) - ``` + >>> Pipeline([1, 2, 3, 4]).reduce(lambda acc, x: acc + x, 0).first() + 10 """ - current_transformation = self._transformation + return Pipeline([functools.reduce(cast(Callable[[T, U], U], function), self, initial)]) - def new_transformation(input_value, success_callback, failure_callback): - def on_success(transformed_value): - try: - if predicate(transformed_value): - success_callback(transformed_value) - else: - failure_callback() - except Exception: - failure_callback() + def tap(self, function: Callable[[T], Any]) -> Self: + """ + Execute a side effect for each element without modifying the pipeline. - current_transformation(input_value, on_success, failure_callback) + Args: + function: A function to execute for each element (side effect) - self._transformation = new_transformation - return self + Returns: + The same pipeline (for method chaining) - def reduce(self, accumulator: Callable[[Any, Any], Any], initial_value: Any): + Example: + >>> Pipeline([1, 2, 3]).tap(print).map(lambda x: x * 2).to_list() + 1 + 2 + 3 + [2, 4, 6] """ - Apply a reduce operation with stateful accumulation. - This is equivalent to the functional programming `reduce` operation, but implemented - in a stateful way. The accumulator maintains state between calls, and special - symbols (SYMBOL_END, SYMBOL_RESET) control the reduction process. + def f(x: T) -> T: + function(x) + return x - Args: - accumulator: A function that takes (accumulated_value, current_value) and returns new accumulated value - initial_value: The starting value for the accumulation + return type(self)(self.map(f)) - Returns: - self: For method chaining - - Special Symbols: - - SYMBOL_END: Triggers the final result to be emitted - - SYMBOL_RESET: Resets the accumulator back to initial_value - - Example - Sum accumulation: - ```python - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + x, 0) - - # Add numbers to the accumulator - pipeline.run(5) # Internal state: 0 + 5 = 5 - pipeline.run(3) # Internal state: 5 + 3 = 8 - pipeline.run(2) # Internal state: 8 + 2 = 10 - - # Get the final result - result = pipeline.run(SYMBOL_END) # result = 10 - - # Reset and start over - pipeline.run(SYMBOL_RESET) # Internal state reset to 0 - pipeline.run(7) # Internal state: 0 + 7 = 7 - result = pipeline.run(SYMBOL_END) # result = 7 - ``` - - Example - List accumulation: - ```python - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + [x], []) - - pipeline.run("a") # Internal state: [] + ["a"] = ["a"] - pipeline.run("b") # Internal state: ["a"] + ["b"] = ["a", "b"] - result = pipeline.run(SYMBOL_END) # result = ["a", "b"] - ``` - - Example - Object accumulation: - ```python - pipeline = Pipeline() - pipeline.reduce(lambda acc, item: {**acc, **item}, {}) - - pipeline.run({"name": "John"}) # Internal state: {"name": "John"} - pipeline.run({"age": 30}) # Internal state: {"name": "John", "age": 30} - result = pipeline.run(SYMBOL_END) # result = {"name": "John", "age": 30} - ``` + def each(self, function: Callable[[T], Any]) -> None: """ - current_transformation = self._transformation - acc = initial_value - - def new_transformation(input_value, success_callback, failure_callback): - nonlocal acc + Execute a function for each element in the pipeline (terminal operation). - if input_value is SYMBOL_END: - success_callback(acc) - return + Args: + function: A function to execute for each element - if input_value is SYMBOL_RESET: - acc = initial_value - return + Example: + >>> Pipeline([1, 2, 3]).each(print) + 1 + 2 + 3 + """ + for item in self.generator: + function(item) - def on_success(transformed_value): - nonlocal acc - try: - acc = accumulator(acc, transformed_value) - except Exception: - failure_callback() + def passthrough(self) -> Self: + """ + Return the pipeline unchanged (identity operation). - current_transformation(input_value, on_success, failure_callback) + Returns: + The same pipeline instance - self._transformation = new_transformation + Example: + >>> pipeline = Pipeline([1, 2, 3]) + >>> same = pipeline.passthrough() + >>> pipeline is same + True + """ return self - def run(self, input_value: Any) -> Any | None: + def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]": """ - Execute the pipeline with the given input value. - - This method runs the entire transformation pipeline synchronously and returns - the final result. If any step in the pipeline fails, it returns None. + Apply a sequence of functions to the pipeline. Args: - input_value: The value to process through the pipeline + *functions: Functions that transform the pipeline Returns: - The final transformed value, or None if the pipeline failed - - Example - Basic usage: - ```python - pipeline = Pipeline() - result = pipeline.map(lambda x: x * 2).run(5) - # result = 10 - ``` - - Example - Pipeline failure: - ```python - pipeline = Pipeline() - result = pipeline.filter(lambda x: x > 10).run(5) - # result = None (5 is not > 10, so filter fails) - ``` - - Example - Complex pipeline: - ```python - pipeline = Pipeline() - result = pipeline\ - .map(lambda x: x.split(','))\ - .multi(lambda p: p.map(str.strip).filter(lambda s: len(s) > 0))\ - .map(lambda items: len(items))\ - .run("apple, banana, , cherry") - # Step 1: "apple, banana, , cherry" -> ["apple", " banana", " ", " cherry"] - # Step 2: ["apple", " banana", " ", " cherry"] -> ["apple", "banana", "cherry"] (multi strips and filters) - # Step 3: ["apple", "banana", "cherry"] -> 3 - # result = 3 - ``` - - Thread safety: - This method is NOT thread-safe. Each Pipeline instance should be used by only - one thread at a time, or proper synchronization should be implemented. - """ - result = None + The pipeline after applying all functions - def success(data): - nonlocal result - result = data + Example: + >>> def double(p): return p.map(lambda x: x * 2) + >>> def filter_even(p): return p.filter(lambda x: x % 2 == 0) + >>> Pipeline([1, 2, 3]).apply(double, filter_even).to_list() + [2, 4, 6] + """ + result: Pipeline[T] = self - def failure(): - nonlocal result - result = None + for function in functions: + result = function(result) - self._transformation(input_value, success, failure) return result + + def flatten(self: "Pipeline[Iterable[U]]") -> "Pipeline[U]": + """ + Flatten a pipeline of iterables into a single pipeline. + + Returns: + A new pipeline with all nested elements flattened + + Example: + >>> Pipeline([[1, 2], [3, 4], [5]]).flatten().to_list() + [1, 2, 3, 4, 5] + """ + return Pipeline(x_i for x in self for x_i in x) diff --git a/example_before_hooks.py b/example_before_hooks.py deleted file mode 100644 index 8f695f1..0000000 --- a/example_before_hooks.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -Example hooks file demonstrating before hook functionality. -Functions that start with 'before_' are automatically registered as before hooks. -""" - - -def before_output_filename(context): - """Before hook that runs first - validates input.""" - input_path = context["input_file_path"] - print(f"BEFORE: Validating input file: {input_path}") - - # Example validation - if not input_path.exists(): - raise FileNotFoundError(f"Input file does not exist: {input_path}") - - print("BEFORE: Input file validation passed") - - -def output_filename(context): - """Regular hook that runs after before hooks.""" - print(f"REGULAR: Processing output filename for {context['input_file_path']}") - - # Example: add a prefix to the output filename - output_path = context["output_file_path"] - new_name = f"processed_{output_path.name}" - context["output_file_path"] = output_path.with_name(new_name) - - print(f"REGULAR: Added prefix -> {context['output_file_path']}") - - -def before_output_filename_logging(context): - """Another before hook - logs the start of processing.""" - print(f"BEFORE: Starting to process {context['input_file_path']}") - print(f"BEFORE: Original output path: {context['output_file_path']}") diff --git a/test_class_hooks.py b/test_class_hooks.py deleted file mode 100644 index 2fe0a08..0000000 --- a/test_class_hooks.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -Example usage of the new class-based hooks manager. -""" - -from efemel.hooks_manager import HooksManager - -# Create a new instance -manager = HooksManager() - - -# Example hooks -def before_test(context): - print(f"BEFORE: {context['message']}") - context["step"] = 1 - - -def regular_test(context): - print(f"REGULAR: {context['message']}, step: {context['step']}") - context["step"] = 2 - - -def another_test(context): - print(f"ANOTHER: {context['message']}, step: {context['step']}") - context["result"] = "processed" - - -# Register hooks using the new method names -manager.add_before("test", before_test) -manager.add("test", regular_test) -manager.add("test", another_test) - -# Test the hooks -context = {"message": "Hello World"} -result_context, result_value = manager.call("test", context, ["result"]) - -print(f"Final context: {context}") -print(f"Returned result: {result_value}") -print(f"Hook count: {manager.get_count('test')}") -print(f"All hooks: {manager.list()}") diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 7a89bbd..4253f73 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,225 +1,561 @@ -from efemel.pipeline import SYMBOL_END, SYMBOL_RESET, Pipeline +""" +Test suite for the Pipeline class. +This module contains comprehensive tests for the Pipeline class functionality +including all methods, edge cases, and error conditions. +""" -class TestPipeline: - """Test suite for the Pipeline class.""" +import pytest - def test_identity_transformation(self): - """Test that identity transformation passes values through unchanged.""" - pipeline = Pipeline() +from efemel.pipeline import Pipeline - # Test with different data types - assert pipeline.run(5) == 5 - assert pipeline.run("hello") == "hello" - assert pipeline.run([1, 2, 3]) == [1, 2, 3] - assert pipeline.run({"key": "value"}) == {"key": "value"} - assert pipeline.run(None) is None - def test_identity_reset(self): - """Test that identity() resets the pipeline to initial state.""" - pipeline = Pipeline() +class TestPipelineBasics: + """Test basic Pipeline functionality.""" - # Add some transformations - pipeline.map(lambda x: x * 2).filter(lambda x: x > 10) + def test_pipeline_initialization(self): + """Test Pipeline initialization with various iterables.""" + # Test with list + pipeline = Pipeline([1, 2, 3, 4, 5]) + assert isinstance(pipeline, Pipeline) + assert pipeline.generator == [1, 2, 3, 4, 5] - # Reset to identity - pipeline.identity() + # Test with tuple + pipeline = Pipeline((1, 2, 3)) + assert list(pipeline) == [1, 2, 3] - # Should now pass values through unchanged - assert pipeline.run(5) == 5 + # Test with generator + def gen(): + yield from range(1, 4) - def test_map_basic_transformation(self): - """Test basic map transformations.""" - pipeline = Pipeline() + pipeline = Pipeline(gen()) + assert list(pipeline) == [1, 2, 3] - # Double numbers - result = pipeline.map(lambda x: x * 2).run(5) - assert result == 10 + # Test with empty list + pipeline = Pipeline([]) + assert list(pipeline) == [] - # Transform strings - pipeline = Pipeline() - result = pipeline.map(str.upper).run("hello") - assert result == "HELLO" + def test_pipeline_iteration(self): + """Test Pipeline iteration behavior.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Test iteration + result = [] + for item in pipeline: + result.append(item) + + assert result == [1, 2, 3, 4, 5] + + def test_to_list(self): + """Test Pipeline.to_list() method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + result = pipeline.to_list() + + assert result == [1, 2, 3, 4, 5] + assert isinstance(result, list) + + # Test with empty pipeline + empty_pipeline = Pipeline([]) + assert empty_pipeline.to_list() == [] + + def test_first(self): + """Test Pipeline.first() method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + assert pipeline.first() == 1 + + # Test with string + str_pipeline = Pipeline(["hello", "world"]) + assert str_pipeline.first() == "hello" + + def test_first_empty_pipeline(self): + """Test Pipeline.first() with empty pipeline.""" + empty_pipeline = Pipeline([]) + with pytest.raises(StopIteration): + empty_pipeline.first() + + +class TestPipelineFiltering: + """Test Pipeline filtering functionality.""" + + def test_filter_basic(self): + """Test basic filtering operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter even numbers + even_pipeline = pipeline.filter(lambda x: x % 2 == 0) + assert even_pipeline.to_list() == [2, 4] + + # Filter numbers greater than 3 + gt3_pipeline = pipeline.filter(lambda x: x > 3) + assert gt3_pipeline.to_list() == [4, 5] + + def test_filter_with_strings(self): + """Test filtering with string data.""" + pipeline = Pipeline(["hello", "world", "python", "test"]) + + # Filter strings longer than 4 characters + long_strings = pipeline.filter(lambda s: len(s) > 4) + assert long_strings.to_list() == ["hello", "world", "python"] + + # Filter strings starting with 'p' + p_strings = pipeline.filter(lambda s: s.startswith("p")) + assert p_strings.to_list() == ["python"] + + def test_filter_empty_result(self): + """Test filter that results in empty pipeline.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter that matches nothing + empty_result = pipeline.filter(lambda x: x > 10) + assert empty_result.to_list() == [] + + def test_filter_all_pass(self): + """Test filter where all items pass.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Filter that passes everything + all_pass = pipeline.filter(lambda x: True) + assert all_pass.to_list() == [1, 2, 3, 4, 5] + + def test_filter_chaining(self): + """Test chaining multiple filters.""" + pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + + # Chain filters: even numbers and greater than 5 + result = pipeline.filter(lambda x: x % 2 == 0).filter(lambda x: x > 5) + assert result.to_list() == [6, 8, 10] + + +class TestPipelineMapping: + """Test Pipeline mapping functionality.""" + + def test_map_basic(self): + """Test basic mapping operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Double each number + doubled = pipeline.map(lambda x: x * 2) + assert doubled.to_list() == [2, 4, 6, 8, 10] + + # Square each number + squared = pipeline.map(lambda x: x**2) + assert squared.to_list() == [1, 4, 9, 16, 25] + + def test_map_type_transformation(self): + """Test mapping with type transformation.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Convert numbers to strings + str_pipeline = pipeline.map(str) + assert str_pipeline.to_list() == ["1", "2", "3", "4", "5"] + + # Convert to boolean (non-zero is True) + bool_pipeline = pipeline.map(bool) + assert bool_pipeline.to_list() == [True, True, True, True, True] + + def test_map_with_strings(self): + """Test mapping with string data.""" + pipeline = Pipeline(["hello", "world", "python"]) + + # Convert to uppercase + upper_pipeline = pipeline.map(str.upper) + assert upper_pipeline.to_list() == ["HELLO", "WORLD", "PYTHON"] + + # Get string lengths + len_pipeline = pipeline.map(len) + assert len_pipeline.to_list() == [5, 5, 6] def test_map_chaining(self): """Test chaining multiple map operations.""" - pipeline = Pipeline() + pipeline = Pipeline([1, 2, 3, 4, 5]) - # Chain multiple maps: 5 * 2 = 10, 10 + 1 = 11 - result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1).run(5) - assert result == 11 + # Chain maps: double, then add 1 + result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1) + assert result.to_list() == [3, 5, 7, 9, 11] - def test_map_failure_handling(self): - """Test that map failures are handled properly.""" - pipeline = Pipeline() + def test_map_empty_pipeline(self): + """Test mapping on empty pipeline.""" + empty_pipeline = Pipeline([]) - # Division by zero should cause failure - result = pipeline.map(lambda x: x / 0).run(5) - assert result is None + # Map should return empty result + result = empty_pipeline.map(lambda x: x * 2) + assert result.to_list() == [] - # Exception in map function should cause failure - result = pipeline.map(lambda x: x.nonexistent_method()).run(5) - assert result is None - def test_filter_basic_operation(self): - """Test basic filter operations.""" - pipeline = Pipeline() +class TestPipelineMapFilter: + """Test Pipeline map and filter combinations.""" - # Even numbers pass - result = pipeline.filter(lambda x: x % 2 == 0).run(4) - assert result == 4 + def test_map_then_filter(self): + """Test mapping then filtering.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) - # Odd numbers fail - result = pipeline.filter(lambda x: x % 2 == 0).run(5) - assert result is None + # Double numbers, then filter even results + result = pipeline.map(lambda x: x * 2).filter(lambda x: x % 2 == 0) + assert result.to_list() == [2, 4, 6, 8, 10] - def test_filter_with_strings(self): - """Test filter operations with strings.""" - pipeline = Pipeline() + def test_filter_then_map(self): + """Test filtering then mapping.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) - # Non-empty strings pass - result = pipeline.filter(lambda s: len(s) > 0).run("hello") - assert result == "hello" + # Filter even numbers, then double them + result = pipeline.filter(lambda x: x % 2 == 0).map(lambda x: x * 2) + assert result.to_list() == [4, 8] - # Empty strings fail - result = pipeline.filter(lambda s: len(s) > 0).run("") - assert result is None + def test_complex_map_filter_chain(self): + """Test complex chaining of map and filter operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - def test_map_filter_chaining(self): - """Test chaining map and filter operations.""" - pipeline = Pipeline() + # Complex chain: filter odd, multiply by 3, filter > 10 + result = ( + pipeline.filter(lambda x: x % 2 == 1) # [1, 3, 5, 7, 9] + .map(lambda x: x * 3) # [3, 9, 15, 21, 27] + .filter(lambda x: x > 10) + ) # [15, 21, 27] - # 6 * 2 = 12, 12 > 10 is True - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 10).run(6) - assert result == 12 + assert result.to_list() == [15, 21, 27] - # 6 * 2 = 12, 12 > 20 is False - pipeline = Pipeline() - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 20).run(6) - assert result is None - def test_reduce_sum_accumulation(self): - """Test reduce operation with sum accumulation.""" - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + x, 0) +class TestPipelineReduce: + """Test Pipeline reduce functionality.""" - # Add numbers to accumulator - pipeline.run(5) # 0 + 5 = 5 - pipeline.run(3) # 5 + 3 = 8 - pipeline.run(2) # 8 + 2 = 10 + def test_reduce_basic(self): + """Test basic reduce operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) - # Get final result - result = pipeline.run(SYMBOL_END) - assert result == 10 + # Sum all numbers + sum_result = pipeline.reduce(lambda acc, x: acc + x, 0) + assert sum_result.first() == 15 - def test_reduce_reset_functionality(self): - """Test reduce reset functionality.""" - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + x, 0) + # Multiply all numbers + product_result = pipeline.reduce(lambda acc, x: acc * x, 1) + assert product_result.first() == 120 - # Accumulate some values - pipeline.run(5) - pipeline.run(3) + def test_reduce_with_strings(self): + """Test reduce with string data.""" + pipeline = Pipeline(["hello", "world", "python"]) - # Reset and start over - pipeline.run(SYMBOL_RESET) - pipeline.run(7) + # Concatenate strings + concat_result = pipeline.reduce(lambda acc, x: acc + " " + x, "") + assert concat_result.first() == " hello world python" - result = pipeline.run(SYMBOL_END) - assert result == 7 + # Join with commas + join_result = pipeline.reduce(lambda acc, x: acc + "," + x if acc else x, "") + assert join_result.first() == "hello,world,python" - def test_reduce_list_accumulation(self): - """Test reduce operation with list accumulation.""" - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc + [x], []) + def test_reduce_empty_pipeline(self): + """Test reduce on empty pipeline.""" + empty_pipeline = Pipeline([]) - pipeline.run("a") - pipeline.run("b") - pipeline.run("c") + # Reduce should return initial value + result = empty_pipeline.reduce(lambda acc, x: acc + x, 10) + assert result.first() == 10 - result = pipeline.run(SYMBOL_END) - assert result == ["a", "b", "c"] + def test_reduce_single_item(self): + """Test reduce with single item.""" + single_pipeline = Pipeline([42]) - def test_reduce_object_accumulation(self): - """Test reduce operation with object accumulation.""" - pipeline = Pipeline() - pipeline.reduce(lambda acc, item: {**acc, **item}, {}) + # Should combine initial value with single item + result = single_pipeline.reduce(lambda acc, x: acc + x, 10) + assert result.first() == 52 - pipeline.run({"name": "John"}) - pipeline.run({"age": 30}) - pipeline.run({"city": "NYC"}) - result = pipeline.run(SYMBOL_END) - assert result == {"name": "John", "age": 30, "city": "NYC"} +class TestPipelineTap: + """Test Pipeline tap functionality.""" - def test_pipeline_failure_propagation(self): - """Test that failures propagate through the pipeline.""" - pipeline = Pipeline() + def test_tap_basic(self): + """Test basic tap operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] - # Filter that fails should make entire pipeline fail - result = pipeline.map(lambda x: x * 2).filter(lambda x: x > 100).map(lambda x: x + 1).run(10) + # Tap to collect side effects + result = pipeline.tap(side_effects.append) + result_list = result.to_list() - # 10 * 2 = 20, 20 > 100 is False, so pipeline fails - assert result is None + assert result_list == [1, 2, 3, 4, 5] + assert side_effects == [1, 2, 3, 4, 5] - def test_pipeline_with_none_input(self): - """Test pipeline behavior with None input.""" - pipeline = Pipeline() + def test_tap_with_print(self): + """Test tap with print (no assertion needed, just verify it works).""" + pipeline = Pipeline([1, 2, 3]) - # Identity should pass None through - assert pipeline.run(None) is None + # This should not raise any exceptions + result = pipeline.tap(lambda x: None) # Mock print + assert result.to_list() == [1, 2, 3] - # Map should work with None if function handles it - pipeline = Pipeline() - result = pipeline.map(lambda x: x if x is None else x * 2).run(None) - assert result is None + def test_tap_chaining(self): + """Test tap in a chain of operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # Tap in middle of chain + result = pipeline.map(lambda x: x * 2).tap(side_effects.append).filter(lambda x: x > 5) - def test_filter_exception_handling(self): - """Test that exceptions in filter predicates are handled.""" - pipeline = Pipeline() + result_list = result.to_list() - # Exception in predicate should cause failure - result = pipeline.filter(lambda x: x.nonexistent_method()).run(5) + assert result_list == [6, 8, 10] + assert side_effects == [2, 4, 6, 8, 10] + + def test_tap_doesnt_modify_pipeline(self): + """Test that tap doesn't modify the pipeline data.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Tap with function that would modify if it could + result = pipeline.tap(lambda x: x * 1000) + + # Data should be unchanged + assert result.to_list() == [1, 2, 3, 4, 5] + + +class TestPipelineEach: + """Test Pipeline each functionality.""" + + def test_each_basic(self): + """Test basic each operations.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # Each should execute function for each item + pipeline.each(side_effects.append) + + assert side_effects == [1, 2, 3, 4, 5] + + def test_each_returns_none(self): + """Test that each returns None.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + result = pipeline.each(lambda x: None) assert result is None - def test_reduce_exception_handling(self): - """Test that exceptions in reduce accumulator are handled.""" - pipeline = Pipeline() - pipeline.reduce(lambda acc, x: acc / x, 1) # Will cause division by zero + def test_each_with_empty_pipeline(self): + """Test each with empty pipeline.""" + empty_pipeline = Pipeline([]) + side_effects = [] + + # Should not execute function + empty_pipeline.each(side_effects.append) + + assert side_effects == [] + + def test_each_terminal_operation(self): + """Test that each is a terminal operation.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + side_effects = [] + + # After each, pipeline should be consumed + pipeline.each(side_effects.append) - pipeline.run(0) # This should cause an exception in the accumulator + assert side_effects == [1, 2, 3, 4, 5] - # The pipeline should handle the exception gracefully - # Note: The exact behavior depends on implementation details - # This test ensures no unhandled exceptions occur + +class TestPipelineUtility: + """Test Pipeline utility methods.""" + + def test_passthrough(self): + """Test passthrough method.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Passthrough should return the same pipeline + result = pipeline.passthrough() + assert result is pipeline + + # Data should be unchanged + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_apply_single_function(self): + """Test apply with single function.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + def double_pipeline(p): + return p.map(lambda x: x * 2) + + result = pipeline.apply(double_pipeline) + assert result.to_list() == [2, 4, 6, 8, 10] + + def test_apply_multiple_functions(self): + """Test apply with multiple functions.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + def double_pipeline(p): + return p.map(lambda x: x * 2) + + def filter_even(p): + return p.filter(lambda x: x % 2 == 0) + + result = pipeline.apply(double_pipeline, filter_even) + assert result.to_list() == [2, 4, 6, 8, 10] + + def test_apply_no_functions(self): + """Test apply with no functions.""" + pipeline = Pipeline([1, 2, 3, 4, 5]) + + # Should return the same pipeline + result = pipeline.apply() + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_flatten_basic(self): + """Test basic flatten operation.""" + pipeline = Pipeline([[1, 2], [3, 4], [5]]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3, 4, 5] + + def test_flatten_with_strings(self): + """Test flatten with string iterables.""" + pipeline = Pipeline(["hello", "world"]) + + result = pipeline.flatten() + assert result.to_list() == ["h", "e", "l", "l", "o", "w", "o", "r", "l", "d"] + + def test_flatten_empty_lists(self): + """Test flatten with empty lists.""" + pipeline = Pipeline([[], [1, 2], [], [3]]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3] + + def test_flatten_nested_tuples(self): + """Test flatten with nested tuples.""" + pipeline = Pipeline([(1, 2), (3, 4, 5), (6,)]) + + result = pipeline.flatten() + assert result.to_list() == [1, 2, 3, 4, 5, 6] + + +class TestPipelineEdgeCases: + """Test Pipeline edge cases and error conditions.""" + + def test_pipeline_with_none_values(self): + """Test pipeline with None values.""" + pipeline = Pipeline([1, None, 3, None, 5]) + + # Should handle None values properly + result = pipeline.to_list() + assert result == [1, None, 3, None, 5] + + # Filter out None values + no_none = pipeline.filter(lambda x: x is not None) + assert no_none.to_list() == [1, 3, 5] + + def test_pipeline_with_mixed_types(self): + """Test pipeline with mixed data types.""" + pipeline = Pipeline([1, "hello", 3.14, True, None]) + + # Should handle mixed types + result = pipeline.to_list() + assert result == [1, "hello", 3.14, True, None] + + # Filter by type (excluding boolean which is a subclass of int) + numbers = pipeline.filter(lambda x: isinstance(x, int | float) and not isinstance(x, bool) and x is not None) + assert numbers.to_list() == [1, 3.14] def test_pipeline_reuse(self): - """Test that pipelines can be reused with different inputs.""" - pipeline = Pipeline() - pipeline.map(lambda x: x * 2).filter(lambda x: x > 5) - - # Test with different inputs - assert pipeline.run(3) == 6 # 3 * 2 = 6, 6 > 5 - assert pipeline.run(2) is None # 2 * 2 = 4, 4 > 5 is False - assert pipeline.run(5) == 10 # 5 * 2 = 10, 10 > 5 - - def test_symbol_end_and_reset_constants(self): - """Test that SYMBOL_END and SYMBOL_RESET are proper constants.""" - # These should be singleton instances - assert SYMBOL_END is SYMBOL_END - assert SYMBOL_RESET is SYMBOL_RESET - assert SYMBOL_END is not SYMBOL_RESET - - # Test string representations - assert str(SYMBOL_END) == "SymbolEnd" - assert str(SYMBOL_RESET) == "SymbolReset" - - def test_pipeline_method_chaining_returns_self(self): - """Test that all pipeline methods return self for chaining.""" - pipeline = Pipeline() - - # All these should return the same pipeline instance - assert pipeline.identity() is pipeline - assert pipeline.map(lambda x: x) is pipeline - assert pipeline.filter(lambda x: True) is pipeline - assert pipeline.reduce(lambda acc, x: acc, 0) is pipeline + """Test that pipelines can be reused.""" + original_data = [1, 2, 3, 4, 5] + pipeline = Pipeline(original_data) + + # First use + result1 = pipeline.map(lambda x: x * 2).to_list() + assert result1 == [2, 4, 6, 8, 10] + + # Create new pipeline from same data + pipeline2 = Pipeline(original_data) + result2 = pipeline2.filter(lambda x: x % 2 == 0).to_list() + assert result2 == [2, 4] + + def test_pipeline_method_chaining_returns_new_instances(self): + """Test that pipeline methods return new instances.""" + original = Pipeline([1, 2, 3, 4, 5]) + + # Methods should return new instances (except passthrough) + mapped = original.map(lambda x: x * 2) + filtered = original.filter(lambda x: x % 2 == 0) + + assert mapped is not original + assert filtered is not original + assert mapped is not filtered + + def test_pipeline_with_generator_exhaustion(self): + """Test pipeline behavior with generator exhaustion.""" + + def number_generator(): + yield from range(3) + + pipeline = Pipeline(number_generator()) + + # First consumption + result1 = pipeline.to_list() + assert result1 == [0, 1, 2] + + # Generator should be exhausted now + # This behavior depends on the implementation + + +class TestPipelineIntegration: + """Integration tests for Pipeline class.""" + + def test_data_processing_pipeline(self): + """Test a realistic data processing pipeline.""" + # Simulate processing a list of user data + users = [ + {"name": "Alice", "age": 30, "active": True}, + {"name": "Bob", "age": 25, "active": False}, + {"name": "Charlie", "age": 35, "active": True}, + {"name": "Diana", "age": 28, "active": True}, + {"name": "Eve", "age": 22, "active": False}, + ] + + pipeline = Pipeline(users) + + # Process: filter active users, extract names, convert to uppercase + result = pipeline.filter(lambda user: user["active"]).map(lambda user: user["name"]).map(str.upper) + + assert result.to_list() == ["ALICE", "CHARLIE", "DIANA"] + + def test_number_processing_pipeline(self): + """Test a number processing pipeline.""" + numbers = range(1, 21) # 1 to 20 + + pipeline = Pipeline(numbers) + + # Process: filter even numbers, square them, filter > 50, sum + result = ( + pipeline.filter(lambda x: x % 2 == 0) # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] + .map(lambda x: x**2) # [4, 16, 36, 64, 100, 144, 196, 256, 324, 400] + .filter(lambda x: x > 50) # [64, 100, 144, 196, 256, 324, 400] + .reduce(lambda acc, x: acc + x, 0) + ) # 1484 + + assert result.first() == 1484 + + def test_text_processing_pipeline(self): + """Test a text processing pipeline.""" + text = "Hello world! This is a test. Python is amazing." + words = text.split() + + pipeline = Pipeline(words) + + # Process: filter words > 3 chars, remove punctuation, lowercase, get unique + result = ( + pipeline.filter(lambda word: len(word) > 3) + .map(lambda word: word.strip(".,!")) + .map(str.lower) + .filter(lambda word: word not in ["this"]) + ) # Simple "unique" filter + + expected = ["hello", "world", "test", "python", "amazing"] + assert result.to_list() == expected + + def test_nested_data_processing(self): + """Test processing nested data structures.""" + data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] + + pipeline = Pipeline(data) + + # Flatten, filter odd numbers, square them + result = ( + pipeline.flatten() # [1, 2, 3, 4, 5, 6, 7, 8, 9] + .filter(lambda x: x % 2 == 1) # [1, 3, 5, 7, 9] + .map(lambda x: x**2) + ) # [1, 9, 25, 49, 81] + + assert result.to_list() == [1, 9, 25, 49, 81]