Execute PyActors in asyncio.TaskGroup #943

Merged
rapids-bot[bot] merged 4 commits into rapidsai:main from mroeschke:ref/pyactors/taskgroup
Apr 10, 2026

Conversation

@mroeschke
Contributor

Similar to what was done in cudf_polars (rapidsai/cudf#21858): now that Python 3.11 is the minimum version, it would be good to start running PyActors in an asyncio.TaskGroup, which has the nice benefit of canceling the other PyActors if one fails.

One difference is that an asyncio.TaskGroup raises an ExceptionGroup instead of a regular Exception, like

  |   File "/conda/envs/rapidsmpf-dev/lib/python3.14/asyncio/taskgroups.py", line 174, in _aexit
  |     raise BaseExceptionGroup(
  |     ...<2 lines>...
  |     ) from None
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "rapidsmpf/streaming/core/actor.pyx", line 143, in _await_actor
    |     return await actor
    |     ^^^^^^^
    |   File "rapidsmpf/streaming/core/actor.pyx", line 105, in run
    |     return await coro
    |     
    |   File "/rapidsmpf/python/rapidsmpf/rapidsmpf/tests/streaming/test_exceptions.py", line 24, in task_that_throws
    |     raise RuntimeError("Throwing in task")
    | RuntimeError: Throwing in task
    +------------------------------------

which I think is OK as it provides a little more context about the exception.

@mroeschke mroeschke requested a review from a team as a code owner April 3, 2026 23:41
@madsbk madsbk added the breaking (Introduces a breaking change) and improvement (Improves an existing functionality) labels Apr 7, 2026
Member

@madsbk madsbk left a comment

Looks good

Comment thread python/rapidsmpf/rapidsmpf/streaming/core/actor.pyx Outdated
@wence-
Contributor

wence- commented Apr 7, 2026

I am looking at the PyActor stuff again and I am still a bit confused as to why we need this class at all. PyActor does two things:

  1. augment the decorated function so that channels are automatically shut down on exceptions
  2. provide a way of distinguishing from the CppActors that must be on the libcoro event loop.

(1) is important, but other than the need for (2) a PyActor is really just any Python async function. Given that we can distinguish CppActors because those all do inherit from a given class I wonder why we don't just have the define_actor decorator just handle point (2). Something like this:

diff --git a/python/rapidsmpf/rapidsmpf/streaming/core/actor.pyx b/python/rapidsmpf/rapidsmpf/streaming/core/actor.pyx
index 7ffdac24..97e0c56e 100644
--- a/python/rapidsmpf/rapidsmpf/streaming/core/actor.pyx
+++ b/python/rapidsmpf/rapidsmpf/streaming/core/actor.pyx
@@ -79,38 +79,21 @@ cdef class CppActor:
         return move(self._handle)
 
 
-class PyActor(Awaitable[None]):
-    """
-    A streaming actor implemented in Python.
-
-    This runs as an Python coroutine (asyncio), which means it comes with a significant
-    Python overhead. The GIL is released while C++ actors are executing.
-    """
-    def __init__(self, func, extra_channels, /, *args, **kwargs):
-        if len(args) < 1 or not isinstance(args[0], Context):
-            raise TypeError(
-                "expect a Context as the first positional argument "
-                "(not as a keyword argument)"
-            )
-        ctx = args[0]
-        channels_to_shutdown = (*collect_channels(args, kwargs), *extra_channels)
-        self._coro = self.run(ctx, channels_to_shutdown, func(*args, **kwargs))
-
-    @staticmethod
-    async def run(Context ctx not None, channels_to_shutdown, coro):
-        """
-        Run the coroutine and shutdown the channels when done.
-        """
-        try:
-            return await coro
-        finally:
-            for ch in channels_to_shutdown:
-                await ch.shutdown(ctx)
-
-    def __await__(self):
-        return self._coro.__await__()
-
-
+async def py_actor(func, extra_channels, /, *args, **kwargs):
+    if len(args) < 1 or not isinstance(args[0], Context):
+        raise TypeError(
+            "expect a Context as the first positional argument "
+            "(not as a keyword argument)"
+        )
+    ctx = args[0]
+    channels_to_shutdown = (*collect_channels(args, kwargs), *extra_channels)
+    try:
+        return await func(*args, **kwargs)
+    finally:
+        for ch in channels_to_shutdown:
+            await ch.shutdown(ctx)
+    
+    
 def collect_channels(*objs):
     """Recursively yield all `Channel` instances found in ``objs``."""
     for obj in objs:
@@ -128,7 +111,7 @@ cdef decorate_actor(extra_channels, func):
     """Validate ``func`` is async and wrap it as a PyActor."""
     if not inspect.iscoroutinefunction(func):
         raise TypeError(f"`{func.__qualname__}` must be an async function")
-    return wraps(func)(partial(PyActor, func, extra_channels))
+    return wraps(func)(partial(py_actor, func, extra_channels))
 
 
 async def run_py_actors(py_actors):
@@ -248,12 +231,8 @@ def run_actor_network(*, actors, py_executor = None):
     for actor in actors:
         if isinstance(actor, CppActor):
             cpp_actors.push_back(move(deref((<CppActor>actor).release_handle())))
-        elif isinstance(actor, PyActor):
-            py_actors.append(actor)
         else:
-            raise ValueError(
-                "Unknown actor type, did you forget to use `@define_actor()`?"
-            )
+            py_actors.append(actor)
 
     if len(py_actors) > 0:
         if py_executor is None:

@madsbk
Member

madsbk commented Apr 7, 2026

> I am looking at the PyActor stuff again and I am still a bit confused as to why we need this class at all. PyActor does two things:
>
> 1. augment the decorated function so that channels are automatically shut down on exceptions
> 2. provide a way of distinguishing from the `CppActor`s that must be on the libcoro event loop.
>
> (1) is important, but other than the need for (2) a PyActor is really just any Python async function. Given that we can distinguish CppActors because those all do inherit from a given class I wonder why we don't just have the define_actor decorator just handle point (2).

I think that might work fine.

@mroeschke
Contributor Author

mroeschke commented Apr 7, 2026

> Something like this:

Sure thing, implemented in 5ad476d. I also don't see PyActor explicitly used in cudf_polars, so this shouldn't be a breaking change there.

@mroeschke mroeschke requested a review from wence- April 7, 2026 20:56
@mroeschke
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit 95891ba into rapidsai:main Apr 10, 2026
58 checks passed
@mroeschke mroeschke deleted the ref/pyactors/taskgroup branch April 10, 2026 16:02

Labels

breaking (Introduces a breaking change), improvement (Improves an existing functionality)


3 participants