Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ if(PROJECT_IS_TOP_LEVEL)
enable_testing()
endif()

set(TODO stop_token) #-dk:TODO
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set(TODO stop_token) is a no-op here (the variable isn’t used) and reads like leftover debugging scaffolding. If the intent is to temporarily disable the stop_token example, it’s clearer to either leave an inline TODO comment where the example would be listed, or gate it behind an option/condition, rather than introducing an unused TODO variable.

Suggested change
set(TODO stop_token) #-dk:TODO
# TODO: Re-enable the 'stop_token' example when it is ready. #-dk:TODO

Copilot uses AI. Check for mistakes.
set(EXAMPLES
allocator
doc-just
Expand All @@ -29,7 +30,6 @@ set(EXAMPLES
playground
sender-demo
stackoverflow
stop_token
stopping
when_all-cancel
)
Expand Down
33 changes: 19 additions & 14 deletions include/beman/execution/detail/affine_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import beman.execution.detail.get_completion_signatures;
import beman.execution.detail.get_domain_early;
import beman.execution.detail.get_scheduler;
import beman.execution.detail.get_stop_token;
import beman.execution.detail.join_env;
import beman.execution.detail.make_sender;
import beman.execution.detail.never_stop_token;
import beman.execution.detail.nested_sender_has_affine_on;
Expand All @@ -37,8 +36,10 @@ import beman.execution.detail.sender_adaptor_closure;
import beman.execution.detail.sender_for;
import beman.execution.detail.sender_has_affine_on;
import beman.execution.detail.set_value;
import beman.execution.detail.store_receiver;
import beman.execution.detail.tag_of_t;
import beman.execution.detail.transform_sender;
import beman.execution.detail.unstoppable;
import beman.execution.detail.write_env;
#else
#include <beman/execution/detail/env.hpp>
Expand All @@ -47,7 +48,6 @@ import beman.execution.detail.write_env;
#include <beman/execution/detail/get_domain_early.hpp>
#include <beman/execution/detail/get_scheduler.hpp>
#include <beman/execution/detail/get_stop_token.hpp>
#include <beman/execution/detail/join_env.hpp>
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/never_stop_token.hpp>
#include <beman/execution/detail/prop.hpp>
Expand All @@ -57,8 +57,11 @@ import beman.execution.detail.write_env;
#include <beman/execution/detail/sender_adaptor_closure.hpp>
#include <beman/execution/detail/sender_for.hpp>
#include <beman/execution/detail/sender_has_affine_on.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/store_receiver.hpp>
#include <beman/execution/detail/tag_of_t.hpp>
#include <beman/execution/detail/transform_sender.hpp>
#include <beman/execution/detail/unstoppable.hpp>
#include <beman/execution/detail/write_env.hpp>
#endif

Expand Down Expand Up @@ -136,13 +139,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
static auto transform_sender(Sender&& sender, const Env& ev) {
static_assert(requires {
{
::beman::execution::get_completion_signatures<
decltype(::beman::execution::schedule(::beman::execution::get_scheduler(ev))),
decltype(::beman::execution::detail::join_env(
::beman::execution::env{::beman::execution::prop{
::beman::execution::get_stop_token, ::beman::execution::never_stop_token{}, {}}},
ev))>()
} -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>;
::beman::execution::get_completion_signatures<decltype(::beman::execution::unstoppable(
::beman::execution::schedule(
::beman::execution::get_scheduler(ev))),
ev)>()
} //-dk:TODO ->
//::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>
;
});
//[[maybe_unused]] auto& [tag, data, child] = sender;
auto& child = sender.template get<2>();
Expand All @@ -152,11 +155,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
constexpr child_tag_t t{};
return t.affine_on(::beman::execution::detail::forward_like<Sender>(child), ev);
} else {
return ::beman::execution::write_env(
::beman::execution::schedule_from(
::beman::execution::get_scheduler(ev),
::beman::execution::write_env(::beman::execution::detail::forward_like<Sender>(child), ev)),
beman::execution::detail::affine_on_env(ev));
return ::beman::execution::detail::store_receiver(
::beman::execution::detail::forward_like<Sender>(child),
[]<typename Child>(Child&& child, const auto& ev) {
return ::beman::execution::unstoppable(::beman::execution::schedule_from(
::beman::execution::get_scheduler(ev),
::beman::execution::write_env(::std::forward<Child>(child), ev)));
});
}
}
template <typename, typename...>
Expand Down
18 changes: 6 additions & 12 deletions include/beman/execution/detail/sender_awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import std;
#include <concepts>
#include <coroutine>
#include <exception>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
Expand Down Expand Up @@ -49,20 +48,19 @@ import beman.execution.detail.unspecified_promise;
namespace beman::execution::detail {
template <class Sndr, class Promise>
class sender_awaitable {
inline static constexpr bool enable_defence{true};
struct unit {};
using value_type =
::beman::execution::detail::single_sender_value_type<Sndr, ::beman::execution::env_of_t<Promise>>;
using result_type = ::std::conditional_t<::std::is_void_v<value_type>, unit, value_type>;
using variant_type = ::std::variant<::std::monostate, result_type, ::std::exception_ptr>;
using data_type = ::std::tuple<variant_type, ::std::atomic<::std::thread::id>, ::std::coroutine_handle<Promise>>;
using data_type = ::std::tuple<variant_type, ::std::atomic<bool>, ::std::coroutine_handle<Promise>>;

struct awaitable_receiver {
using receiver_concept = ::beman::execution::receiver_t;

void resume() {
std::thread::id id(::std::this_thread::get_id());
if (not ::std::get<1>(*result_ptr_)
.compare_exchange_strong(id, ::std::thread::id{}, std::memory_order_acq_rel)) {
if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, std::memory_order_acq_rel)) {
::std::get<2>(*result_ptr_).resume();
}
}
Expand All @@ -85,9 +83,7 @@ class sender_awaitable {
}

void set_stopped() && noexcept {
std::thread::id id(::std::this_thread::get_id());
if (not ::std::get<1>(*result_ptr_)
.compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, ::std::memory_order_acq_rel)) {
static_cast<::std::coroutine_handle<>>(::std::get<2>(*result_ptr_).promise().unhandled_stopped())
.resume();
}
Expand All @@ -107,16 +103,14 @@ class sender_awaitable {

public:
sender_awaitable(Sndr&& sndr, Promise& p)
: result{::std::monostate{}, ::std::this_thread::get_id(), ::std::coroutine_handle<Promise>::from_promise(p)},
: result{::std::monostate{}, false, ::std::coroutine_handle<Promise>::from_promise(p)},
state{::beman::execution::connect(::std::forward<Sndr>(sndr),
sender_awaitable::awaitable_receiver{::std::addressof(result)})} {}

static constexpr bool await_ready() noexcept { return false; }
::std::coroutine_handle<> await_suspend(::std::coroutine_handle<Promise> handle) noexcept {
::beman::execution::start(state);
::std::thread::id id(::std::this_thread::get_id());
if (not ::std::get<1>(this->result)
.compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
if (enable_defence && ::std::get<1>(this->result).exchange(true, std::memory_order_acq_rel)) {
if (::std::holds_alternative<::std::monostate>(::std::get<0>(this->result))) {
return ::std::get<2>(this->result).promise().unhandled_stopped();
Comment on lines 111 to 115
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new atomic-flag defence in await_suspend is race-sensitive (completion can happen during/after start(state)). Existing tests cover synchronous completion via just(), but I couldn’t find coverage for the asynchronous path where the sender completes after the coroutine has actually suspended. Please add a test that co_awaits a sender completing on another thread/run_loop tick to validate correct resumption and no hangs.

Copilot uses AI. Check for mistakes.
}
Expand Down
115 changes: 115 additions & 0 deletions include/beman/execution/detail/store_receiver.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//-dk:TODO rRW/// include/beman/execution/detail/store_receiver.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER

#include <beman/execution/detail/common.hpp>
#ifdef BEMAN_HAS_IMPORT_STD
import std;
#else
#include <memory>
#include <utility>
#include <type_traits>
#endif
#ifdef BEMAN_HAS_MODULES
import beman.execution.detail.connect;
import beman.execution.detail.connect_result_t;
import beman.execution.detail.env_of_t;
import beman.execution.detail.get_completion_signatures;
import beman.execution.detail.get_env;
import beman.execution.detail.operation_state;
import beman.execution.detail.receiver;
import beman.execution.detail.sender;
import beman.execution.detail.set_error;
import beman.execution.detail.set_stopped;
import beman.execution.detail.set_value;
import beman.execution.detail.start;
#else
#include <beman/execution/detail/connect.hpp>
#include <beman/execution/detail/connect_result_t.hpp>
#include <beman/execution/detail/env_of_t.hpp>
#include <beman/execution/detail/get_completion_signatures.hpp>
#include <beman/execution/detail/get_env.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/receiver.hpp>
#include <beman/execution/detail/set_error.hpp>
#include <beman/execution/detail/set_stopped.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/operation_state.hpp>
#include <beman/execution/detail/start.hpp>
#endif

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
struct store_receiver_t {
template <::beman::execution::receiver Rcvr>
struct receiver {
using receiver_concept = ::beman::execution::receiver_t;
Rcvr* rcvr;
template <typename... Args>
auto set_value(Args&&... args) && noexcept -> void {
::beman::execution::set_value(::std::move(*this->rcvr), ::std::forward<Args>(args)...);
}
template <typename Error>
auto set_error(Error&& error) && noexcept -> void {
::beman::execution::set_error(::std::move(*this->rcvr), ::std::forward<Error>(error));
}
auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(*this->rcvr)); }
auto get_env() const noexcept { return ::beman::execution::get_env(*this->rcvr); }
};
template <::beman::execution::sender Sndr, typename Trans, ::beman::execution::receiver Rcvr>
struct state {
using operation_state_concept = ::beman::execution::operation_state_t;
using env_t = ::beman::execution::env_of_t<Rcvr>;
using state_t = ::beman::execution::connect_result_t<decltype(::std::declval<Trans>()(
::std::declval<Sndr>(), ::std::declval<env_t>())),
receiver<Rcvr>>;
Rcvr rcvr;
state_t op_state;
template <::beman::execution::sender S, typename T, ::beman::execution::receiver R>
state(S&& sndr, T&& trans, R&& r)
: rcvr(::std::forward<R>(r)),
op_state(::beman::execution::connect(
::std::forward<T>(trans)(::std::forward<S>(sndr), ::beman::execution::get_env(this->rcvr)),
receiver<Rcvr>{::std::addressof(this->rcvr)})) {}
auto start() & noexcept { ::beman::execution::start(this->op_state); }
};
template <::beman::execution::sender Sndr, typename Trans>
struct sender {
using sender_concept = ::beman::execution::sender_t;
template <typename... Env>
static consteval auto get_completion_signatures(Env&&... env) noexcept {
return ::beman::execution::
get_completion_signatures<decltype(::std::declval<Trans>()(::std::declval<Sndr>())), Env...>();
}
::std::remove_cvref_t<Sndr> sndr;
::std::remove_cvref_t<Trans> trans;

template <::beman::execution::receiver Receiver>
auto connect(Receiver&& r) && {
static_assert(::beman::execution::operation_state<state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>>);
return state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>(
::std::move(this->sndr), ::std::move(this->trans), ::std::forward<Receiver>(r));
}
template <::beman::execution::receiver Receiver>
auto connect(Receiver&& r) const& {
static_assert(::beman::execution::operation_state<state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>>);
return state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>(
this->sndr, this->trans, ::std::forward<Receiver>(r));
}
};
template <::beman::execution::sender Sndr, typename Trans>
auto operator()(Sndr&& sndr, Trans&& trans) const {
static_assert(::beman::execution::sender<sender<Sndr, Trans>>);
return sender<Sndr, Trans>{::std::forward<Sndr>(sndr), ::std::forward<Trans>(trans)};
}
};

inline constexpr store_receiver_t store_receiver{};
} // namespace beman::execution::detail

// ----------------------------------------------------------------------------

#endif
47 changes: 47 additions & 0 deletions include/beman/execution/detail/unstoppable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// include/beman/execution/detail/unstoppable.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_UNSTOPPABLE
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_UNSTOPPABLE

#include <beman/execution/detail/common.hpp>
#ifdef BEMAN_HAS_IMPORT_STD
import std;
#else
#include <utility>
#endif
#ifdef BEMAN_HAS_MODULES
import beman.execution.detail.get_stop_token;
import beman.execution.detail.never_stop_token;
import beman.execution.detail.prop;
import beman.execution.detail.sender;
import beman.execution.detail.write_env;
#else
#include <beman/execution/detail/get_stop_token.hpp>
#include <beman/execution/detail/never_stop_token.hpp>
#include <beman/execution/detail/prop.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/write_env.hpp>
#endif

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
struct unstoppable_t {
template <::beman::execution::sender Sndr>
auto operator()(Sndr&& sndr) const {
return ::beman::execution::write_env(
::std::forward<Sndr>(sndr),
::beman::execution::prop{::beman::execution::get_stop_token, ::beman::execution::never_stop_token{}, {}});
}
};
} // namespace beman::execution::detail

namespace beman::execution {
using unstoppable_t = ::beman::execution::detail::unstoppable_t;
inline constexpr ::beman::execution::unstoppable_t unstoppable{};
} // namespace beman::execution

// ----------------------------------------------------------------------------

#endif
4 changes: 4 additions & 0 deletions src/beman/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stoppable_token.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stopped_as_error.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stopped_as_optional.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/store_receiver.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suppress_pop.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suppress_push.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suspend_complete.hpp
Expand All @@ -186,6 +187,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/transform_sender.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/type_list.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unspecified_promise.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable_token.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_for.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_signatures.hpp
Expand Down Expand Up @@ -365,13 +367,15 @@ if(BEMAN_USE_MODULES)
stoppable_token.cppm
stopped_as_error.cppm
stopped_as_optional.cppm
store_receiver.cppm
suspend_complete.cppm
sync_wait.cppm
tag_of_t.cppm
then.cppm
transform_sender.cppm
type_list.cppm
unspecified_promise.cppm
unstoppable.cppm
unstoppable_token.cppm
valid_completion_for.cppm
valid_completion_signatures.cppm
Expand Down
1 change: 1 addition & 0 deletions src/beman/execution/execution.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import beman.execution.detail.inline_scheduler;

// [stoptoken.concepts], stop token concepts
export import beman.execution.detail.stoppable_token;
export import beman.execution.detail.unstoppable;
export import beman.execution.detail.unstoppable_token;

// [exec.recv], receivers
Expand Down
11 changes: 11 additions & 0 deletions src/beman/execution/store_receiver.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module;
// src/beman/execution/store_receiver.cppm -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution/detail/store_receiver.hpp>

export module beman.execution.detail.store_receiver;

namespace beman::execution::detail {
export using beman::execution::detail::store_receiver;
} // namespace beman::execution::detail
11 changes: 11 additions & 0 deletions src/beman/execution/unstoppable.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module;
// src/beman/execution/unstoppable.cppm -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution/detail/unstoppable.hpp>

export module beman.execution.detail.unstoppable;

namespace beman::execution {
export using beman::execution::unstoppable;
} // namespace beman::execution
Loading