Skip to content

Commit ba2be73

Browse files
authored
Merge pull request #14 from ringoldsdev/feat/20250729/catch
fix: addressed catch function pickleability
2 parents 9e0ba1a + 02480e8 commit ba2be73

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

laygo/transformers/http.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]
233233
def catch[U](
234234
self,
235235
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
236-
on_error: ChunkErrorHandler[Out, U] | None = None,
236+
on_error: ChunkErrorHandler[Out, None] | None = None,
237237
) -> "HTTPTransformer[In, U]":
238238
super().catch(sub_pipeline_builder, on_error)
239239
return self # type: ignore

laygo/transformers/parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def apply[T](
226226
def catch[U](
227227
self,
228228
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
229-
on_error: ChunkErrorHandler[Out, U] | None = None,
229+
on_error: ChunkErrorHandler[Out, None] | None = None,
230230
) -> "ParallelTransformer[In, U]":
231231
super().catch(sub_pipeline_builder, on_error)
232232
return self # type: ignore

laygo/transformers/threaded.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def apply[T](
241241
def catch[U](
242242
self,
243243
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
244-
on_error: ChunkErrorHandler[Out, U] | None = None,
244+
on_error: ChunkErrorHandler[Out, None] | None = None,
245245
) -> "ThreadedTransformer[In, U]":
246246
super().catch(sub_pipeline_builder, on_error)
247247
return self # type: ignore

laygo/transformers/transformer.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,22 +442,28 @@ def _reduce(data: Iterable[In], context: IContextManager | None = None) -> Itera
442442
def catch[U](
443443
self,
444444
sub_pipeline_builder: Callable[["Transformer[Out, Out]"], "Transformer[Out, U]"],
445-
on_error: ChunkErrorHandler[Out, U] | None = None,
445+
on_error: ChunkErrorHandler[Out, None] | None = None,
446446
) -> "Transformer[In, U]":
447447
"""Isolate a sub-pipeline in a chunk-based try-catch block.
448448
449449
If the sub-pipeline fails for a chunk, the on_error handler is invoked.
450450
451451
Args:
452452
sub_pipeline_builder: A function that builds the sub-pipeline to protect.
453-
on_error: Optional error handler for when the sub-pipeline fails.
453+
on_error: A picklable error handler for when the sub-pipeline fails.
454+
It takes a chunk, exception, and context, and must return a
455+
replacement chunk (`list[U]`). If not provided, an empty
456+
list is returned on error.
454457
455458
Returns:
456459
A transformer with error handling applied.
457460
"""
458461

459-
if on_error:
460-
self.on_error(on_error) # type: ignore
462+
# Use the global error handler if it exists, otherwise create an internal one
463+
catch_error_handler = self.error_handler
464+
465+
if on_error is not None:
466+
catch_error_handler.on_error(on_error) # type: ignore
461467

462468
# Create a blank transformer for the sub-pipeline
463469
temp_transformer = createTransformer(_type_hint=..., chunk_size=self.chunk_size) # type: ignore
@@ -466,13 +472,18 @@ def catch[U](
466472
sub_pipeline = sub_pipeline_builder(temp_transformer)
467473
sub_transformer_func = sub_pipeline.transformer
468474

475+
# This 'operation' function is now picklable. It only closes over
476+
# `sub_transformer_func` and `catch_error_handler`, both of which are
477+
# picklable, and it no longer references `self`.
469478
def operation(chunk: list[Out], ctx: IContextManager) -> list[U]:
470479
try:
471-
# Attempt to process the whole chunk with the sub-pipeline
480+
# Attempt to process the chunk with the sub-pipeline
472481
return sub_transformer_func(chunk, ctx)
473482
except Exception as e:
474-
# On failure, delegate to the chunk-based error handler
475-
self.error_handler.handle(chunk, e, ctx)
483+
# Call the error handler (which may include both global and local handlers)
484+
catch_error_handler.handle(chunk, e, ctx)
485+
486+
# Return an empty list as the default behavior after handling the error
476487
return []
477488

478489
return self._pipe(operation) # type: ignore

0 commit comments

Comments
 (0)