stream.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
  11. #include <boost/beast/core/bind_handler.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/beast/core/detail/service_base.hpp>
  14. #include <boost/beast/core/detail/is_invocable.hpp>
  15. #include <boost/asio/any_io_executor.hpp>
  16. #include <boost/asio/dispatch.hpp>
  17. #include <boost/asio/post.hpp>
  18. #include <mutex>
  19. #include <stdexcept>
  20. #include <vector>
  21. namespace boost {
  22. namespace beast {
  23. namespace test {
  24. //------------------------------------------------------------------------------
  25. template<class Executor>
  26. template<class Handler, class Buffers>
  27. class basic_stream<Executor>::read_op : public detail::stream_read_op_base
  28. {
  29. struct lambda
  30. {
  31. Handler h_;
  32. boost::weak_ptr<detail::stream_state> wp_;
  33. Buffers b_;
  34. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  35. net::any_io_executor wg2_;
  36. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  37. net::executor_work_guard<
  38. net::associated_executor_t<Handler, net::any_io_executor>> wg2_;
  39. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  40. lambda(lambda&&) = default;
  41. lambda(lambda const&) = default;
  42. template<class Handler_>
  43. lambda(
  44. Handler_&& h,
  45. boost::shared_ptr<detail::stream_state> const& s,
  46. Buffers const& b)
  47. : h_(std::forward<Handler_>(h))
  48. , wp_(s)
  49. , b_(b)
  50. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  51. , wg2_(net::prefer(
  52. net::get_associated_executor(
  53. h_, s->exec),
  54. net::execution::outstanding_work.tracked))
  55. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  56. , wg2_(net::get_associated_executor(
  57. h_, s->exec))
  58. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  59. {
  60. }
  61. using allocator_type = net::associated_allocator_t<Handler>;
  62. allocator_type get_allocator() const noexcept
  63. {
  64. return net::get_associated_allocator(h_);
  65. }
  66. using cancellation_slot_type =
  67. net::associated_cancellation_slot_t<Handler>;
  68. cancellation_slot_type
  69. get_cancellation_slot() const noexcept
  70. {
  71. return net::get_associated_cancellation_slot(h_,
  72. net::cancellation_slot());
  73. }
  74. void
  75. operator()(error_code ec)
  76. {
  77. std::size_t bytes_transferred = 0;
  78. auto sp = wp_.lock();
  79. if(! sp)
  80. {
  81. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  82. }
  83. if(! ec)
  84. {
  85. std::lock_guard<std::mutex> lock(sp->m);
  86. BOOST_ASSERT(! sp->op);
  87. if(sp->b.size() > 0)
  88. {
  89. bytes_transferred =
  90. net::buffer_copy(
  91. b_, sp->b.data(), sp->read_max);
  92. sp->b.consume(bytes_transferred);
  93. sp->nread_bytes += bytes_transferred;
  94. }
  95. else if (buffer_bytes(b_) > 0)
  96. {
  97. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  98. }
  99. }
  100. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  101. net::dispatch(wg2_,
  102. beast::bind_front_handler(std::move(h_),
  103. ec, bytes_transferred));
  104. wg2_ = net::any_io_executor(); // probably unnecessary
  105. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  106. net::dispatch(wg2_.get_executor(),
  107. beast::bind_front_handler(std::move(h_),
  108. ec, bytes_transferred));
  109. wg2_.reset();
  110. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  111. }
  112. };
  113. lambda fn_;
  114. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  115. net::executor_work_guard<net::any_io_executor> wg1_;
  116. #else
  117. net::any_io_executor wg1_;
  118. #endif
  119. public:
  120. template<class Handler_>
  121. read_op(
  122. Handler_&& h,
  123. boost::shared_ptr<detail::stream_state> const& s,
  124. Buffers const& b)
  125. : fn_(std::forward<Handler_>(h), s, b)
  126. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  127. , wg1_(s->exec)
  128. #else
  129. , wg1_(net::prefer(s->exec,
  130. net::execution::outstanding_work.tracked))
  131. #endif
  132. {
  133. }
  134. void
  135. operator()(error_code ec) override
  136. {
  137. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  138. net::post(wg1_.get_executor(),
  139. beast::bind_front_handler(std::move(fn_), ec));
  140. wg1_.reset();
  141. #else
  142. net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec));
  143. wg1_ = net::any_io_executor(); // probably unnecessary
  144. #endif
  145. }
  146. };
  147. template<class Executor>
  148. struct basic_stream<Executor>::run_read_op
  149. {
  150. template<
  151. class ReadHandler,
  152. class MutableBufferSequence>
  153. void
  154. operator()(
  155. ReadHandler&& h,
  156. boost::shared_ptr<detail::stream_state> const& in,
  157. MutableBufferSequence const& buffers)
  158. {
  159. // If you get an error on the following line it means
  160. // that your handler does not meet the documented type
  161. // requirements for the handler.
  162. static_assert(
  163. beast::detail::is_invocable<ReadHandler,
  164. void(error_code, std::size_t)>::value,
  165. "ReadHandler type requirements not met");
  166. initiate_read(
  167. in,
  168. std::unique_ptr<detail::stream_read_op_base>{
  169. new read_op<
  170. typename std::decay<ReadHandler>::type,
  171. MutableBufferSequence>(
  172. std::move(h),
  173. in,
  174. buffers)},
  175. buffer_bytes(buffers));
  176. }
  177. };
  178. template<class Executor>
  179. struct basic_stream<Executor>::run_write_op
  180. {
  181. template<
  182. class WriteHandler,
  183. class ConstBufferSequence>
  184. void
  185. operator()(
  186. WriteHandler&& h,
  187. boost::shared_ptr<detail::stream_state> in_,
  188. boost::weak_ptr<detail::stream_state> out_,
  189. ConstBufferSequence const& buffers)
  190. {
  191. // If you get an error on the following line it means
  192. // that your handler does not meet the documented type
  193. // requirements for the handler.
  194. static_assert(
  195. beast::detail::is_invocable<WriteHandler,
  196. void(error_code, std::size_t)>::value,
  197. "WriteHandler type requirements not met");
  198. ++in_->nwrite;
  199. auto const upcall = [&](error_code ec, std::size_t n)
  200. {
  201. net::post(
  202. in_->exec,
  203. beast::bind_front_handler(std::move(h), ec, n));
  204. };
  205. // test failure
  206. error_code ec;
  207. std::size_t n = 0;
  208. if(in_->fc && in_->fc->fail(ec))
  209. return upcall(ec, n);
  210. // A request to write 0 bytes to a stream is a no-op.
  211. if(buffer_bytes(buffers) == 0)
  212. return upcall(ec, n);
  213. // connection closed
  214. auto out = out_.lock();
  215. if(! out)
  216. return upcall(net::error::connection_reset, n);
  217. // copy buffers
  218. n = std::min<std::size_t>(
  219. buffer_bytes(buffers), in_->write_max);
  220. {
  221. std::lock_guard<std::mutex> lock(out->m);
  222. n = net::buffer_copy(out->b.prepare(n), buffers);
  223. out->b.commit(n);
  224. out->nwrite_bytes += n;
  225. out->notify_read();
  226. }
  227. BOOST_ASSERT(! ec);
  228. upcall(ec, n);
  229. }
  230. };
  231. //------------------------------------------------------------------------------
  232. template<class Executor>
  233. template<class MutableBufferSequence>
  234. std::size_t
  235. basic_stream<Executor>::
  236. read_some(MutableBufferSequence const& buffers)
  237. {
  238. static_assert(net::is_mutable_buffer_sequence<
  239. MutableBufferSequence>::value,
  240. "MutableBufferSequence type requirements not met");
  241. error_code ec;
  242. auto const n = read_some(buffers, ec);
  243. if(ec)
  244. BOOST_THROW_EXCEPTION(system_error{ec});
  245. return n;
  246. }
  247. template<class Executor>
  248. template<class MutableBufferSequence>
  249. std::size_t
  250. basic_stream<Executor>::
  251. read_some(MutableBufferSequence const& buffers,
  252. error_code& ec)
  253. {
  254. static_assert(net::is_mutable_buffer_sequence<
  255. MutableBufferSequence>::value,
  256. "MutableBufferSequence type requirements not met");
  257. ++in_->nread;
  258. // test failure
  259. if(in_->fc && in_->fc->fail(ec))
  260. return 0;
  261. // A request to read 0 bytes from a stream is a no-op.
  262. if(buffer_bytes(buffers) == 0)
  263. {
  264. ec = {};
  265. return 0;
  266. }
  267. std::unique_lock<std::mutex> lock{in_->m};
  268. BOOST_ASSERT(! in_->op);
  269. in_->cv.wait(lock,
  270. [&]()
  271. {
  272. return
  273. in_->b.size() > 0 ||
  274. in_->code != detail::stream_status::ok;
  275. });
  276. // deliver bytes before eof
  277. if(in_->b.size() > 0)
  278. {
  279. auto const n = net::buffer_copy(
  280. buffers, in_->b.data(), in_->read_max);
  281. in_->b.consume(n);
  282. in_->nread_bytes += n;
  283. return n;
  284. }
  285. // deliver error
  286. BOOST_ASSERT(in_->code != detail::stream_status::ok);
  287. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  288. return 0;
  289. }
  290. template<class Executor>
  291. template<class MutableBufferSequence,
  292. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
  293. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
  294. basic_stream<Executor>::
  295. async_read_some(
  296. MutableBufferSequence const& buffers,
  297. ReadHandler&& handler)
  298. {
  299. static_assert(net::is_mutable_buffer_sequence<
  300. MutableBufferSequence>::value,
  301. "MutableBufferSequence type requirements not met");
  302. return net::async_initiate<
  303. ReadHandler,
  304. void(error_code, std::size_t)>(
  305. run_read_op{},
  306. handler,
  307. in_,
  308. buffers);
  309. }
  310. template<class Executor>
  311. template<class ConstBufferSequence>
  312. std::size_t
  313. basic_stream<Executor>::
  314. write_some(ConstBufferSequence const& buffers)
  315. {
  316. static_assert(net::is_const_buffer_sequence<
  317. ConstBufferSequence>::value,
  318. "ConstBufferSequence type requirements not met");
  319. error_code ec;
  320. auto const bytes_transferred =
  321. write_some(buffers, ec);
  322. if(ec)
  323. BOOST_THROW_EXCEPTION(system_error{ec});
  324. return bytes_transferred;
  325. }
  326. template<class Executor>
  327. template<class ConstBufferSequence>
  328. std::size_t
  329. basic_stream<Executor>::
  330. write_some(
  331. ConstBufferSequence const& buffers, error_code& ec)
  332. {
  333. static_assert(net::is_const_buffer_sequence<
  334. ConstBufferSequence>::value,
  335. "ConstBufferSequence type requirements not met");
  336. ++in_->nwrite;
  337. // test failure
  338. if(in_->fc && in_->fc->fail(ec))
  339. return 0;
  340. // A request to write 0 bytes to a stream is a no-op.
  341. if(buffer_bytes(buffers) == 0)
  342. {
  343. ec = {};
  344. return 0;
  345. }
  346. // connection closed
  347. auto out = out_.lock();
  348. if(! out)
  349. {
  350. BOOST_BEAST_ASSIGN_EC(ec, net::error::connection_reset);
  351. return 0;
  352. }
  353. // copy buffers
  354. auto n = std::min<std::size_t>(
  355. buffer_bytes(buffers), in_->write_max);
  356. {
  357. std::lock_guard<std::mutex> lock(out->m);
  358. n = net::buffer_copy(out->b.prepare(n), buffers);
  359. out->b.commit(n);
  360. out->nwrite_bytes += n;
  361. out->notify_read();
  362. }
  363. return n;
  364. }
  365. template<class Executor>
  366. template<class ConstBufferSequence,
  367. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
  368. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
  369. basic_stream<Executor>::
  370. async_write_some(
  371. ConstBufferSequence const& buffers,
  372. WriteHandler&& handler)
  373. {
  374. static_assert(net::is_const_buffer_sequence<
  375. ConstBufferSequence>::value,
  376. "ConstBufferSequence type requirements not met");
  377. return net::async_initiate<
  378. WriteHandler,
  379. void(error_code, std::size_t)>(
  380. run_write_op{},
  381. handler,
  382. in_,
  383. out_,
  384. buffers);
  385. }
  386. //------------------------------------------------------------------------------
  387. template<class Executor, class TeardownHandler>
  388. void
  389. async_teardown(
  390. role_type,
  391. basic_stream<Executor>& s,
  392. TeardownHandler&& handler)
  393. {
  394. error_code ec;
  395. if( s.in_->fc &&
  396. s.in_->fc->fail(ec))
  397. return net::post(
  398. s.get_executor(),
  399. beast::bind_front_handler(
  400. std::move(handler), ec));
  401. s.close();
  402. if( s.in_->fc &&
  403. s.in_->fc->fail(ec))
  404. {
  405. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  406. }
  407. else
  408. ec = {};
  409. net::post(
  410. s.get_executor(),
  411. beast::bind_front_handler(
  412. std::move(handler), ec));
  413. }
  414. //------------------------------------------------------------------------------
  415. template<class Executor, class Arg1, class... ArgN>
  416. basic_stream<Executor>
  417. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  418. {
  419. stream from{
  420. std::forward<Arg1>(arg1),
  421. std::forward<ArgN>(argn)...};
  422. from.connect(to);
  423. return from;
  424. }
  425. namespace detail
  426. {
  427. template<class To>
  428. struct extract_executor_op
  429. {
  430. To operator()(net::any_io_executor& ex) const
  431. {
  432. assert(ex.template target<To>());
  433. return *ex.template target<To>();
  434. }
  435. };
  436. template<>
  437. struct extract_executor_op<net::any_io_executor>
  438. {
  439. net::any_io_executor operator()(net::any_io_executor& ex) const
  440. {
  441. return ex;
  442. }
  443. };
  444. }
  445. template<class Executor>
  446. auto basic_stream<Executor>::get_executor() noexcept -> executor_type
  447. {
  448. return detail::extract_executor_op<Executor>()(in_->exec);
  449. }
  450. } // test
  451. } // beast
  452. } // boost
  453. #endif