From a551a22c5fc74817a4b7a20ccbe82de98fa6ec12 Mon Sep 17 00:00:00 2001 From: Tamaki Nishino Date: Sun, 22 Feb 2026 20:33:13 +0900 Subject: [PATCH] Slim down README and move create_task to Basics section in Guide --- README.md | 570 +------------------------------------------------- docs/guide.md | 4 +- 2 files changed, 4 insertions(+), 570 deletions(-) diff --git a/README.md b/README.md index e866bc5..b2732e8 100644 --- a/README.md +++ b/README.md @@ -72,580 +72,12 @@ int main(int argc, char * argv[]) Every coroutine returns `Task` and is launched with `ctx.create_task(...)`. The standard `rclcpp::spin()` drives execution -- no special executor needed. -## Usage - -### create_task - -`create_task` registers and starts a coroutine. It accepts either a `Task` directly or a callable that returns one. - -```cpp -// From a coroutine function -Task my_coroutine(CoContext & ctx) { /* ... */ co_return; } -auto task = ctx.create_task(my_coroutine(ctx)); - -// From a lambda -auto task = ctx.create_task([&ctx]() -> Task { - co_await ctx.sleep(std::chrono::seconds(1)); -}); -``` - -You can launch multiple tasks concurrently -- they all run cooperatively on the single-threaded executor: - -```cpp -auto task_a = ctx.create_task(do_work_a(ctx)); -auto task_b = ctx.create_task(do_work_b(ctx)); -// Both run concurrently via co_await yield points -``` - -### Subscriber (TopicStream) - -`ctx.subscribe(topic, qos)` returns a `TopicStream` -- an async stream that yields messages via `co_await stream->next()`. - -```cpp -#include - -Task listen(CoContext & ctx, std::string topic) -{ - auto stream = ctx.subscribe(topic, 10); - - while (true) { - auto msg = co_await stream->next(); - if (!msg.has_value()) { - break; - } - RCLCPP_INFO(ctx.node().get_logger(), "Heard: %s", (*msg)->data.c_str()); - } -} -``` - -You can subscribe to multiple topics in parallel by launching separate tasks: - -```cpp -auto task_a = ctx.create_task(listen(ctx, "topic_a")); -auto task_b = ctx.create_task(listen(ctx, "topic_b")); -``` - -### Service Client - -Call a service with `co_await` -- no shared futures, no spinning while waiting. - -```cpp -#include - -using SetBool = std_srvs::srv::SetBool; - -Task call_service(CoContext & ctx) -{ - auto client = ctx.node().create_client("set_bool"); - - // Wait for the service to become available - auto wait_result = co_await ctx.wait_for_service(client, std::chrono::seconds(10)); - if (!wait_result.ok()) { - RCLCPP_ERROR(ctx.node().get_logger(), "Service not available"); - co_return; - } - - // Send request - auto req = std::make_shared(); - req->data = true; - - auto resp = co_await ctx.send_request(client, req); - RCLCPP_INFO(ctx.node().get_logger(), "Response: %s", resp->message.c_str()); -} -``` - -### Timer (TimerStream) - -`ctx.create_timer(period)` returns a `TimerStream` that ticks at the given interval. - -```cpp -Task tick(CoContext & ctx) -{ - auto timer = ctx.create_timer(std::chrono::seconds(1)); - int count = 0; - - while (true) { - co_await timer->next(); - RCLCPP_INFO(ctx.node().get_logger(), "Tick %d", count++); - } -} -``` - -### Service Server - -`ctx.create_service(name, callback)` creates a service whose handler is a coroutine. The handler can use `co_await` internally (e.g., to call other services) without blocking the executor. - -```cpp -auto service = ctx.create_service( - "set_bool", - [&ctx](SetBool::Request::SharedPtr req) -> Task { - // You can co_await inside the handler - co_await ctx.sleep(std::chrono::milliseconds(100)); - - SetBool::Response resp; - resp.success = req->data; - resp.message = req->data ? "enabled" : "disabled"; - co_return resp; - }); -``` - -### Action Server - -`ctx.create_action_server(name, callback)` creates an action server with a coroutine handler. Use `GoalContext` to check for cancellation and publish feedback. - -```cpp -#include - -using Fibonacci = example_interfaces::action::Fibonacci; - -auto action_server = ctx.create_action_server( - "fibonacci", - [&ctx](rclcpp_async::GoalContext goal) -> Task { - Fibonacci::Feedback feedback; - feedback.sequence = {0, 1}; - - for (int i = 2; i < goal.goal().order; i++) { - if (goal.is_canceling()) { - break; - } - auto n = feedback.sequence.size(); - feedback.sequence.push_back( - feedback.sequence[n - 1] + feedback.sequence[n - 2]); - goal.publish_feedback(feedback); - co_await ctx.sleep(std::chrono::milliseconds(500)); - } - - Fibonacci::Result result; - result.sequence = feedback.sequence; - co_return result; - }); -``` - -`GoalContext` provides: - -| Method | Description | -|---|---| -| `goal.goal()` | Access the goal request | -| `goal.is_canceling()` | Check if cancellation was requested | -| `goal.publish_feedback(fb)` | Send feedback to the client | -| `goal.abort()` | Abort the goal | - -### Single-Goal Action Server - -`create_single_goal_action_server(ctx, name, callback)` creates an action server that executes only one goal at a time. When a new goal arrives while another is executing, the previous goal is preempted via `CancelledException` and the new goal starts only after the previous goal has fully completed. - -```cpp -auto server = create_single_goal_action_server( - ctx, "fibonacci", - [&ctx](GoalContext goal) -> Task { - Fibonacci::Feedback feedback; - feedback.sequence = {0, 1}; - - for (int i = 2; i < goal.goal().order; i++) { - auto n = feedback.sequence.size(); - feedback.sequence.push_back( - feedback.sequence[n - 1] + feedback.sequence[n - 2]); - goal.publish_feedback(feedback); - co_await ctx.sleep(std::chrono::milliseconds(500)); - // If a new goal arrives, CancelledException is thrown here - } - - Fibonacci::Result result; - result.sequence = feedback.sequence; - co_return result; - }); -``` - -Both preemption (new goal arrives) and client cancel automatically throw `CancelledException` at the current `co_await` suspension point -- no need to poll `goal.is_canceling()`. Synchronous cleanup can be done in a `catch` block: - -```cpp -[&ctx](GoalContext goal) -> Task { - try { - // ... main execution ... - } catch (const CancelledException &) { - stop_motor(); // Cleanup runs before the next goal starts - throw; - } -} -``` - -Use `shield()` to protect critical sections that must complete even during cancellation: - -```cpp -[&ctx](GoalContext goal) -> Task { - // This sleep completes even if cancel is requested during it - co_await shield(save_state(ctx)); - - // Cancel takes effect at the next unshielded co_await - co_await ctx.sleep(std::chrono::seconds(10)); - - // ... -} -``` - -### Action Client - -Send a goal and iterate over feedback with `GoalStream`: - -```cpp -Task send_action(CoContext & ctx) -{ - auto client = rclcpp_action::create_client(&ctx.node(), "fibonacci"); - - auto wait_result = co_await ctx.wait_for_action(client, std::chrono::seconds(10)); - if (!wait_result.ok()) { co_return; } - - Fibonacci::Goal goal; - goal.order = 8; - - auto goal_result = co_await ctx.send_goal(client, goal); - if (!goal_result.ok()) { co_return; } - - auto stream = *goal_result.value; - - // Iterate over feedback - while (true) { - auto feedback = co_await stream->next(); - if (!feedback.has_value()) { - break; - } - auto & seq = (*feedback)->sequence; - RCLCPP_INFO(ctx.node().get_logger(), "Feedback: last=%d", seq.back()); - } - - // Get final result - auto result = stream->result(); - auto & seq = result.result->sequence; - RCLCPP_INFO(ctx.node().get_logger(), "Result: last=%d", seq.back()); -} -``` - -### Event - -`Event` is a coroutine-aware signal for synchronization between tasks. - -```cpp -rclcpp_async::Event event(ctx); - -// Task 1: wait for the event -auto waiter = ctx.create_task([&]() -> Task { - co_await event.wait(); - RCLCPP_INFO(ctx.node().get_logger(), "Event received!"); -}); - -// Task 2: signal the event -auto signaler = ctx.create_task([&]() -> Task { - co_await ctx.sleep(std::chrono::seconds(2)); - event.set(); // Wakes up all waiting coroutines -}); -``` - -| Method | Description | -|---|---| -| `event.set()` | Signal the event, resuming all waiters | -| `event.clear()` | Reset the event so future `wait()` calls will suspend | -| `event.wait()` | `co_await` -- suspends until the event is set | -| `event.is_set()` | Check the current state | - -### Mutex - -`Mutex` provides mutual exclusion for coroutines. Unlike `std::mutex`, it suspends the coroutine instead of blocking the thread. - -```cpp -rclcpp_async::Mutex mutex(ctx); - -Task critical_section(CoContext & ctx, Mutex & mutex, const std::string & name) -{ - co_await mutex.lock(); - RCLCPP_INFO(ctx.node().get_logger(), "%s: acquired lock", name.c_str()); - co_await ctx.sleep(std::chrono::seconds(1)); - mutex.unlock(); -} -``` - -| Method | Description | -|---|---| -| `mutex.lock()` | `co_await` -- suspends until the lock is acquired | -| `mutex.unlock()` | Release the lock, resuming the next waiter | -| `mutex.is_locked()` | Check the current state | - -### TF Lookup (TfBuffer) - -`TfBuffer` subscribes to `/tf` and `/tf_static` on a dedicated listener thread and provides `co_await`-able transform lookups. - -```cpp -Task run(CoContext & ctx) -{ - TfBuffer tf(ctx); - - // Sync lookup (latest) -- returns nullopt if not yet available - auto opt = tf.lookup_transform("map", "base_link"); - if (opt) { - RCLCPP_INFO(ctx.node().get_logger(), "x=%.2f", opt->transform.translation.x); - } - - // Async lookup -- suspends until the transform becomes available - auto t = co_await tf.lookup_transform("map", "base_link", rclcpp::Time(0)); - - // With timeout - auto result = co_await ctx.wait_for( - tf.lookup_transform("map", "odom", rclcpp::Time(0)), 5s); - - // Parallel lookups - auto [t1, t2] = co_await when_all( - tf.lookup_transform("map", "odom", rclcpp::Time(0)), - tf.lookup_transform("odom", "base_link", rclcpp::Time(0))); -} -``` - -### Channel - -`Channel` is a thread-safe MPSC (multi-producer, single-consumer) channel for sending values from worker threads to coroutines. - -```cpp -rclcpp_async::Channel ch(ctx); - -// Worker thread: send values -std::thread worker([&ch]() { - for (int i = 0; i < 10; i++) { - ch.send(i); // Thread-safe - } - ch.close(); // Signal end of stream -}); - -// Coroutine: receive values -auto task = ctx.create_task([&]() -> Task { - while (true) { - auto val = co_await ch.next(); - if (!val.has_value()) { - break; // Channel closed - } - RCLCPP_INFO(ctx.node().get_logger(), "Received: %d", *val); - } -}); -``` - -### when_all - -`when_all` concurrently awaits multiple tasks and returns all results as a `std::tuple`. It accepts `Task` objects as well as arbitrary awaitables (anything with `await_ready`/`await_suspend`/`await_resume`), which are automatically wrapped into tasks. - -```cpp -Task fetch_a(CoContext & ctx) { - co_await ctx.sleep(std::chrono::seconds(1)); - co_return 42; -} - -Task fetch_b(CoContext & ctx) { - co_await ctx.sleep(std::chrono::seconds(2)); - co_return std::string("hello"); -} - -Task run(CoContext & ctx) -{ - auto [a, b] = co_await when_all( - ctx.create_task(fetch_a(ctx)), - ctx.create_task(fetch_b(ctx))); - // a == 42, b == "hello" - // Total time ~2s (parallel), not 3s (sequential) - - // Awaitables can be passed directly -- no need to wrap in a Task - auto req = std::make_shared(); - auto [resp, _] = co_await when_all( - ctx.send_request(client, req), - ctx.sleep(std::chrono::seconds(1))); -} -``` - -`void` tasks return `std::monostate` in the tuple. Cancellation of the parent task propagates to all child tasks. - -### when_any - -`when_any` races multiple tasks concurrently and returns the result of whichever finishes first, wrapped in a `std::variant`. The remaining tasks are cancelled automatically. Like `when_all`, it accepts both `Task` objects and arbitrary awaitables. - -```cpp -Task fast(CoContext & ctx) { - co_await ctx.sleep(std::chrono::seconds(1)); - co_return 42; -} - -Task slow(CoContext & ctx) { - co_await ctx.sleep(std::chrono::seconds(10)); - co_return std::string("too slow"); -} - -Task run(CoContext & ctx) -{ - auto result = co_await when_any( - ctx.create_task(fast(ctx)), - ctx.create_task(slow(ctx))); - // result is std::variant - - if (result.index() == 0) { - int value = std::get<0>(result); // 42 - } - // slow() is cancelled automatically -} -``` - -`void` tasks produce `std::monostate` in the variant. Cancellation of the parent task propagates to all child tasks. - -### wait_for / poll_until - -`wait_for` races any task or awaitable against a timeout, returning `Result`. `poll_until` polls a predicate at a fixed interval with a built-in timeout. - -- `ctx.wait_for(task, timeout)` races a task against a timeout using `when_any`, returning `Result`. -- `ctx.wait_for(awaitable, timeout)` also accepts any awaitable directly (e.g., `sleep`, `send_request`), wrapping it in a `Task` automatically. -- `ctx.poll_until(pred, interval, timeout)` returns `Result` -- polls `pred()` every `interval`, returning `Ok` when true or `Timeout` after `timeout`. - -```cpp -Task run(CoContext & ctx) -{ - auto client = ctx.node().create_client("set_bool"); - - // Race an awaitable against a timeout - auto req = std::make_shared(); - req->data = true; - auto resp = co_await ctx.wait_for(ctx.send_request(client, req), 5s); - if (resp.ok()) { - RCLCPP_INFO(ctx.node().get_logger(), "Response: %s", resp.value->message.c_str()); - } -} -``` - -`wait_for_service` and `wait_for_action` are built on top of `poll_until`. Cancellation propagates automatically through the entire chain. - -## Result Type - -Operations that can timeout or fail return `Result`: - -```cpp -auto result = co_await ctx.wait_for_service(client, std::chrono::seconds(10)); - -if (result.ok()) { - // Service is available -} else if (result.timeout()) { - // Timed out -} -``` - -Most operations return their value directly (e.g., `send_request` returns `Response`, `sleep` returns `void`). `Result` is only used when timeout or error is a normal outcome: - -| Operation | Return type | Uses Result? | -|---|---|---| -| `sleep` | `void` | No | -| `send_request` | `Response` | No | -| `stream->next()` | `optional` | No | -| `event.wait()` | `void` | No | -| `mutex.lock()` | `void` | No | -| `tf.lookup_transform` | `TransformStamped` | No | -| `poll_until` | `Result` | Yes (timeout) | -| `wait_for` | `Result` | Yes (timeout) | -| `wait_for_service` | `Result` | Yes (timeout) | -| `wait_for_action` | `Result` | Yes (timeout) | -| `send_goal` | `Result` | Yes (rejected) | - -## Cancellation - -Calling `task.cancel()` cancels the task by throwing `CancelledException` at the current `co_await` suspension point. The exception propagates automatically through the coroutine chain -- no manual checking needed. - -```cpp -auto task = ctx.create_task(run(ctx)); - -// Later... -task.cancel(); // Throws CancelledException at the current co_await -``` - -To handle cancellation explicitly, catch `CancelledException`: - -```cpp -Task run(CoContext & ctx) -{ - try { - co_await ctx.sleep(std::chrono::seconds(10)); - } catch (const CancelledException &) { - RCLCPP_INFO(ctx.node().get_logger(), "Cancelled!"); - } -} -``` - -### shield - -`shield(task)` prevents a parent's cancellation from propagating to the inner task. The shielded task runs to completion even if the parent is cancelled. After the shielded task finishes, `CancelledException` is thrown at the next unshielded `co_await`. - -```cpp -Task run(CoContext & ctx) -{ - // This task completes even if run() is cancelled during it - co_await shield(save_critical_data(ctx)); - - // CancelledException is thrown here if cancelled - co_await ctx.sleep(std::chrono::seconds(1)); -} -``` - -## No Deadlocks on Single-Threaded Executors - -A key advantage of coroutine-based I/O: nested service calls work without deadlocks, even on a single-threaded executor. For example, a service handler can `co_await` another service call, which can itself call yet another service -- all on the same thread. - -See [`nested_demo.cpp`](rclcpp_async_example/src/nested_demo.cpp) for a full demonstration. - -## API Reference - -### CoContext - -| Method | Returns | Description | -|---|---|---| -| `create_task(task)` | `Task` | Start a coroutine | -| `subscribe(topic, qos)` | `shared_ptr>` | Subscribe to a topic | -| `create_timer(period)` | `shared_ptr` | Create a periodic timer stream | -| `create_service(name, cb)` | `shared_ptr>` | Create a coroutine service server | -| `create_action_server(name, cb)` | `shared_ptr>` | Create a coroutine action server | -| `create_single_goal_action_server(ctx, name, cb)` | `shared_ptr>` | Single-goal action server with preemption | -| `wait_for_service(client, timeout)` | *awaitable* `Result` | Wait for a service | -| `send_request(client, req)` | *awaitable* `Response` | Call a service | -| `wait_for_action(client, timeout)` | *awaitable* `Result` | Wait for an action server | -| `send_goal(client, goal)` | *awaitable* `Result>` | Send an action goal | -| `sleep(duration)` | *awaitable* `void` | Async sleep | -| `poll_until(pred, interval, timeout)` | `Task>` | Poll until predicate is true or timeout | -| `wait_for(awaitable, timeout)` | `Task>` | Race an awaitable against a timeout | -| `when_all(awaitables...)` | *awaitable* `tuple` | Await all tasks/awaitables concurrently | -| `when_any(awaitables...)` | *awaitable* `variant` | Race tasks/awaitables, return first result | -| `shield(task)` | *awaitable* `T` | Protect a task from parent cancellation | -| `post(fn)` | `void` | Post a callback to the executor thread (thread-safe) | -| `node()` | `Node&` | Access the underlying node | - -### TfBuffer - -| Method | Returns | Description | -|---|---|---| -| `TfBuffer(ctx)` | -- | Construct with a `CoContext`; starts a listener thread | -| `lookup_transform(target, source)` | `optional` | Sync lookup of latest transform | -| `lookup_transform(target, source, time)` | *awaitable* `TransformStamped` | Async lookup; suspends until the transform is available | +See the [Guide](https://otamachan.github.io/rclcpp_async/guide/) for topics, services, actions, concurrency primitives, cancellation, and TF lookups. ## Benchmarks See [rclcpp_async/test/benchmark/README.md](rclcpp_async/test/benchmark/README.md) for benchmark results comparing coroutine vs callback performance. -### Task - -| Method | Returns | Description | -|---|---|---| -| `operator bool()` | `bool` | `true` if the task holds a valid coroutine (not moved-from) | -| `done()` | `bool` | `true` if the coroutine has completed (`false` for null tasks) | -| `result()` | `T&` | Get the result of a completed `Task` (rethrows if exception) | -| `cancel()` | `void` | Request cancellation via `CancelledException` | - -```cpp -auto task = ctx.create_task(run(ctx)); - -if (task) { // true -- valid coroutine - // ... -} -if (task.done()) { - auto & value = task.result(); // access the result without co_await -} -``` - ## License Apache-2.0 diff --git a/docs/guide.md b/docs/guide.md index f9eb0b0..b0c092d 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -1,6 +1,6 @@ # Guide -## Topics and Timers +## Basics ### create_task @@ -25,6 +25,8 @@ auto task_b = ctx.create_task(do_work_b(ctx)); // Both run concurrently via co_await yield points ``` +## Topics and Timers + ### Subscriber (TopicStream) `ctx.subscribe(topic, qos)` returns a `TopicStream` -- an async stream that yields messages via `co_await stream->next()`.