//
// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_COBALT_DETAIL_RACE_HPP
#define BOOST_COBALT_DETAIL_RACE_HPP

#include <boost/cobalt/detail/await_result_helper.hpp>
#include <boost/cobalt/detail/fork.hpp>
#include <boost/cobalt/detail/handler.hpp>
#include <boost/cobalt/detail/forward_cancellation.hpp>
#include <boost/cobalt/result.hpp>
#include <boost/cobalt/this_thread.hpp>
#include <boost/cobalt/detail/util.hpp>

#include <boost/asio/bind_allocator.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>

#include <boost/intrusive_ptr.hpp>
#include <boost/core/demangle.hpp>
#include <boost/core/span.hpp>
#include <boost/variant2/variant.hpp>

#include <coroutine>
#include <optional>
#include <algorithm>

namespace boost::cobalt::detail
{
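
// Tag used in place of a URBG to request a deterministic, left-to-right
// polling order; when it is passed, the implementations below skip the
// std::shuffle of their dispatch order.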
struct left_race_tag {};

// helpers for determining the type of things
template<typename Base, // range of aw
         typename Awaitable = Base>
struct race_traits
{
  // for a ranges race this is based on the range, not the AW in it.
  constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;

  // what the value is supposed to be cast to before the co_await_operator
  using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;

  // do we need operator co_await
  constexpr static bool is_actual = awaitable_type<awaitable>;

  // the type with .await_ functions & interrupt_await
  using actual_awaitable
      = std::conditional_t<
          is_actual,
          awaitable,
          decltype(get_awaitable_type(std::declval<awaitable>()))>;

  // the type to be used with interruptible
  using interruptible_type
      = std::conditional_t<
          std::is_lvalue_reference_v<Base>,
          std::decay_t<actual_awaitable> &,
          std::decay_t<actual_awaitable> &&>;

  constexpr static bool interruptible =
      cobalt::interruptible<interruptible_type>;

  static void do_interrupt(std::decay_t<actual_awaitable> & aw)
  {
    if constexpr (interruptible)
      static_cast<interruptible_type>(aw).interrupt_await();
  }
};

struct interruptible_base
{
  virtual void interrupt_await() = 0;
};
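
// Implementation type for racing a fixed, compile-time set of awaitables.
// Ct is the cancellation type emitted to the losing awaitables, URBG is the
// random generator used to shuffle the polling order (or left_race_tag to
// keep the order as written).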
template<asio::cancellation_type Ct, typename URBG, typename ... Args>
struct race_variadic_impl
{
  template<typename URBG_>
  race_variadic_impl(URBG_ && g, Args && ... args)
      : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
  {
  }

  std::tuple<Args...> args;
  URBG g;

  constexpr static std::size_t tuple_size = sizeof...(Args);
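
  // The awaitable returned by operator co_await(). Every contender runs as a
  // detail::fork coroutine placed in the statically sized shared buffer; the
  // first fork to claim `index` wins, and cancel_all() interrupts and cancels
  // the remaining ones.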
  struct awaitable : fork::static_shared_state<256 * tuple_size>
  {
#if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
    boost::source_location loc;
#endif

    template<std::size_t ... Idx>
    awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
        aws{args}
    {
      if constexpr (!std::is_same_v<URBG, left_race_tag>)
        std::shuffle(impls.begin(), impls.end(), g);
      std::fill(working.begin(), working.end(), nullptr);
    }

    std::tuple<Args...> & aws;

    std::array<asio::cancellation_signal, tuple_size> cancel_;
    template<typename > constexpr static auto make_null() {return nullptr;};
    std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};

    std::array<interruptible_base*, tuple_size> working;

    std::size_t index{std::numeric_limits<std::size_t>::max()};

    constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
    std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
    std::exception_ptr error;

    bool has_result() const
    {
      return index != std::numeric_limits<std::size_t>::max();
    }

    void cancel_all()
    {
      interrupt_await();
      for (auto i = 0u; i < tuple_size; i++)
        if (auto &r = cancel[i]; r)
          std::exchange(r, nullptr)->emit(Ct);
    }

    void interrupt_await()
    {
      for (auto i : working)
        if (i)
          i->interrupt_await();
    }

    template<typename T, typename Error>
    void assign_error(system::result<T, Error> & res)
    try
    {
      std::move(res).value(loc);
    }
    catch(...)
    {
      error = std::current_exception();
    }

    template<typename T>
    void assign_error(system::result<T, std::exception_ptr> & res)
    {
      error = std::move(res).error();
    }
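
    // Fork body for the awaitable at position Idx: materializes the actual
    // awaitable, registers an interruptor so a winning fork can stop it, and
    // stores the result (or error) for slot Idx if this fork is the one that
    // claims `index`.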
    template<std::size_t Idx>
    static detail::fork await_impl(awaitable & this_)
    try
    {
      using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;

      typename traits::actual_awaitable aw_{
          get_awaitable_type(
              static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
          )
      };
      as_result_t aw{aw_};

      struct interruptor final : interruptible_base
      {
        std::decay_t<typename traits::actual_awaitable> & aw;
        interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
        void interrupt_await() override
        {
          traits::do_interrupt(aw);
        }
      };
      interruptor in{aw_};
      //if constexpr (traits::interruptible)
      this_.working[Idx] = &in;

      auto transaction = [&this_, idx = Idx] {
        if (this_.has_result())
          boost::throw_exception(std::runtime_error("Another transaction already started"));
        this_.cancel[idx] = nullptr;
        // reserve the index early bc
        this_.index = idx;
        this_.cancel_all();
      };

      co_await fork::set_transaction_function(transaction);

      // check manually if we're ready
      auto rd = aw.await_ready();
      if (!rd)
      {
        this_.cancel[Idx] = &this_.cancel_[Idx];
        co_await this_.cancel[Idx]->slot();
        // make sure the executor is set
        co_await detail::fork::wired_up;

        // do the await - this doesn't call await-ready again
        if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
        {
          auto res = co_await aw;
          if (!this_.has_result())
          {
            this_.index = Idx;
            if (res.has_error())
              this_.assign_error(res);
          }
          if constexpr(!all_void)
            if (this_.index == Idx && !res.has_error())
              this_.result.emplace(variant2::in_place_index<Idx>);
        }
        else
        {
          auto val = co_await aw;
          if (!this_.has_result())
            this_.index = Idx;
          if (this_.index == Idx)
          {
            if (val.has_error())
              this_.assign_error(val);
            else
              this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
          }
        }
        this_.cancel[Idx] = nullptr;
      }
      else
      {
        if (!this_.has_result())
          this_.index = Idx;
        if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
        {
          auto res = aw.await_resume();
          if (this_.index == Idx)
          {
            if (res.has_error())
              this_.assign_error(res);
            else
              this_.result.emplace(variant2::in_place_index<Idx>);
          }
        }
        else
        {
          if (this_.index == Idx)
          {
            auto res = aw.await_resume();
            if (res.has_error())
              this_.assign_error(res);
            else
              this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
          }
          else
            aw.await_resume();
        }
        this_.cancel[Idx] = nullptr;
      }
      this_.cancel_all();
      this_.working[Idx] = nullptr;
    }
    catch(...)
    {
      if (!this_.has_result())
        this_.index = Idx;
      if (this_.index == Idx)
        this_.error = std::current_exception();
      this_.working[Idx] = nullptr;
    }

    std::array<detail::fork(*)(awaitable&), tuple_size> impls {
        []<std::size_t ... Idx>(std::index_sequence<Idx...>)
        {
          return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
        }(std::make_index_sequence<tuple_size>{})
    };

    detail::fork last_forked;
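
    // Launch protocol: await_ready() starts the first fork and skips
    // suspension entirely if it already completed. await_suspend() resumes it
    // once the executor is known, starts the remaining forks, and arms a
    // cancellation handler that re-emits incoming cancellation to every
    // still-pending signal.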
    bool await_ready()
    {
      last_forked = impls[0](*this);
      return last_forked.done();
    }

    template<typename H>
    auto await_suspend(
        std::coroutine_handle<H> h,
        const boost::source_location & loc = BOOST_CURRENT_LOCATION)
    {
      this->loc = loc;
      this->exec = &cobalt::detail::get_executor(h);
      last_forked.release().resume();

      if (!this->outstanding_work()) // already done, resume right away.
        return false;

      for (std::size_t idx = 1u;
           idx < tuple_size; idx++) // we'
      {
        auto l = impls[idx](*this);
        const auto d = l.done();
        l.release();
        if (d)
          break;
      }

      if (!this->outstanding_work()) // already done, resume right away.
        return false;

      // arm the cancel
      assign_cancellation(
          h,
          [&](asio::cancellation_type ct)
          {
            for (auto & cs : cancel)
              if (cs)
                cs->emit(ct);
          });

      this->coro.reset(h.address());
      return true;
    }

#if _MSC_VER
    BOOST_NOINLINE
#endif
    auto await_resume()
    {
      if (error)
        std::rethrow_exception(error);

      if constexpr (all_void)
        return index;
      else
        return std::move(*result);
    }

    auto await_resume(const as_tuple_tag &)
    {
      if constexpr (all_void)
        return std::make_tuple(error, index);
      else
        return std::make_tuple(error, std::move(*result));
    }

    auto await_resume(const as_result_tag & )
        -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
    {
      if (error)
        return {system::in_place_error, error};
      if constexpr (all_void)
        return {system::in_place_value, index};
      else
        return {system::in_place_value, std::move(*result)};
    }
  };

  awaitable operator co_await() &&
  {
    return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
  }
};
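
// Implementation type for racing a runtime-sized range of awaitables. Same
// winner-takes-the-index logic as the variadic version, but the per-element
// bookkeeping (shuffled `reorder` indices, cancellation signals, interrupt
// pointers) lives in vectors, PMR-backed when BOOST_COBALT_NO_PMR is not
// defined.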
template<asio::cancellation_type Ct, typename URBG, typename Range>
struct race_ranged_impl
{
  using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;

  template<typename URBG_>
  race_ranged_impl(URBG_ && g, Range && rng)
      : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
  {
  }

  Range range;
  URBG g;

  struct awaitable : fork::shared_state
  {
#if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
    boost::source_location loc;
#endif

    using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
    using traits = race_traits<Range, type>;

    std::size_t index{std::numeric_limits<std::size_t>::max()};

    std::conditional_t<
        std::is_void_v<result_type>,
        variant2::monostate,
        std::optional<result_type>> result;
    std::exception_ptr error;

#if !defined(BOOST_COBALT_NO_PMR)
    pmr::monotonic_buffer_resource res;
    pmr::polymorphic_allocator<void> alloc{&resource};

    Range &aws;

    struct dummy
    {
      template<typename ... Args>
      dummy(Args && ...) {}
    };

    std::conditional_t<traits::interruptible,
        pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
        dummy> working{std::size(aws), alloc};

    /* all below `reorder` is reordered
     *
     * cancel[idx] is for aws[reorder[idx]]
     */
    pmr::vector<std::size_t> reorder{std::size(aws), alloc};
    pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
    pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
#else
    Range &aws;

    struct dummy
    {
      template<typename ... Args>
      dummy(Args && ...) {}
    };

    std::conditional_t<traits::interruptible,
        std::vector<std::decay_t<typename traits::actual_awaitable>*>,
        dummy> working{std::size(aws), std::allocator<void>()};

    /* all below `reorder` is reordered
     *
     * cancel[idx] is for aws[reorder[idx]]
     */
    std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
    std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
    std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
#endif

    bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }

    awaitable(Range & aws, URBG & g)
        : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
        , aws(aws)
    {
      std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
      if constexpr (traits::interruptible)
        std::fill(working.begin(), working.end(), nullptr);
      if constexpr (!std::is_same_v<URBG, left_race_tag>)
        std::shuffle(reorder.begin(), reorder.end(), g);
    }

    void cancel_all()
    {
      interrupt_await();
      for (auto & r : cancel)
        if (r)
          std::exchange(r, nullptr)->emit(Ct);
    }

    void interrupt_await()
    {
      if constexpr (traits::interruptible)
        for (auto aw : working)
          if (aw)
            traits::do_interrupt(*aw);
    }

    template<typename T, typename Error>
    void assign_error(system::result<T, Error> & res)
    try
    {
      std::move(res).value(loc);
    }
    catch(...)
    {
      error = std::current_exception();
    }

    template<typename T>
    void assign_error(system::result<T, std::exception_ptr> & res)
    {
      error = std::move(res).error();
    }
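
    // Fork body for the range element at index idx (visited in the shuffled
    // `reorder` order): materializes the actual awaitable from the range,
    // registers it for interruption when supported, and records the winning
    // result or error, mirroring the variadic await_impl above.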
    static detail::fork await_impl(awaitable & this_, std::size_t idx)
    try
    {
      typename traits::actual_awaitable aw_{
          get_awaitable_type(
              static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
          )};
      as_result_t aw{aw_};

      if constexpr (traits::interruptible)
        this_.working[idx] = &aw_;

      auto transaction = [&this_, idx = idx] {
        if (this_.has_result())
          boost::throw_exception(std::runtime_error("Another transaction already started"));
        this_.cancel[idx] = nullptr;
        // reserve the index early bc
        this_.index = idx;
        this_.cancel_all();
      };

      co_await fork::set_transaction_function(transaction);

      // check manually if we're ready
      auto rd = aw.await_ready();
      if (!rd)
      {
        this_.cancel[idx] = &this_.cancel_[idx];
        co_await this_.cancel[idx]->slot();
        // make sure the executor is set
        co_await detail::fork::wired_up;

        // do the await - this doesn't call await-ready again
        if constexpr (std::is_void_v<result_type>)
        {
          auto res = co_await aw;
          if (!this_.has_result())
          {
            if (res.has_error())
              this_.assign_error(res);
            this_.index = idx;
          }
        }
        else
        {
          auto val = co_await aw;
          if (!this_.has_result())
            this_.index = idx;
          if (this_.index == idx)
          {
            if (val.has_error())
              this_.assign_error(val);
            else
              this_.result.emplace(*std::move(val));
          }
        }
        this_.cancel[idx] = nullptr;
      }
      else
      {
        if (!this_.has_result())
          this_.index = idx;
        if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
        {
          auto val = aw.await_resume();
          if (val.has_error())
            this_.assign_error(val);
        }
        else
        {
          if (this_.index == idx)
          {
            auto val = aw.await_resume();
            if (val.has_error())
              this_.assign_error(val);
            else
              this_.result.emplace(*std::move(val));
          }
          else
            aw.await_resume();
        }
        this_.cancel[idx] = nullptr;
      }
      this_.cancel_all();
      if constexpr (traits::interruptible)
        this_.working[idx] = nullptr;
    }
    catch(...)
    {
      if (!this_.has_result())
        this_.index = idx;
      if (this_.index == idx)
        this_.error = std::current_exception();
      if constexpr (traits::interruptible)
        this_.working[idx] = nullptr;
    }

    detail::fork last_forked;
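
    // Same launch protocol as the variadic awaitable: await_ready() starts
    // the fork for the first element in `reorder`, await_suspend() starts the
    // rest and arms the cancellation forwarding.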
    bool await_ready()
    {
      last_forked = await_impl(*this, reorder.front());
      return last_forked.done();
    }

    template<typename H>
    auto await_suspend(std::coroutine_handle<H> h,
                       const boost::source_location & loc = BOOST_CURRENT_LOCATION)
    {
      this->loc = loc;
      this->exec = &detail::get_executor(h);
      last_forked.release().resume();

      if (!this->outstanding_work()) // already done, resume right away.
        return false;

      for (auto itr = std::next(reorder.begin());
           itr < reorder.end(); std::advance(itr, 1)) // we'
      {
        auto l = await_impl(*this, *itr);
        auto d = l.done();
        l.release();
        if (d)
          break;
      }

      if (!this->outstanding_work()) // already done, resume right away.
        return false;

      // arm the cancel
      assign_cancellation(
          h,
          [&](asio::cancellation_type ct)
          {
            for (auto & cs : cancel)
              if (cs)
                cs->emit(ct);
          });

      this->coro.reset(h.address());
      return true;
    }

#if _MSC_VER
    BOOST_NOINLINE
#endif
    auto await_resume()
    {
      if (error)
        std::rethrow_exception(error);
      if constexpr (std::is_void_v<result_type>)
        return index;
      else
        return std::make_pair(index, *result);
    }

    auto await_resume(const as_tuple_tag &)
    {
      if constexpr (std::is_void_v<result_type>)
        return std::make_tuple(error, index);
      else
        return std::make_tuple(error, std::make_pair(index, std::move(*result)));
    }

    auto await_resume(const as_result_tag & )
        -> system::result<
            std::conditional_t<
                std::is_void_v<result_type>,
                std::size_t,
                std::pair<std::size_t, result_type>>,
            std::exception_ptr>
    {
      if (error)
        return {system::in_place_error, error};
      if constexpr (std::is_void_v<result_type>)
        return {system::in_place_value, index};
      else
        return {system::in_place_value, std::make_pair(index, std::move(*result))};
    }
  };

  awaitable operator co_await() &&
  {
    return awaitable{range, g};
  }
};

}

#endif //BOOST_COBALT_DETAIL_RACE_HPP