channel_service.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. //
  2. // experimental/detail/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  11. #define BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <boost/asio/associated_cancellation_slot.hpp>
  17. #include <boost/asio/cancellation_type.hpp>
  18. #include <boost/asio/detail/mutex.hpp>
  19. #include <boost/asio/detail/op_queue.hpp>
  20. #include <boost/asio/execution_context.hpp>
  21. #include <boost/asio/experimental/detail/channel_message.hpp>
  22. #include <boost/asio/experimental/detail/channel_receive_op.hpp>
  23. #include <boost/asio/experimental/detail/channel_send_op.hpp>
  24. #include <boost/asio/experimental/detail/has_signature.hpp>
  25. #include <boost/asio/detail/push_options.hpp>
  26. namespace boost {
  27. namespace asio {
  28. namespace experimental {
  29. namespace detail {
  30. template <typename Mutex>
  31. class channel_service
  32. : public boost::asio::detail::execution_context_service_base<
  33. channel_service<Mutex>>
  34. {
  35. public:
  36. // Possible states for a channel end.
  37. enum state
  38. {
  39. buffer = 0,
  40. waiter = 1,
  41. block = 2,
  42. closed = 3
  43. };
  44. // The base implementation type of all channels.
  45. struct base_implementation_type
  46. {
  47. // Default constructor.
  48. base_implementation_type()
  49. : receive_state_(block),
  50. send_state_(block),
  51. max_buffer_size_(0),
  52. next_(0),
  53. prev_(0)
  54. {
  55. }
  56. // The current state of the channel.
  57. state receive_state_ : 16;
  58. state send_state_ : 16;
  59. // The maximum number of elements that may be buffered in the channel.
  60. std::size_t max_buffer_size_;
  61. // The operations that are waiting on the channel.
  62. boost::asio::detail::op_queue<channel_operation> waiters_;
  63. // Pointers to adjacent channel implementations in linked list.
  64. base_implementation_type* next_;
  65. base_implementation_type* prev_;
  66. // The mutex type to protect the internal implementation.
  67. mutable Mutex mutex_;
  68. };
  69. // The implementation for a specific value type.
  70. template <typename Traits, typename... Signatures>
  71. struct implementation_type;
  72. // Constructor.
  73. channel_service(execution_context& ctx);
  74. // Destroy all user-defined handler objects owned by the service.
  75. void shutdown();
  76. // Construct a new channel implementation.
  77. void construct(base_implementation_type& impl, std::size_t max_buffer_size);
  78. // Destroy a channel implementation.
  79. template <typename Traits, typename... Signatures>
  80. void destroy(implementation_type<Traits, Signatures...>& impl);
  81. // Move-construct a new channel implementation.
  82. template <typename Traits, typename... Signatures>
  83. void move_construct(implementation_type<Traits, Signatures...>& impl,
  84. implementation_type<Traits, Signatures...>& other_impl);
  85. // Move-assign from another channel implementation.
  86. template <typename Traits, typename... Signatures>
  87. void move_assign(implementation_type<Traits, Signatures...>& impl,
  88. channel_service& other_service,
  89. implementation_type<Traits, Signatures...>& other_impl);
  90. // Get the capacity of the channel.
  91. std::size_t capacity(
  92. const base_implementation_type& impl) const noexcept;
  93. // Determine whether the channel is open.
  94. bool is_open(const base_implementation_type& impl) const noexcept;
  95. // Reset the channel to its initial state.
  96. template <typename Traits, typename... Signatures>
  97. void reset(implementation_type<Traits, Signatures...>& impl);
  98. // Close the channel.
  99. template <typename Traits, typename... Signatures>
  100. void close(implementation_type<Traits, Signatures...>& impl);
  101. // Cancel all operations associated with the channel.
  102. template <typename Traits, typename... Signatures>
  103. void cancel(implementation_type<Traits, Signatures...>& impl);
  104. // Cancel the operation associated with the channel that has the given key.
  105. template <typename Traits, typename... Signatures>
  106. void cancel_by_key(implementation_type<Traits, Signatures...>& impl,
  107. void* cancellation_key);
  108. // Determine whether a value can be read from the channel without blocking.
  109. bool ready(const base_implementation_type& impl) const noexcept;
  110. // Synchronously send a new value into the channel.
  111. template <typename Message, typename Traits,
  112. typename... Signatures, typename... Args>
  113. bool try_send(implementation_type<Traits, Signatures...>& impl,
  114. bool via_dispatch, Args&&... args);
  115. // Synchronously send a number of new values into the channel.
  116. template <typename Message, typename Traits,
  117. typename... Signatures, typename... Args>
  118. std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
  119. std::size_t count, bool via_dispatch, Args&&... args);
  120. // Asynchronously send a new value into the channel.
  121. template <typename Traits, typename... Signatures,
  122. typename Handler, typename IoExecutor>
  123. void async_send(implementation_type<Traits, Signatures...>& impl,
  124. typename implementation_type<Traits,
  125. Signatures...>::payload_type&& payload,
  126. Handler& handler, const IoExecutor& io_ex)
  127. {
  128. associated_cancellation_slot_t<Handler> slot
  129. = boost::asio::get_associated_cancellation_slot(handler);
  130. // Allocate and construct an operation to wrap the handler.
  131. typedef channel_send_op<
  132. typename implementation_type<Traits, Signatures...>::payload_type,
  133. Handler, IoExecutor> op;
  134. typename op::ptr p = { boost::asio::detail::addressof(handler),
  135. op::ptr::allocate(handler), 0 };
  136. p.p = new (p.v) op(static_cast<typename implementation_type<
  137. Traits, Signatures...>::payload_type&&>(payload), handler, io_ex);
  138. // Optionally register for per-operation cancellation.
  139. if (slot.is_connected())
  140. {
  141. p.p->cancellation_key_ =
  142. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  143. this, &impl);
  144. }
  145. BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
  146. "channel", &impl, 0, "async_send"));
  147. start_send_op(impl, p.p);
  148. p.v = p.p = 0;
  149. }
  150. // Synchronously receive a value from the channel.
  151. template <typename Traits, typename... Signatures, typename Handler>
  152. bool try_receive(implementation_type<Traits, Signatures...>& impl,
  153. Handler&& handler);
  154. // Asynchronously receive a value from the channel.
  155. template <typename Traits, typename... Signatures,
  156. typename Handler, typename IoExecutor>
  157. void async_receive(implementation_type<Traits, Signatures...>& impl,
  158. Handler& handler, const IoExecutor& io_ex)
  159. {
  160. associated_cancellation_slot_t<Handler> slot
  161. = boost::asio::get_associated_cancellation_slot(handler);
  162. // Allocate and construct an operation to wrap the handler.
  163. typedef channel_receive_op<
  164. typename implementation_type<Traits, Signatures...>::payload_type,
  165. Handler, IoExecutor> op;
  166. typename op::ptr p = { boost::asio::detail::addressof(handler),
  167. op::ptr::allocate(handler), 0 };
  168. p.p = new (p.v) op(handler, io_ex);
  169. // Optionally register for per-operation cancellation.
  170. if (slot.is_connected())
  171. {
  172. p.p->cancellation_key_ =
  173. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  174. this, &impl);
  175. }
  176. BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
  177. "channel", &impl, 0, "async_receive"));
  178. start_receive_op(impl, p.p);
  179. p.v = p.p = 0;
  180. }
  181. private:
  182. // Helper function object to handle a closed notification.
  183. template <typename Payload, typename Signature>
  184. struct post_receive
  185. {
  186. explicit post_receive(channel_receive<Payload>* op)
  187. : op_(op)
  188. {
  189. }
  190. template <typename... Args>
  191. void operator()(Args&&... args)
  192. {
  193. op_->post(
  194. channel_message<Signature>(0,
  195. static_cast<Args&&>(args)...));
  196. }
  197. channel_receive<Payload>* op_;
  198. };
  199. // Destroy a base channel implementation.
  200. void base_destroy(base_implementation_type& impl);
  201. // Helper function to start an asynchronous put operation.
  202. template <typename Traits, typename... Signatures>
  203. void start_send_op(implementation_type<Traits, Signatures...>& impl,
  204. channel_send<typename implementation_type<
  205. Traits, Signatures...>::payload_type>* send_op);
  206. // Helper function to start an asynchronous get operation.
  207. template <typename Traits, typename... Signatures>
  208. void start_receive_op(implementation_type<Traits, Signatures...>& impl,
  209. channel_receive<typename implementation_type<
  210. Traits, Signatures...>::payload_type>* receive_op);
  211. // Helper class used to implement per-operation cancellation.
  212. template <typename Traits, typename... Signatures>
  213. class op_cancellation
  214. {
  215. public:
  216. op_cancellation(channel_service* s,
  217. implementation_type<Traits, Signatures...>* impl)
  218. : service_(s),
  219. impl_(impl)
  220. {
  221. }
  222. void operator()(cancellation_type_t type)
  223. {
  224. if (!!(type &
  225. (cancellation_type::terminal
  226. | cancellation_type::partial
  227. | cancellation_type::total)))
  228. {
  229. service_->cancel_by_key(*impl_, this);
  230. }
  231. }
  232. private:
  233. channel_service* service_;
  234. implementation_type<Traits, Signatures...>* impl_;
  235. };
  236. // Mutex to protect access to the linked list of implementations.
  237. boost::asio::detail::mutex mutex_;
  238. // The head of a linked list of all implementations.
  239. base_implementation_type* impl_list_;
  240. };
  241. // The implementation for a specific value type.
  242. template <typename Mutex>
  243. template <typename Traits, typename... Signatures>
  244. struct channel_service<Mutex>::implementation_type : base_implementation_type
  245. {
  246. // The traits type associated with the channel.
  247. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  248. // Type of an element stored in the buffer.
  249. typedef conditional_t<
  250. has_signature<
  251. typename traits_type::receive_cancelled_signature,
  252. Signatures...
  253. >::value,
  254. conditional_t<
  255. has_signature<
  256. typename traits_type::receive_closed_signature,
  257. Signatures...
  258. >::value,
  259. channel_payload<Signatures...>,
  260. channel_payload<
  261. Signatures...,
  262. typename traits_type::receive_closed_signature
  263. >
  264. >,
  265. conditional_t<
  266. has_signature<
  267. typename traits_type::receive_closed_signature,
  268. Signatures...,
  269. typename traits_type::receive_cancelled_signature
  270. >::value,
  271. channel_payload<
  272. Signatures...,
  273. typename traits_type::receive_cancelled_signature
  274. >,
  275. channel_payload<
  276. Signatures...,
  277. typename traits_type::receive_cancelled_signature,
  278. typename traits_type::receive_closed_signature
  279. >
  280. >
  281. > payload_type;
  282. // Move from another buffer.
  283. void buffer_move_from(implementation_type& other)
  284. {
  285. buffer_ = static_cast<
  286. typename traits_type::template container<payload_type>::type&&>(
  287. other.buffer_);
  288. other.buffer_clear();
  289. }
  290. // Get number of buffered elements.
  291. std::size_t buffer_size() const
  292. {
  293. return buffer_.size();
  294. }
  295. // Push a new value to the back of the buffer.
  296. void buffer_push(payload_type payload)
  297. {
  298. buffer_.push_back(static_cast<payload_type&&>(payload));
  299. }
  300. // Push new values to the back of the buffer.
  301. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  302. {
  303. std::size_t i = 0;
  304. for (; i < count && buffer_.size() < this->max_buffer_size_; ++i)
  305. buffer_.push_back(payload);
  306. return i;
  307. }
  308. // Get the element at the front of the buffer.
  309. payload_type buffer_front()
  310. {
  311. return static_cast<payload_type&&>(buffer_.front());
  312. }
  313. // Pop a value from the front of the buffer.
  314. void buffer_pop()
  315. {
  316. buffer_.pop_front();
  317. }
  318. // Clear all buffered values.
  319. void buffer_clear()
  320. {
  321. buffer_.clear();
  322. }
  323. private:
  324. // Buffered values.
  325. typename traits_type::template container<payload_type>::type buffer_;
  326. };
  327. // The implementation for a void value type.
  328. template <typename Mutex>
  329. template <typename Traits, typename R>
  330. struct channel_service<Mutex>::implementation_type<Traits, R()>
  331. : channel_service::base_implementation_type
  332. {
  333. // The traits type associated with the channel.
  334. typedef typename Traits::template rebind<R()>::other traits_type;
  335. // Type of an element stored in the buffer.
  336. typedef conditional_t<
  337. has_signature<
  338. typename traits_type::receive_cancelled_signature,
  339. R()
  340. >::value,
  341. conditional_t<
  342. has_signature<
  343. typename traits_type::receive_closed_signature,
  344. R()
  345. >::value,
  346. channel_payload<R()>,
  347. channel_payload<
  348. R(),
  349. typename traits_type::receive_closed_signature
  350. >
  351. >,
  352. conditional_t<
  353. has_signature<
  354. typename traits_type::receive_closed_signature,
  355. R(),
  356. typename traits_type::receive_cancelled_signature
  357. >::value,
  358. channel_payload<
  359. R(),
  360. typename traits_type::receive_cancelled_signature
  361. >,
  362. channel_payload<
  363. R(),
  364. typename traits_type::receive_cancelled_signature,
  365. typename traits_type::receive_closed_signature
  366. >
  367. >
  368. > payload_type;
  369. // Construct with empty buffer.
  370. implementation_type()
  371. : buffer_(0)
  372. {
  373. }
  374. // Move from another buffer.
  375. void buffer_move_from(implementation_type& other)
  376. {
  377. buffer_ = other.buffer_;
  378. other.buffer_ = 0;
  379. }
  380. // Get number of buffered elements.
  381. std::size_t buffer_size() const
  382. {
  383. return buffer_;
  384. }
  385. // Push a new value to the back of the buffer.
  386. void buffer_push(payload_type)
  387. {
  388. ++buffer_;
  389. }
  390. // Push new values to the back of the buffer.
  391. std::size_t buffer_push_n(std::size_t count, payload_type)
  392. {
  393. std::size_t available = this->max_buffer_size_ - buffer_;
  394. count = (count < available) ? count : available;
  395. buffer_ += count;
  396. return count;
  397. }
  398. // Get the element at the front of the buffer.
  399. payload_type buffer_front()
  400. {
  401. return payload_type(channel_message<R()>(0));
  402. }
  403. // Pop a value from the front of the buffer.
  404. void buffer_pop()
  405. {
  406. --buffer_;
  407. }
  408. // Clear all values from the buffer.
  409. void buffer_clear()
  410. {
  411. buffer_ = 0;
  412. }
  413. private:
  414. // Number of buffered "values".
  415. std::size_t buffer_;
  416. };
  417. // The implementation for an error_code signature.
  418. template <typename Mutex>
  419. template <typename Traits, typename R>
  420. struct channel_service<Mutex>::implementation_type<
  421. Traits, R(boost::system::error_code)>
  422. : channel_service::base_implementation_type
  423. {
  424. // The traits type associated with the channel.
  425. typedef typename Traits::template rebind<R(boost::system::error_code)>::other
  426. traits_type;
  427. // Type of an element stored in the buffer.
  428. typedef conditional_t<
  429. has_signature<
  430. typename traits_type::receive_cancelled_signature,
  431. R(boost::system::error_code)
  432. >::value,
  433. conditional_t<
  434. has_signature<
  435. typename traits_type::receive_closed_signature,
  436. R(boost::system::error_code)
  437. >::value,
  438. channel_payload<R(boost::system::error_code)>,
  439. channel_payload<
  440. R(boost::system::error_code),
  441. typename traits_type::receive_closed_signature
  442. >
  443. >,
  444. conditional_t<
  445. has_signature<
  446. typename traits_type::receive_closed_signature,
  447. R(boost::system::error_code),
  448. typename traits_type::receive_cancelled_signature
  449. >::value,
  450. channel_payload<
  451. R(boost::system::error_code),
  452. typename traits_type::receive_cancelled_signature
  453. >,
  454. channel_payload<
  455. R(boost::system::error_code),
  456. typename traits_type::receive_cancelled_signature,
  457. typename traits_type::receive_closed_signature
  458. >
  459. >
  460. > payload_type;
  461. // Construct with empty buffer.
  462. implementation_type()
  463. : size_(0)
  464. {
  465. first_.count_ = 0;
  466. }
  467. // Move from another buffer.
  468. void buffer_move_from(implementation_type& other)
  469. {
  470. size_ = other.buffer_;
  471. other.size_ = 0;
  472. first_ = other.first_;
  473. other.first.count_ = 0;
  474. rest_ = static_cast<
  475. typename traits_type::template container<buffered_value>::type&&>(
  476. other.rest_);
  477. other.buffer_clear();
  478. }
  479. // Get number of buffered elements.
  480. std::size_t buffer_size() const
  481. {
  482. return size_;
  483. }
  484. // Push a new value to the back of the buffer.
  485. void buffer_push(payload_type payload)
  486. {
  487. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  488. if (last.count_ == 0)
  489. {
  490. value_handler handler{last.value_};
  491. payload.receive(handler);
  492. last.count_ = 1;
  493. }
  494. else
  495. {
  496. boost::system::error_code value{last.value_};
  497. value_handler handler{value};
  498. payload.receive(handler);
  499. if (last.value_ == value)
  500. ++last.count_;
  501. else
  502. rest_.push_back({value, 1});
  503. }
  504. ++size_;
  505. }
  506. // Push new values to the back of the buffer.
  507. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  508. {
  509. std::size_t available = this->max_buffer_size_ - size_;
  510. count = (count < available) ? count : available;
  511. if (count > 0)
  512. {
  513. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  514. if (last.count_ == 0)
  515. {
  516. payload.receive(value_handler{last.value_});
  517. last.count_ = count;
  518. }
  519. else
  520. {
  521. boost::system::error_code value{last.value_};
  522. payload.receive(value_handler{value});
  523. if (last.value_ == value)
  524. last.count_ += count;
  525. else
  526. rest_.push_back({value, count});
  527. }
  528. size_ += count;
  529. }
  530. return count;
  531. }
  532. // Get the element at the front of the buffer.
  533. payload_type buffer_front()
  534. {
  535. return payload_type({0, first_.value_});
  536. }
  537. // Pop a value from the front of the buffer.
  538. void buffer_pop()
  539. {
  540. --size_;
  541. if (--first_.count_ == 0 && !rest_.empty())
  542. {
  543. first_ = rest_.front();
  544. rest_.pop_front();
  545. }
  546. }
  547. // Clear all values from the buffer.
  548. void buffer_clear()
  549. {
  550. size_ = 0;
  551. first_.count_ == 0;
  552. rest_.clear();
  553. }
  554. private:
  555. struct buffered_value
  556. {
  557. boost::system::error_code value_;
  558. std::size_t count_;
  559. };
  560. struct value_handler
  561. {
  562. boost::system::error_code& target_;
  563. template <typename... Args>
  564. void operator()(const boost::system::error_code& value, Args&&...)
  565. {
  566. target_ = value;
  567. }
  568. };
  569. buffered_value& last_value()
  570. {
  571. return rest_.empty() ? first_ : rest_.back();
  572. }
  573. // Total number of buffered values.
  574. std::size_t size_;
  575. // The first buffered value is maintained as a separate data member to avoid
  576. // allocating space in the container in the common case.
  577. buffered_value first_;
  578. // The rest of the buffered values.
  579. typename traits_type::template container<buffered_value>::type rest_;
  580. };
  581. } // namespace detail
  582. } // namespace experimental
  583. } // namespace asio
  584. } // namespace boost
  585. #include <boost/asio/detail/pop_options.hpp>
  586. #include <boost/asio/experimental/detail/impl/channel_service.hpp>
  587. #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP