diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f271e6cf..0f34e145 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -17,6 +17,7 @@ if(PROJECT_IS_TOP_LEVEL) enable_testing() endif() +set(TODO stop_token) #-dk:TODO set(EXAMPLES allocator doc-just @@ -29,7 +30,6 @@ set(EXAMPLES playground sender-demo stackoverflow - stop_token stopping when_all-cancel ) diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index cc5e4aaa..9c2210a5 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -24,7 +24,6 @@ import beman.execution.detail.get_completion_signatures; import beman.execution.detail.get_domain_early; import beman.execution.detail.get_scheduler; import beman.execution.detail.get_stop_token; -import beman.execution.detail.join_env; import beman.execution.detail.make_sender; import beman.execution.detail.never_stop_token; import beman.execution.detail.nested_sender_has_affine_on; @@ -37,8 +36,10 @@ import beman.execution.detail.sender_adaptor_closure; import beman.execution.detail.sender_for; import beman.execution.detail.sender_has_affine_on; import beman.execution.detail.set_value; +import beman.execution.detail.store_receiver; import beman.execution.detail.tag_of_t; import beman.execution.detail.transform_sender; +import beman.execution.detail.unstoppable; import beman.execution.detail.write_env; #else #include @@ -47,7 +48,6 @@ import beman.execution.detail.write_env; #include #include #include -#include #include #include #include @@ -57,8 +57,11 @@ import beman.execution.detail.write_env; #include #include #include +#include +#include #include #include +#include #include #endif @@ -136,13 +139,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { static auto transform_sender(Sender&& sender, const Env& ev) { static_assert(requires { { - ::beman::execution::get_completion_signatures< - decltype(::beman::execution::schedule(::beman::execution::get_scheduler(ev))), - decltype(::beman::execution::detail::join_env( - ::beman::execution::env{::beman::execution::prop{ - ::beman::execution::get_stop_token, ::beman::execution::never_stop_token{}, {}}}, - ev))>() - } -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>; + ::beman::execution::get_completion_signatures() + } //-dk:TODO -> + //::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>> + ; }); //[[maybe_unused]] auto& [tag, data, child] = sender; auto& child = sender.template get<2>(); @@ -152,11 +155,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { constexpr child_tag_t t{}; return t.affine_on(::beman::execution::detail::forward_like(child), ev); } else { - return ::beman::execution::write_env( - ::beman::execution::schedule_from( - ::beman::execution::get_scheduler(ev), - ::beman::execution::write_env(::beman::execution::detail::forward_like(child), ev)), - beman::execution::detail::affine_on_env(ev)); + return ::beman::execution::detail::store_receiver( + ::beman::execution::detail::forward_like(child), + [](Child&& child, const auto& ev) { + return ::beman::execution::unstoppable(::beman::execution::schedule_from( + ::beman::execution::get_scheduler(ev), + ::beman::execution::write_env(::std::forward(child), ev))); + }); } } template diff --git a/include/beman/execution/detail/sender_awaitable.hpp b/include/beman/execution/detail/sender_awaitable.hpp index 59edde18..4a7571f0 100644 --- a/include/beman/execution/detail/sender_awaitable.hpp +++ b/include/beman/execution/detail/sender_awaitable.hpp @@ -12,7 +12,6 @@ import std; #include #include #include -#include #include #include #include @@ -49,20 +48,19 @@ import beman.execution.detail.unspecified_promise; namespace beman::execution::detail { template class sender_awaitable { + inline static constexpr bool enable_defence{true}; struct unit {}; using value_type = ::beman::execution::detail::single_sender_value_type>; using result_type = ::std::conditional_t<::std::is_void_v, unit, value_type>; using variant_type = ::std::variant<::std::monostate, result_type, ::std::exception_ptr>; - using data_type = ::std::tuple, ::std::coroutine_handle>; + using data_type = ::std::tuple, ::std::coroutine_handle>; struct awaitable_receiver { using receiver_concept = ::beman::execution::receiver_t; void resume() { - std::thread::id id(::std::this_thread::get_id()); - if (not ::std::get<1>(*result_ptr_) - .compare_exchange_strong(id, ::std::thread::id{}, std::memory_order_acq_rel)) { + if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, std::memory_order_acq_rel)) { ::std::get<2>(*result_ptr_).resume(); } } @@ -85,9 +83,7 @@ class sender_awaitable { } void set_stopped() && noexcept { - std::thread::id id(::std::this_thread::get_id()); - if (not ::std::get<1>(*result_ptr_) - .compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) { + if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, ::std::memory_order_acq_rel)) { static_cast<::std::coroutine_handle<>>(::std::get<2>(*result_ptr_).promise().unhandled_stopped()) .resume(); } @@ -107,16 +103,14 @@ class sender_awaitable { public: sender_awaitable(Sndr&& sndr, Promise& p) - : result{::std::monostate{}, ::std::this_thread::get_id(), ::std::coroutine_handle::from_promise(p)}, + : result{::std::monostate{}, false, ::std::coroutine_handle::from_promise(p)}, state{::beman::execution::connect(::std::forward(sndr), sender_awaitable::awaitable_receiver{::std::addressof(result)})} {} static constexpr bool await_ready() noexcept { return false; } ::std::coroutine_handle<> await_suspend(::std::coroutine_handle handle) noexcept { ::beman::execution::start(state); - ::std::thread::id id(::std::this_thread::get_id()); - if (not ::std::get<1>(this->result) - .compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) { + if (enable_defence && ::std::get<1>(this->result).exchange(true, std::memory_order_acq_rel)) { if (::std::holds_alternative<::std::monostate>(::std::get<0>(this->result))) { return ::std::get<2>(this->result).promise().unhandled_stopped(); } diff --git a/include/beman/execution/detail/store_receiver.hpp b/include/beman/execution/detail/store_receiver.hpp new file mode 100644 index 00000000..df9068ae --- /dev/null +++ b/include/beman/execution/detail/store_receiver.hpp @@ -0,0 +1,115 @@ +//-dk:TODO rRW/// include/beman/execution/detail/store_receiver.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER + +#include +#ifdef BEMAN_HAS_IMPORT_STD +import std; +#else +#include +#include +#include +#endif +#ifdef BEMAN_HAS_MODULES +import beman.execution.detail.connect; +import beman.execution.detail.connect_result_t; +import beman.execution.detail.env_of_t; +import beman.execution.detail.get_completion_signatures; +import beman.execution.detail.get_env; +import beman.execution.detail.operation_state; +import beman.execution.detail.receiver; +import beman.execution.detail.sender; +import beman.execution.detail.set_error; +import beman.execution.detail.set_stopped; +import beman.execution.detail.set_value; +import beman.execution.detail.start; +#else +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +struct store_receiver_t { + template <::beman::execution::receiver Rcvr> + struct receiver { + using receiver_concept = ::beman::execution::receiver_t; + Rcvr* rcvr; + template + auto set_value(Args&&... args) && noexcept -> void { + ::beman::execution::set_value(::std::move(*this->rcvr), ::std::forward(args)...); + } + template + auto set_error(Error&& error) && noexcept -> void { + ::beman::execution::set_error(::std::move(*this->rcvr), ::std::forward(error)); + } + auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(*this->rcvr)); } + auto get_env() const noexcept { return ::beman::execution::get_env(*this->rcvr); } + }; + template <::beman::execution::sender Sndr, typename Trans, ::beman::execution::receiver Rcvr> + struct state { + using operation_state_concept = ::beman::execution::operation_state_t; + using env_t = ::beman::execution::env_of_t; + using state_t = ::beman::execution::connect_result_t()( + ::std::declval(), ::std::declval())), + receiver>; + Rcvr rcvr; + state_t op_state; + template <::beman::execution::sender S, typename T, ::beman::execution::receiver R> + state(S&& sndr, T&& trans, R&& r) + : rcvr(::std::forward(r)), + op_state(::beman::execution::connect( + ::std::forward(trans)(::std::forward(sndr), ::beman::execution::get_env(this->rcvr)), + receiver{::std::addressof(this->rcvr)})) {} + auto start() & noexcept { ::beman::execution::start(this->op_state); } + }; + template <::beman::execution::sender Sndr, typename Trans> + struct sender { + using sender_concept = ::beman::execution::sender_t; + template + static consteval auto get_completion_signatures(Env&&... env) noexcept { + return ::beman::execution:: + get_completion_signatures()(::std::declval())), Env...>(); + } + ::std::remove_cvref_t sndr; + ::std::remove_cvref_t trans; + + template <::beman::execution::receiver Receiver> + auto connect(Receiver&& r) && { + static_assert(::beman::execution::operation_state>>); + return state>( + ::std::move(this->sndr), ::std::move(this->trans), ::std::forward(r)); + } + template <::beman::execution::receiver Receiver> + auto connect(Receiver&& r) const& { + static_assert(::beman::execution::operation_state>>); + return state>( + this->sndr, this->trans, ::std::forward(r)); + } + }; + template <::beman::execution::sender Sndr, typename Trans> + auto operator()(Sndr&& sndr, Trans&& trans) const { + static_assert(::beman::execution::sender>); + return sender{::std::forward(sndr), ::std::forward(trans)}; + } +}; + +inline constexpr store_receiver_t store_receiver{}; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/unstoppable.hpp b/include/beman/execution/detail/unstoppable.hpp new file mode 100644 index 00000000..32b87fa6 --- /dev/null +++ b/include/beman/execution/detail/unstoppable.hpp @@ -0,0 +1,47 @@ +// include/beman/execution/detail/unstoppable.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_UNSTOPPABLE +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_UNSTOPPABLE + +#include +#ifdef BEMAN_HAS_IMPORT_STD +import std; +#else +#include +#endif +#ifdef BEMAN_HAS_MODULES +import beman.execution.detail.get_stop_token; +import beman.execution.detail.never_stop_token; +import beman.execution.detail.prop; +import beman.execution.detail.sender; +import beman.execution.detail.write_env; +#else +#include +#include +#include +#include +#include +#endif + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +struct unstoppable_t { + template <::beman::execution::sender Sndr> + auto operator()(Sndr&& sndr) const { + return ::beman::execution::write_env( + ::std::forward(sndr), + ::beman::execution::prop{::beman::execution::get_stop_token, ::beman::execution::never_stop_token{}, {}}); + } +}; +} // namespace beman::execution::detail + +namespace beman::execution { +using unstoppable_t = ::beman::execution::detail::unstoppable_t; +inline constexpr ::beman::execution::unstoppable_t unstoppable{}; +} // namespace beman::execution + +// ---------------------------------------------------------------------------- + +#endif diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt index cfef78f5..51ba7b1e 100644 --- a/src/beman/execution/CMakeLists.txt +++ b/src/beman/execution/CMakeLists.txt @@ -177,6 +177,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stoppable_token.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stopped_as_error.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/stopped_as_optional.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/store_receiver.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suppress_pop.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suppress_push.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/suspend_complete.hpp @@ -186,6 +187,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/transform_sender.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/type_list.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unspecified_promise.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable_token.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_for.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_signatures.hpp @@ -365,6 +367,7 @@ if(BEMAN_USE_MODULES) stoppable_token.cppm stopped_as_error.cppm stopped_as_optional.cppm + store_receiver.cppm suspend_complete.cppm sync_wait.cppm tag_of_t.cppm @@ -372,6 +375,7 @@ if(BEMAN_USE_MODULES) transform_sender.cppm type_list.cppm unspecified_promise.cppm + unstoppable.cppm unstoppable_token.cppm valid_completion_for.cppm valid_completion_signatures.cppm diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index cd2ab544..b961b353 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -81,6 +81,7 @@ import beman.execution.detail.inline_scheduler; // [stoptoken.concepts], stop token concepts export import beman.execution.detail.stoppable_token; +export import beman.execution.detail.unstoppable; export import beman.execution.detail.unstoppable_token; // [exec.recv], receivers diff --git a/src/beman/execution/store_receiver.cppm b/src/beman/execution/store_receiver.cppm new file mode 100644 index 00000000..bcfe2e7a --- /dev/null +++ b/src/beman/execution/store_receiver.cppm @@ -0,0 +1,11 @@ +module; +// src/beman/execution/store_receiver.cppm -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include + +export module beman.execution.detail.store_receiver; + +namespace beman::execution::detail { +export using beman::execution::detail::store_receiver; +} // namespace beman::execution::detail diff --git a/src/beman/execution/unstoppable.cppm b/src/beman/execution/unstoppable.cppm new file mode 100644 index 00000000..c1de28a8 --- /dev/null +++ b/src/beman/execution/unstoppable.cppm @@ -0,0 +1,11 @@ +module; +// src/beman/execution/unstoppable.cppm -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include + +export module beman.execution.detail.unstoppable; + +namespace beman::execution { +export using beman::execution::unstoppable; +} // namespace beman::execution