channel_service.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. //
  2. // experimental/detail/impl/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  11. #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. namespace boost {
  17. namespace asio {
  18. namespace experimental {
  19. namespace detail {
  20. template <typename Mutex>
  21. inline channel_service<Mutex>::channel_service(execution_context& ctx)
  22. : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
  23. mutex_(),
  24. impl_list_(0)
  25. {
  26. }
  27. template <typename Mutex>
  28. inline void channel_service<Mutex>::shutdown()
  29. {
  30. // Abandon all pending operations.
  31. boost::asio::detail::op_queue<channel_operation> ops;
  32. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  33. base_implementation_type* impl = impl_list_;
  34. while (impl)
  35. {
  36. ops.push(impl->waiters_);
  37. impl = impl->next_;
  38. }
  39. }
  40. template <typename Mutex>
  41. inline void channel_service<Mutex>::construct(
  42. channel_service<Mutex>::base_implementation_type& impl,
  43. std::size_t max_buffer_size)
  44. {
  45. impl.max_buffer_size_ = max_buffer_size;
  46. impl.receive_state_ = block;
  47. impl.send_state_ = max_buffer_size ? buffer : block;
  48. // Insert implementation into linked list of all implementations.
  49. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  50. impl.next_ = impl_list_;
  51. impl.prev_ = 0;
  52. if (impl_list_)
  53. impl_list_->prev_ = &impl;
  54. impl_list_ = &impl;
  55. }
  56. template <typename Mutex>
  57. template <typename Traits, typename... Signatures>
  58. void channel_service<Mutex>::destroy(
  59. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  60. {
  61. cancel(impl);
  62. base_destroy(impl);
  63. }
  64. template <typename Mutex>
  65. template <typename Traits, typename... Signatures>
  66. void channel_service<Mutex>::move_construct(
  67. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  68. channel_service<Mutex>::implementation_type<
  69. Traits, Signatures...>& other_impl)
  70. {
  71. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  72. impl.receive_state_ = other_impl.receive_state_;
  73. other_impl.receive_state_ = block;
  74. impl.send_state_ = other_impl.send_state_;
  75. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  76. impl.buffer_move_from(other_impl);
  77. // Insert implementation into linked list of all implementations.
  78. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  79. impl.next_ = impl_list_;
  80. impl.prev_ = 0;
  81. if (impl_list_)
  82. impl_list_->prev_ = &impl;
  83. impl_list_ = &impl;
  84. }
  85. template <typename Mutex>
  86. template <typename Traits, typename... Signatures>
  87. void channel_service<Mutex>::move_assign(
  88. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  89. channel_service& other_service,
  90. channel_service<Mutex>::implementation_type<
  91. Traits, Signatures...>& other_impl)
  92. {
  93. cancel(impl);
  94. if (this != &other_service)
  95. {
  96. // Remove implementation from linked list of all implementations.
  97. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  98. if (impl_list_ == &impl)
  99. impl_list_ = impl.next_;
  100. if (impl.prev_)
  101. impl.prev_->next_ = impl.next_;
  102. if (impl.next_)
  103. impl.next_->prev_= impl.prev_;
  104. impl.next_ = 0;
  105. impl.prev_ = 0;
  106. }
  107. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  108. impl.receive_state_ = other_impl.receive_state_;
  109. other_impl.receive_state_ = block;
  110. impl.send_state_ = other_impl.send_state_;
  111. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  112. impl.buffer_move_from(other_impl);
  113. if (this != &other_service)
  114. {
  115. // Insert implementation into linked list of all implementations.
  116. boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
  117. impl.next_ = other_service.impl_list_;
  118. impl.prev_ = 0;
  119. if (other_service.impl_list_)
  120. other_service.impl_list_->prev_ = &impl;
  121. other_service.impl_list_ = &impl;
  122. }
  123. }
  124. template <typename Mutex>
  125. inline void channel_service<Mutex>::base_destroy(
  126. channel_service<Mutex>::base_implementation_type& impl)
  127. {
  128. // Remove implementation from linked list of all implementations.
  129. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  130. if (impl_list_ == &impl)
  131. impl_list_ = impl.next_;
  132. if (impl.prev_)
  133. impl.prev_->next_ = impl.next_;
  134. if (impl.next_)
  135. impl.next_->prev_= impl.prev_;
  136. impl.next_ = 0;
  137. impl.prev_ = 0;
  138. }
  139. template <typename Mutex>
  140. inline std::size_t channel_service<Mutex>::capacity(
  141. const channel_service<Mutex>::base_implementation_type& impl)
  142. const noexcept
  143. {
  144. typename Mutex::scoped_lock lock(impl.mutex_);
  145. return impl.max_buffer_size_;
  146. }
  147. template <typename Mutex>
  148. inline bool channel_service<Mutex>::is_open(
  149. const channel_service<Mutex>::base_implementation_type& impl)
  150. const noexcept
  151. {
  152. typename Mutex::scoped_lock lock(impl.mutex_);
  153. return impl.send_state_ != closed;
  154. }
  155. template <typename Mutex>
  156. template <typename Traits, typename... Signatures>
  157. void channel_service<Mutex>::reset(
  158. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  159. {
  160. cancel(impl);
  161. typename Mutex::scoped_lock lock(impl.mutex_);
  162. impl.receive_state_ = block;
  163. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  164. impl.buffer_clear();
  165. }
  166. template <typename Mutex>
  167. template <typename Traits, typename... Signatures>
  168. void channel_service<Mutex>::close(
  169. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  170. {
  171. typedef typename implementation_type<Traits,
  172. Signatures...>::traits_type traits_type;
  173. typedef typename implementation_type<Traits,
  174. Signatures...>::payload_type payload_type;
  175. typename Mutex::scoped_lock lock(impl.mutex_);
  176. if (impl.receive_state_ == block)
  177. {
  178. while (channel_operation* op = impl.waiters_.front())
  179. {
  180. impl.waiters_.pop();
  181. traits_type::invoke_receive_closed(
  182. post_receive<payload_type,
  183. typename traits_type::receive_closed_signature>(
  184. static_cast<channel_receive<payload_type>*>(op)));
  185. }
  186. }
  187. impl.send_state_ = closed;
  188. if (impl.receive_state_ != buffer)
  189. impl.receive_state_ = closed;
  190. }
  191. template <typename Mutex>
  192. template <typename Traits, typename... Signatures>
  193. void channel_service<Mutex>::cancel(
  194. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  195. {
  196. typedef typename implementation_type<Traits,
  197. Signatures...>::traits_type traits_type;
  198. typedef typename implementation_type<Traits,
  199. Signatures...>::payload_type payload_type;
  200. typename Mutex::scoped_lock lock(impl.mutex_);
  201. while (channel_operation* op = impl.waiters_.front())
  202. {
  203. if (impl.send_state_ == block)
  204. {
  205. impl.waiters_.pop();
  206. static_cast<channel_send<payload_type>*>(op)->cancel();
  207. }
  208. else
  209. {
  210. impl.waiters_.pop();
  211. traits_type::invoke_receive_cancelled(
  212. post_receive<payload_type,
  213. typename traits_type::receive_cancelled_signature>(
  214. static_cast<channel_receive<payload_type>*>(op)));
  215. }
  216. }
  217. if (impl.receive_state_ == waiter)
  218. impl.receive_state_ = block;
  219. if (impl.send_state_ == waiter)
  220. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  221. }
  222. template <typename Mutex>
  223. template <typename Traits, typename... Signatures>
  224. void channel_service<Mutex>::cancel_by_key(
  225. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  226. void* cancellation_key)
  227. {
  228. typedef typename implementation_type<Traits,
  229. Signatures...>::traits_type traits_type;
  230. typedef typename implementation_type<Traits,
  231. Signatures...>::payload_type payload_type;
  232. typename Mutex::scoped_lock lock(impl.mutex_);
  233. boost::asio::detail::op_queue<channel_operation> other_ops;
  234. while (channel_operation* op = impl.waiters_.front())
  235. {
  236. if (op->cancellation_key_ == cancellation_key)
  237. {
  238. if (impl.send_state_ == block)
  239. {
  240. impl.waiters_.pop();
  241. static_cast<channel_send<payload_type>*>(op)->cancel();
  242. }
  243. else
  244. {
  245. impl.waiters_.pop();
  246. traits_type::invoke_receive_cancelled(
  247. post_receive<payload_type,
  248. typename traits_type::receive_cancelled_signature>(
  249. static_cast<channel_receive<payload_type>*>(op)));
  250. }
  251. }
  252. else
  253. {
  254. impl.waiters_.pop();
  255. other_ops.push(op);
  256. }
  257. }
  258. impl.waiters_.push(other_ops);
  259. if (impl.waiters_.empty())
  260. {
  261. if (impl.receive_state_ == waiter)
  262. impl.receive_state_ = block;
  263. if (impl.send_state_ == waiter)
  264. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  265. }
  266. }
  267. template <typename Mutex>
  268. inline bool channel_service<Mutex>::ready(
  269. const channel_service<Mutex>::base_implementation_type& impl)
  270. const noexcept
  271. {
  272. typename Mutex::scoped_lock lock(impl.mutex_);
  273. return impl.receive_state_ != block;
  274. }
  275. template <typename Mutex>
  276. template <typename Message, typename Traits,
  277. typename... Signatures, typename... Args>
  278. bool channel_service<Mutex>::try_send(
  279. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  280. bool via_dispatch, Args&&... args)
  281. {
  282. typedef typename implementation_type<Traits,
  283. Signatures...>::payload_type payload_type;
  284. typename Mutex::scoped_lock lock(impl.mutex_);
  285. switch (impl.send_state_)
  286. {
  287. case block:
  288. {
  289. return false;
  290. }
  291. case buffer:
  292. {
  293. impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
  294. impl.receive_state_ = buffer;
  295. if (impl.buffer_size() == impl.max_buffer_size_)
  296. impl.send_state_ = block;
  297. return true;
  298. }
  299. case waiter:
  300. {
  301. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  302. channel_receive<payload_type>* receive_op =
  303. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  304. impl.waiters_.pop();
  305. if (impl.waiters_.empty())
  306. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  307. lock.unlock();
  308. if (via_dispatch)
  309. receive_op->dispatch(static_cast<payload_type&&>(payload));
  310. else
  311. receive_op->post(static_cast<payload_type&&>(payload));
  312. return true;
  313. }
  314. case closed:
  315. default:
  316. {
  317. return false;
  318. }
  319. }
  320. }
  321. template <typename Mutex>
  322. template <typename Message, typename Traits,
  323. typename... Signatures, typename... Args>
  324. std::size_t channel_service<Mutex>::try_send_n(
  325. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  326. std::size_t count, bool via_dispatch, Args&&... args)
  327. {
  328. typedef typename implementation_type<Traits,
  329. Signatures...>::payload_type payload_type;
  330. typename Mutex::scoped_lock lock(impl.mutex_);
  331. if (count == 0)
  332. return 0;
  333. switch (impl.send_state_)
  334. {
  335. case block:
  336. return 0;
  337. case buffer:
  338. case waiter:
  339. break;
  340. case closed:
  341. default:
  342. return 0;
  343. }
  344. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  345. for (std::size_t i = 0; i < count; ++i)
  346. {
  347. switch (impl.send_state_)
  348. {
  349. case block:
  350. {
  351. return i;
  352. }
  353. case buffer:
  354. {
  355. i += impl.buffer_push_n(count - i,
  356. static_cast<payload_type&&>(payload));
  357. impl.receive_state_ = buffer;
  358. if (impl.buffer_size() == impl.max_buffer_size_)
  359. impl.send_state_ = block;
  360. return i;
  361. }
  362. case waiter:
  363. {
  364. channel_receive<payload_type>* receive_op =
  365. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  366. impl.waiters_.pop();
  367. if (impl.waiters_.empty())
  368. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  369. lock.unlock();
  370. if (via_dispatch)
  371. receive_op->dispatch(payload);
  372. else
  373. receive_op->post(payload);
  374. break;
  375. }
  376. case closed:
  377. default:
  378. {
  379. return i;
  380. }
  381. }
  382. }
  383. return count;
  384. }
  385. template <typename Mutex>
  386. template <typename Traits, typename... Signatures>
  387. void channel_service<Mutex>::start_send_op(
  388. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  389. channel_send<typename implementation_type<
  390. Traits, Signatures...>::payload_type>* send_op)
  391. {
  392. typedef typename implementation_type<Traits,
  393. Signatures...>::payload_type payload_type;
  394. typename Mutex::scoped_lock lock(impl.mutex_);
  395. switch (impl.send_state_)
  396. {
  397. case block:
  398. {
  399. impl.waiters_.push(send_op);
  400. if (impl.receive_state_ == block)
  401. impl.receive_state_ = waiter;
  402. return;
  403. }
  404. case buffer:
  405. {
  406. impl.buffer_push(send_op->get_payload());
  407. impl.receive_state_ = buffer;
  408. if (impl.buffer_size() == impl.max_buffer_size_)
  409. impl.send_state_ = block;
  410. send_op->immediate();
  411. break;
  412. }
  413. case waiter:
  414. {
  415. channel_receive<payload_type>* receive_op =
  416. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  417. impl.waiters_.pop();
  418. if (impl.waiters_.empty())
  419. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  420. receive_op->post(send_op->get_payload());
  421. send_op->immediate();
  422. break;
  423. }
  424. case closed:
  425. default:
  426. {
  427. send_op->close();
  428. break;
  429. }
  430. }
  431. }
  432. template <typename Mutex>
  433. template <typename Traits, typename... Signatures, typename Handler>
  434. bool channel_service<Mutex>::try_receive(
  435. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  436. Handler&& handler)
  437. {
  438. typedef typename implementation_type<Traits,
  439. Signatures...>::payload_type payload_type;
  440. typename Mutex::scoped_lock lock(impl.mutex_);
  441. switch (impl.receive_state_)
  442. {
  443. case block:
  444. {
  445. return false;
  446. }
  447. case buffer:
  448. {
  449. payload_type payload(impl.buffer_front());
  450. if (channel_send<payload_type>* send_op =
  451. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  452. {
  453. impl.buffer_pop();
  454. impl.buffer_push(send_op->get_payload());
  455. impl.waiters_.pop();
  456. send_op->post();
  457. }
  458. else
  459. {
  460. impl.buffer_pop();
  461. if (impl.buffer_size() == 0)
  462. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  463. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  464. }
  465. lock.unlock();
  466. boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
  467. channel_handler<payload_type, decay_t<Handler>>(
  468. static_cast<payload_type&&>(payload), handler2.value)();
  469. return true;
  470. }
  471. case waiter:
  472. {
  473. channel_send<payload_type>* send_op =
  474. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  475. payload_type payload = send_op->get_payload();
  476. impl.waiters_.pop();
  477. if (impl.waiters_.front() == 0)
  478. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  479. send_op->post();
  480. lock.unlock();
  481. boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
  482. channel_handler<payload_type, decay_t<Handler>>(
  483. static_cast<payload_type&&>(payload), handler2.value)();
  484. return true;
  485. }
  486. case closed:
  487. default:
  488. {
  489. return false;
  490. }
  491. }
  492. }
  493. template <typename Mutex>
  494. template <typename Traits, typename... Signatures>
  495. void channel_service<Mutex>::start_receive_op(
  496. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  497. channel_receive<typename implementation_type<
  498. Traits, Signatures...>::payload_type>* receive_op)
  499. {
  500. typedef typename implementation_type<Traits,
  501. Signatures...>::traits_type traits_type;
  502. typedef typename implementation_type<Traits,
  503. Signatures...>::payload_type payload_type;
  504. typename Mutex::scoped_lock lock(impl.mutex_);
  505. switch (impl.receive_state_)
  506. {
  507. case block:
  508. {
  509. impl.waiters_.push(receive_op);
  510. if (impl.send_state_ != closed)
  511. impl.send_state_ = waiter;
  512. return;
  513. }
  514. case buffer:
  515. {
  516. payload_type payload(
  517. static_cast<payload_type&&>(impl.buffer_front()));
  518. if (channel_send<payload_type>* send_op =
  519. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  520. {
  521. impl.buffer_pop();
  522. impl.buffer_push(send_op->get_payload());
  523. impl.waiters_.pop();
  524. send_op->post();
  525. }
  526. else
  527. {
  528. impl.buffer_pop();
  529. if (impl.buffer_size() == 0)
  530. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  531. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  532. }
  533. receive_op->immediate(static_cast<payload_type&&>(payload));
  534. break;
  535. }
  536. case waiter:
  537. {
  538. channel_send<payload_type>* send_op =
  539. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  540. payload_type payload = send_op->get_payload();
  541. impl.waiters_.pop();
  542. if (impl.waiters_.front() == 0)
  543. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  544. send_op->post();
  545. receive_op->immediate(static_cast<payload_type&&>(payload));
  546. break;
  547. }
  548. case closed:
  549. default:
  550. {
  551. traits_type::invoke_receive_closed(
  552. post_receive<payload_type,
  553. typename traits_type::receive_closed_signature>(receive_op));
  554. break;
  555. }
  556. }
  557. }
  558. } // namespace detail
  559. } // namespace experimental
  560. } // namespace asio
  561. } // namespace boost
  562. #include <boost/asio/detail/pop_options.hpp>
  563. #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP