@@ -123,13 +123,54 @@ def flatten[T](
123123 """Flattens nested lists; the context is passed through the operation."""
124124 return self ._pipe (lambda chunk , ctx : [item for sublist in chunk for item in sublist ]) # type: ignore
125125
126- def tap ( self , function : PipelineFunction [ Out , Any ]) -> "Transformer[In, Out]" :
127- """Applies a side-effect function without modifying the data."""
126+ @ overload
127+ def tap ( self , arg : "Transformer[Out, Any]" ) -> "Transformer[In, Out]" : ...
128128
129- if is_context_aware (function ):
130- return self ._pipe (lambda chunk , ctx : [x for x in chunk if function (x , ctx ) or True ])
129+ @overload
130+ def tap (self , arg : PipelineFunction [Out , Any ]) -> "Transformer[In, Out]" : ...
131+
132+ def tap (
133+ self ,
134+ arg : Union ["Transformer[Out, Any]" , PipelineFunction [Out , Any ]],
135+ ) -> "Transformer[In, Out]" :
136+ """
137+ Applies a side-effect without modifying the main data stream.
138+
139+ This method can be used in two ways:
140+ 1. With a `Transformer`: Applies a sub-pipeline to each chunk for side-effects
141+ (e.g., logging a chunk), discarding the sub-pipeline's output.
142+ 2. With a `function`: Applies a function to each element individually for
143+ side-effects (e.g., printing an item).
144+
145+ Args:
146+ arg: A `Transformer` instance or a function to be applied for side-effects.
147+
148+ Returns:
149+ The transformer instance for method chaining.
150+ """
151+ match arg :
152+ # Case 1: The argument is another Transformer
153+ case Transformer () as tapped_transformer :
154+ tapped_func = tapped_transformer .transformer
155+
156+ def operation (chunk : list [Out ], ctx : PipelineContext ) -> list [Out ]:
157+ # Execute the tapped transformer's logic on the chunk for side-effects.
158+ _ = tapped_func (chunk , ctx )
159+ # Return the original chunk to continue the main pipeline.
160+ return chunk
161+
162+ return self ._pipe (operation )
163+
164+ # Case 2: The argument is a callable function
165+ case function if callable (function ):
166+ if is_context_aware (function ):
167+ return self ._pipe (lambda chunk , ctx : [x for x in chunk if function (x , ctx ) or True ])
168+
169+ return self ._pipe (lambda chunk , _ctx : [x for x in chunk if function (x ) or True ]) # type: ignore
131170
132- return self ._pipe (lambda chunk , _ctx : [function (x ) or x for x in chunk ]) # type: ignore
171+ # Default case for robustness
172+ case _:
173+ raise TypeError (f"tap() argument must be a Transformer or a callable, not { type (arg ).__name__ } " )
133174
134175 def apply [T ](self , t : Callable [[Self ], "Transformer[In, T]" ]) -> "Transformer[In, T]" :
135176 """Apply another pipeline to the current one."""
0 commit comments