parallel_group.hpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. //
  2. // experimental/impl/parallel_group.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  11. #define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  #include <boost/asio/detail/config.hpp>

  #include <atomic>
  #include <deque>
  #include <memory>
  #include <new>
  #include <tuple>
  #include <type_traits>
  #include <boost/asio/associated_cancellation_slot.hpp>
  #include <boost/asio/detail/recycling_allocator.hpp>
  #include <boost/asio/detail/type_traits.hpp>
  #include <boost/asio/dispatch.hpp>
  #include <boost/asio/post.hpp>

  #include <boost/asio/detail/push_options.hpp>
  26. namespace boost {
  27. namespace asio {
  28. namespace experimental {
  29. namespace detail {
  // Stores the result from an individual asynchronous operation.
  //
  // This is a minimal optional-like holder: the T lives inside a union so that
  // nothing is constructed until emplace() is called, and has_value_ records
  // whether the union member is currently alive. Because a move constructor is
  // declared, the implicit copy operations are suppressed and the type can only
  // be default- or move-constructed.
  template <typename T, typename = void>
  struct parallel_group_op_result
  {
  public:
    // Creates an empty result that holds no value.
    parallel_group_op_result() noexcept
      : has_value_(false)
    {
    }

    // Move-constructs the held value (if any) out of other. The source keeps
    // its moved-from value and still destroys it in its own destructor.
    parallel_group_op_result(parallel_group_op_result&& other)
      noexcept(std::is_nothrow_move_constructible<T>::value)
      : has_value_(false)
    {
      if (other.has_value_)
      {
        ::new (static_cast<void*>(&u_.value_)) T(std::move(other.get()));
        has_value_ = true;
      }
    }

    // Destroys the held value, if one was ever emplaced.
    ~parallel_group_op_result()
    {
      if (has_value_)
        u_.value_.~T();
    }

    // Returns the held value. The storage is accessed unconditionally, so the
    // caller must only use this after a value has been emplaced.
    T& get() noexcept
    {
      return u_.value_;
    }

    // Constructs the held value in place from args.
    //
    // Any value that is already held is destroyed first; previously the new
    // object was constructed on top of the live one, leaking whatever the old
    // value owned. has_value_ is cleared before the old value is destroyed so
    // that, if T's constructor throws, the destructor does not touch storage
    // that no longer contains an object.
    template <typename... Args>
    void emplace(Args&&... args)
    {
      if (has_value_)
      {
        has_value_ = false;
        u_.value_.~T();
      }
      ::new (static_cast<void*>(&u_.value_)) T(std::forward<Args>(args)...);
      has_value_ = true;
    }

  private:
    // Raw storage for the value. The empty constructor and destructor leave
    // the lifetime of value_ entirely under the control of the enclosing class.
    union u
    {
      u() {}
      ~u() {}
      char c_;
      T value_;
    } u_;

    // True while u_.value_ refers to a constructed T.
    bool has_value_;
  };
  70. // Proxy completion handler for the group of parallel operatations. Unpacks and
  71. // concatenates the individual operations' results, and invokes the user's
  72. // completion handler.
  73. template <typename Handler, typename... Ops>
  74. struct parallel_group_completion_handler
  75. {
  76. typedef decay_t<
  77. prefer_result_t<
  78. associated_executor_t<Handler>,
  79. execution::outstanding_work_t::tracked_t
  80. >
  81. > executor_type;
  82. parallel_group_completion_handler(Handler&& h)
  83. : handler_(std::move(h)),
  84. executor_(
  85. boost::asio::prefer(
  86. boost::asio::get_associated_executor(handler_),
  87. execution::outstanding_work.tracked))
  88. {
  89. }
  90. executor_type get_executor() const noexcept
  91. {
  92. return executor_;
  93. }
  94. void operator()()
  95. {
  96. this->invoke(boost::asio::detail::make_index_sequence<sizeof...(Ops)>());
  97. }
  98. template <std::size_t... I>
  99. void invoke(boost::asio::detail::index_sequence<I...>)
  100. {
  101. this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
  102. }
  103. template <typename... Args>
  104. void invoke(std::tuple<Args...>&& args)
  105. {
  106. this->invoke(std::move(args),
  107. boost::asio::detail::index_sequence_for<Args...>());
  108. }
  109. template <typename... Args, std::size_t... I>
  110. void invoke(std::tuple<Args...>&& args,
  111. boost::asio::detail::index_sequence<I...>)
  112. {
  113. std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
  114. }
  115. Handler handler_;
  116. executor_type executor_;
  117. std::array<std::size_t, sizeof...(Ops)> completion_order_{};
  118. std::tuple<
  119. parallel_group_op_result<
  120. typename parallel_op_signature_as_tuple<
  121. completion_signature_of_t<Ops>
  122. >::type
  123. >...
  124. > args_{};
  125. };
  126. // Shared state for the parallel group.
  127. template <typename Condition, typename Handler, typename... Ops>
  128. struct parallel_group_state
  129. {
  130. parallel_group_state(Condition&& c, Handler&& h)
  131. : cancellation_condition_(std::move(c)),
  132. handler_(std::move(h))
  133. {
  134. }
  135. // The number of operations that have completed so far. Used to determine the
  136. // order of completion.
  137. std::atomic<unsigned int> completed_{0};
  138. // The non-none cancellation type that resulted from a cancellation condition.
  139. // Stored here for use by the group's initiating function.
  140. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  141. // The number of cancellations that have been requested, either on completion
  142. // of the operations within the group, or via the cancellation slot for the
  143. // group operation. Initially set to the number of operations to prevent
  144. // cancellation signals from being emitted until after all of the group's
  145. // operations' initiating functions have completed.
  146. std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
  147. // The number of operations that are yet to complete. Used to determine when
  148. // it is safe to invoke the user's completion handler.
  149. std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
  150. // The cancellation signals for each operation in the group.
  151. boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
  152. // The cancellation condition is used to determine whether the results from an
  153. // individual operation warrant a cancellation request for the whole group.
  154. Condition cancellation_condition_;
  155. // The proxy handler to be invoked once all operations in the group complete.
  156. parallel_group_completion_handler<Handler, Ops...> handler_;
  157. };
  158. // Handler for an individual operation within the parallel group.
  159. template <std::size_t I, typename Condition, typename Handler, typename... Ops>
  160. struct parallel_group_op_handler
  161. {
  162. typedef boost::asio::cancellation_slot cancellation_slot_type;
  163. parallel_group_op_handler(
  164. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  165. : state_(std::move(state))
  166. {
  167. }
  168. cancellation_slot_type get_cancellation_slot() const noexcept
  169. {
  170. return state_->cancellation_signals_[I].slot();
  171. }
  172. template <typename... Args>
  173. void operator()(Args... args)
  174. {
  175. // Capture this operation into the completion order.
  176. state_->handler_.completion_order_[state_->completed_++] = I;
  177. // Determine whether the results of this operation require cancellation of
  178. // the whole group.
  179. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  180. // Capture the result of the operation into the proxy completion handler.
  181. std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
  182. if (cancel_type != cancellation_type::none)
  183. {
  184. // Save the type for potential use by the group's initiating function.
  185. state_->cancel_type_ = cancel_type;
  186. // If we are the first operation to request cancellation, emit a signal
  187. // for each operation in the group.
  188. if (state_->cancellations_requested_++ == 0)
  189. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  190. if (i != I)
  191. state_->cancellation_signals_[i].emit(cancel_type);
  192. }
  193. // If this is the last outstanding operation, invoke the user's handler.
  194. if (--state_->outstanding_ == 0)
  195. boost::asio::dispatch(std::move(state_->handler_));
  196. }
  197. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  198. };
  199. // Handler for an individual operation within the parallel group that has an
  200. // explicitly specified executor.
  201. template <typename Executor, std::size_t I,
  202. typename Condition, typename Handler, typename... Ops>
  203. struct parallel_group_op_handler_with_executor :
  204. parallel_group_op_handler<I, Condition, Handler, Ops...>
  205. {
  206. typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
  207. typedef boost::asio::cancellation_slot cancellation_slot_type;
  208. typedef Executor executor_type;
  209. parallel_group_op_handler_with_executor(
  210. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state,
  211. executor_type ex)
  212. : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
  213. {
  214. cancel_proxy_ =
  215. &this->state_->cancellation_signals_[I].slot().template
  216. emplace<cancel_proxy>(this->state_, std::move(ex));
  217. }
  218. cancellation_slot_type get_cancellation_slot() const noexcept
  219. {
  220. return cancel_proxy_->signal_.slot();
  221. }
  222. executor_type get_executor() const noexcept
  223. {
  224. return cancel_proxy_->executor_;
  225. }
  226. // Proxy handler that forwards the emitted signal to the correct executor.
  227. struct cancel_proxy
  228. {
  229. cancel_proxy(
  230. std::shared_ptr<parallel_group_state<
  231. Condition, Handler, Ops...>> state,
  232. executor_type ex)
  233. : state_(std::move(state)),
  234. executor_(std::move(ex))
  235. {
  236. }
  237. void operator()(cancellation_type_t type)
  238. {
  239. if (auto state = state_.lock())
  240. {
  241. boost::asio::cancellation_signal* sig = &signal_;
  242. boost::asio::dispatch(executor_,
  243. [state, sig, type]{ sig->emit(type); });
  244. }
  245. }
  246. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  247. boost::asio::cancellation_signal signal_;
  248. executor_type executor_;
  249. };
  250. cancel_proxy* cancel_proxy_;
  251. };
  252. // Helper to launch an operation using the correct executor, if any.
  253. template <std::size_t I, typename Op, typename = void>
  254. struct parallel_group_op_launcher
  255. {
  256. template <typename Condition, typename Handler, typename... Ops>
  257. static void launch(Op& op,
  258. const std::shared_ptr<parallel_group_state<
  259. Condition, Handler, Ops...>>& state)
  260. {
  261. typedef associated_executor_t<Op> ex_type;
  262. ex_type ex = boost::asio::get_associated_executor(op);
  263. std::move(op)(
  264. parallel_group_op_handler_with_executor<ex_type, I,
  265. Condition, Handler, Ops...>(state, std::move(ex)));
  266. }
  267. };
  268. // Specialised launcher for operations that specify no executor.
  269. template <std::size_t I, typename Op>
  270. struct parallel_group_op_launcher<I, Op,
  271. enable_if_t<
  272. is_same<
  273. typename associated_executor<
  274. Op>::asio_associated_executor_is_unspecialised,
  275. void
  276. >::value
  277. >>
  278. {
  279. template <typename Condition, typename Handler, typename... Ops>
  280. static void launch(Op& op,
  281. const std::shared_ptr<parallel_group_state<
  282. Condition, Handler, Ops...>>& state)
  283. {
  284. std::move(op)(
  285. parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
  286. }
  287. };
  288. template <typename Condition, typename Handler, typename... Ops>
  289. struct parallel_group_cancellation_handler
  290. {
  291. parallel_group_cancellation_handler(
  292. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  293. : state_(std::move(state))
  294. {
  295. }
  296. void operator()(cancellation_type_t cancel_type)
  297. {
  298. // If we are the first place to request cancellation, i.e. no operation has
  299. // yet completed and requested cancellation, emit a signal for each
  300. // operation in the group.
  301. if (cancel_type != cancellation_type::none)
  302. if (auto state = state_.lock())
  303. if (state->cancellations_requested_++ == 0)
  304. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  305. state->cancellation_signals_[i].emit(cancel_type);
  306. }
  307. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  308. };
  309. template <typename Condition, typename Handler,
  310. typename... Ops, std::size_t... I>
  311. void parallel_group_launch(Condition cancellation_condition, Handler handler,
  312. std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>)
  313. {
  314. // Get the user's completion handler's cancellation slot, so that we can allow
  315. // cancellation of the entire group.
  316. associated_cancellation_slot_t<Handler> slot
  317. = boost::asio::get_associated_cancellation_slot(handler);
  318. // Create the shared state for the operation.
  319. typedef parallel_group_state<Condition, Handler, Ops...> state_type;
  320. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  321. boost::asio::detail::recycling_allocator<state_type,
  322. boost::asio::detail::thread_info_base::parallel_group_tag>(),
  323. std::move(cancellation_condition), std::move(handler));
  324. // Initiate each individual operation in the group.
  325. int fold[] = { 0,
  326. ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
  327. 0 )...
  328. };
  329. (void)fold;
  330. // Check if any of the operations has already requested cancellation, and if
  331. // so, emit a signal for each operation in the group.
  332. if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
  333. for (auto& signal : state->cancellation_signals_)
  334. signal.emit(state->cancel_type_);
  335. // Register a handler with the user's completion handler's cancellation slot.
  336. if (slot.is_connected())
  337. slot.template emplace<
  338. parallel_group_cancellation_handler<
  339. Condition, Handler, Ops...>>(state);
  340. }
  341. // Proxy completion handler for the ranged group of parallel operatations.
  342. // Unpacks and recombines the individual operations' results, and invokes the
  343. // user's completion handler.
  344. template <typename Handler, typename Op, typename Allocator>
  345. struct ranged_parallel_group_completion_handler
  346. {
  347. typedef decay_t<
  348. prefer_result_t<
  349. associated_executor_t<Handler>,
  350. execution::outstanding_work_t::tracked_t
  351. >
  352. > executor_type;
  353. typedef typename parallel_op_signature_as_tuple<
  354. completion_signature_of_t<Op>
  355. >::type op_tuple_type;
  356. typedef parallel_group_op_result<op_tuple_type> op_result_type;
  357. ranged_parallel_group_completion_handler(Handler&& h,
  358. std::size_t size, const Allocator& allocator)
  359. : handler_(std::move(h)),
  360. executor_(
  361. boost::asio::prefer(
  362. boost::asio::get_associated_executor(handler_),
  363. execution::outstanding_work.tracked)),
  364. allocator_(allocator),
  365. completion_order_(size, 0,
  366. BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)),
  367. args_(BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator))
  368. {
  369. for (std::size_t i = 0; i < size; ++i)
  370. args_.emplace_back();
  371. }
  372. executor_type get_executor() const noexcept
  373. {
  374. return executor_;
  375. }
  376. void operator()()
  377. {
  378. this->invoke(
  379. boost::asio::detail::make_index_sequence<
  380. std::tuple_size<op_tuple_type>::value>());
  381. }
  382. template <std::size_t... I>
  383. void invoke(boost::asio::detail::index_sequence<I...>)
  384. {
  385. typedef typename parallel_op_signature_as_tuple<
  386. typename ranged_parallel_group_signature<
  387. completion_signature_of_t<Op>,
  388. Allocator
  389. >::raw_type
  390. >::type vectors_type;
  391. // Construct all result vectors using the supplied allocator.
  392. vectors_type vectors{
  393. typename std::tuple_element<I, vectors_type>::type(
  394. BOOST_ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...};
  395. // Reserve sufficient space in each of the result vectors.
  396. int reserve_fold[] = { 0,
  397. ( std::get<I>(vectors).reserve(completion_order_.size()),
  398. 0 )...
  399. };
  400. (void)reserve_fold;
  401. // Copy the results from all operations into the result vectors.
  402. for (std::size_t idx = 0; idx < completion_order_.size(); ++idx)
  403. {
  404. int pushback_fold[] = { 0,
  405. ( std::get<I>(vectors).push_back(
  406. std::move(std::get<I>(args_[idx].get()))),
  407. 0 )...
  408. };
  409. (void)pushback_fold;
  410. }
  411. std::move(handler_)(completion_order_, std::move(std::get<I>(vectors))...);
  412. }
  413. Handler handler_;
  414. executor_type executor_;
  415. Allocator allocator_;
  416. std::vector<std::size_t,
  417. BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)> completion_order_;
  418. std::deque<op_result_type,
  419. BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)> args_;
  420. };
  421. // Shared state for the parallel group.
  422. template <typename Condition, typename Handler, typename Op, typename Allocator>
  423. struct ranged_parallel_group_state
  424. {
  425. ranged_parallel_group_state(Condition&& c, Handler&& h,
  426. std::size_t size, const Allocator& allocator)
  427. : cancellations_requested_(size),
  428. outstanding_(size),
  429. cancellation_signals_(
  430. BOOST_ASIO_REBIND_ALLOC(Allocator,
  431. boost::asio::cancellation_signal)(allocator)),
  432. cancellation_condition_(std::move(c)),
  433. handler_(std::move(h), size, allocator)
  434. {
  435. for (std::size_t i = 0; i < size; ++i)
  436. cancellation_signals_.emplace_back();
  437. }
  438. // The number of operations that have completed so far. Used to determine the
  439. // order of completion.
  440. std::atomic<unsigned int> completed_{0};
  441. // The non-none cancellation type that resulted from a cancellation condition.
  442. // Stored here for use by the group's initiating function.
  443. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  444. // The number of cancellations that have been requested, either on completion
  445. // of the operations within the group, or via the cancellation slot for the
  446. // group operation. Initially set to the number of operations to prevent
  447. // cancellation signals from being emitted until after all of the group's
  448. // operations' initiating functions have completed.
  449. std::atomic<unsigned int> cancellations_requested_;
  450. // The number of operations that are yet to complete. Used to determine when
  451. // it is safe to invoke the user's completion handler.
  452. std::atomic<unsigned int> outstanding_;
  453. // The cancellation signals for each operation in the group.
  454. std::deque<boost::asio::cancellation_signal,
  455. BOOST_ASIO_REBIND_ALLOC(Allocator, boost::asio::cancellation_signal)>
  456. cancellation_signals_;
  457. // The cancellation condition is used to determine whether the results from an
  458. // individual operation warrant a cancellation request for the whole group.
  459. Condition cancellation_condition_;
  460. // The proxy handler to be invoked once all operations in the group complete.
  461. ranged_parallel_group_completion_handler<Handler, Op, Allocator> handler_;
  462. };
  463. // Handler for an individual operation within the parallel group.
  464. template <typename Condition, typename Handler, typename Op, typename Allocator>
  465. struct ranged_parallel_group_op_handler
  466. {
  467. typedef boost::asio::cancellation_slot cancellation_slot_type;
  468. ranged_parallel_group_op_handler(
  469. std::shared_ptr<ranged_parallel_group_state<
  470. Condition, Handler, Op, Allocator>> state,
  471. std::size_t idx)
  472. : state_(std::move(state)),
  473. idx_(idx)
  474. {
  475. }
  476. cancellation_slot_type get_cancellation_slot() const noexcept
  477. {
  478. return state_->cancellation_signals_[idx_].slot();
  479. }
  480. template <typename... Args>
  481. void operator()(Args... args)
  482. {
  483. // Capture this operation into the completion order.
  484. state_->handler_.completion_order_[state_->completed_++] = idx_;
  485. // Determine whether the results of this operation require cancellation of
  486. // the whole group.
  487. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  488. // Capture the result of the operation into the proxy completion handler.
  489. state_->handler_.args_[idx_].emplace(std::move(args)...);
  490. if (cancel_type != cancellation_type::none)
  491. {
  492. // Save the type for potential use by the group's initiating function.
  493. state_->cancel_type_ = cancel_type;
  494. // If we are the first operation to request cancellation, emit a signal
  495. // for each operation in the group.
  496. if (state_->cancellations_requested_++ == 0)
  497. for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i)
  498. if (i != idx_)
  499. state_->cancellation_signals_[i].emit(cancel_type);
  500. }
  501. // If this is the last outstanding operation, invoke the user's handler.
  502. if (--state_->outstanding_ == 0)
  503. boost::asio::dispatch(std::move(state_->handler_));
  504. }
  505. std::shared_ptr<ranged_parallel_group_state<
  506. Condition, Handler, Op, Allocator>> state_;
  507. std::size_t idx_;
  508. };
  509. // Handler for an individual operation within the parallel group that has an
  510. // explicitly specified executor.
  511. template <typename Executor, typename Condition,
  512. typename Handler, typename Op, typename Allocator>
  513. struct ranged_parallel_group_op_handler_with_executor :
  514. ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>
  515. {
  516. typedef ranged_parallel_group_op_handler<
  517. Condition, Handler, Op, Allocator> base_type;
  518. typedef boost::asio::cancellation_slot cancellation_slot_type;
  519. typedef Executor executor_type;
  520. ranged_parallel_group_op_handler_with_executor(
  521. std::shared_ptr<ranged_parallel_group_state<
  522. Condition, Handler, Op, Allocator>> state,
  523. executor_type ex, std::size_t idx)
  524. : ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>(
  525. std::move(state), idx)
  526. {
  527. cancel_proxy_ =
  528. &this->state_->cancellation_signals_[idx].slot().template
  529. emplace<cancel_proxy>(this->state_, std::move(ex));
  530. }
  531. cancellation_slot_type get_cancellation_slot() const noexcept
  532. {
  533. return cancel_proxy_->signal_.slot();
  534. }
  535. executor_type get_executor() const noexcept
  536. {
  537. return cancel_proxy_->executor_;
  538. }
  539. // Proxy handler that forwards the emitted signal to the correct executor.
  540. struct cancel_proxy
  541. {
  542. cancel_proxy(
  543. std::shared_ptr<ranged_parallel_group_state<
  544. Condition, Handler, Op, Allocator>> state,
  545. executor_type ex)
  546. : state_(std::move(state)),
  547. executor_(std::move(ex))
  548. {
  549. }
  550. void operator()(cancellation_type_t type)
  551. {
  552. if (auto state = state_.lock())
  553. {
  554. boost::asio::cancellation_signal* sig = &signal_;
  555. boost::asio::dispatch(executor_,
  556. [state, sig, type]{ sig->emit(type); });
  557. }
  558. }
  559. std::weak_ptr<ranged_parallel_group_state<
  560. Condition, Handler, Op, Allocator>> state_;
  561. boost::asio::cancellation_signal signal_;
  562. executor_type executor_;
  563. };
  564. cancel_proxy* cancel_proxy_;
  565. };
  566. template <typename Condition, typename Handler, typename Op, typename Allocator>
  567. struct ranged_parallel_group_cancellation_handler
  568. {
  569. ranged_parallel_group_cancellation_handler(
  570. std::shared_ptr<ranged_parallel_group_state<
  571. Condition, Handler, Op, Allocator>> state)
  572. : state_(std::move(state))
  573. {
  574. }
  575. void operator()(cancellation_type_t cancel_type)
  576. {
  577. // If we are the first place to request cancellation, i.e. no operation has
  578. // yet completed and requested cancellation, emit a signal for each
  579. // operation in the group.
  580. if (cancel_type != cancellation_type::none)
  581. if (auto state = state_.lock())
  582. if (state->cancellations_requested_++ == 0)
  583. for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i)
  584. state->cancellation_signals_[i].emit(cancel_type);
  585. }
  586. std::weak_ptr<ranged_parallel_group_state<
  587. Condition, Handler, Op, Allocator>> state_;
  588. };
  589. template <typename Condition, typename Handler,
  590. typename Range, typename Allocator>
  591. void ranged_parallel_group_launch(Condition cancellation_condition,
  592. Handler handler, Range&& range, const Allocator& allocator)
  593. {
  594. // Get the user's completion handler's cancellation slot, so that we can allow
  595. // cancellation of the entire group.
  596. associated_cancellation_slot_t<Handler> slot
  597. = boost::asio::get_associated_cancellation_slot(handler);
  598. // The type of the asynchronous operation.
  599. typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
  600. // Create the shared state for the operation.
  601. typedef ranged_parallel_group_state<Condition,
  602. Handler, op_type, Allocator> state_type;
  603. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  604. boost::asio::detail::recycling_allocator<state_type,
  605. boost::asio::detail::thread_info_base::parallel_group_tag>(),
  606. std::move(cancellation_condition),
  607. std::move(handler), range.size(), allocator);
  608. std::size_t idx = 0;
  609. for (auto&& op : std::forward<Range>(range))
  610. {
  611. typedef associated_executor_t<op_type> ex_type;
  612. ex_type ex = boost::asio::get_associated_executor(op);
  613. std::move(op)(
  614. ranged_parallel_group_op_handler_with_executor<
  615. ex_type, Condition, Handler, op_type, Allocator>(
  616. state, std::move(ex), idx++));
  617. }
  618. // Check if any of the operations has already requested cancellation, and if
  619. // so, emit a signal for each operation in the group.
  620. if ((state->cancellations_requested_ -= range.size()) > 0)
  621. for (auto& signal : state->cancellation_signals_)
  622. signal.emit(state->cancel_type_);
  623. // Register a handler with the user's completion handler's cancellation slot.
  624. if (slot.is_connected())
  625. slot.template emplace<
  626. ranged_parallel_group_cancellation_handler<
  627. Condition, Handler, op_type, Allocator>>(state);
  628. }
  629. } // namespace detail
  630. } // namespace experimental
  631. template <template <typename, typename> class Associator,
  632. typename Handler, typename... Ops, typename DefaultCandidate>
  633. struct associator<Associator,
  634. experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
  635. DefaultCandidate>
  636. : Associator<Handler, DefaultCandidate>
  637. {
  638. static typename Associator<Handler, DefaultCandidate>::type get(
  639. const experimental::detail::parallel_group_completion_handler<
  640. Handler, Ops...>& h) noexcept
  641. {
  642. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  643. }
  644. static auto get(
  645. const experimental::detail::parallel_group_completion_handler<
  646. Handler, Ops...>& h,
  647. const DefaultCandidate& c) noexcept
  648. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  649. {
  650. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  651. }
  652. };
  653. template <template <typename, typename> class Associator, typename Handler,
  654. typename Op, typename Allocator, typename DefaultCandidate>
  655. struct associator<Associator,
  656. experimental::detail::ranged_parallel_group_completion_handler<
  657. Handler, Op, Allocator>,
  658. DefaultCandidate>
  659. : Associator<Handler, DefaultCandidate>
  660. {
  661. static typename Associator<Handler, DefaultCandidate>::type get(
  662. const experimental::detail::ranged_parallel_group_completion_handler<
  663. Handler, Op, Allocator>& h) noexcept
  664. {
  665. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  666. }
  667. static auto get(
  668. const experimental::detail::ranged_parallel_group_completion_handler<
  669. Handler, Op, Allocator>& h,
  670. const DefaultCandidate& c) noexcept
  671. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  672. {
  673. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  674. }
  675. };
  676. } // namespace asio
  677. } // namespace boost
  678. #include <boost/asio/detail/pop_options.hpp>
  679. #endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP