runner.hpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_RUNNER_HPP
  7. #define BOOST_REDIS_RUNNER_HPP
  8. #include <boost/redis/detail/health_checker.hpp>
  9. #include <boost/redis/config.hpp>
  10. #include <boost/redis/response.hpp>
  11. #include <boost/redis/detail/helper.hpp>
  12. #include <boost/redis/error.hpp>
  13. #include <boost/redis/logger.hpp>
  14. #include <boost/redis/operation.hpp>
  15. #include <boost/redis/detail/connector.hpp>
  16. #include <boost/redis/detail/resolver.hpp>
  17. #include <boost/redis/detail/handshaker.hpp>
  18. #include <boost/asio/compose.hpp>
  19. #include <boost/asio/connect.hpp>
  20. #include <boost/asio/coroutine.hpp>
  21. #include <boost/asio/experimental/parallel_group.hpp>
  22. #include <boost/asio/ip/tcp.hpp>
  23. #include <boost/asio/steady_timer.hpp>
  24. #include <string>
  25. #include <memory>
  26. #include <chrono>
  27. namespace boost::redis::detail
  28. {
  29. template <class Runner, class Connection, class Logger>
  30. struct hello_op {
  31. Runner* runner_ = nullptr;
  32. Connection* conn_ = nullptr;
  33. Logger logger_;
  34. asio::coroutine coro_{};
  35. template <class Self>
  36. void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
  37. {
  38. BOOST_ASIO_CORO_REENTER (coro_)
  39. {
  40. runner_->hello_req_.clear();
  41. if (runner_->hello_resp_.has_value())
  42. runner_->hello_resp_.value().clear();
  43. runner_->add_hello();
  44. BOOST_ASIO_CORO_YIELD
  45. conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
  46. logger_.on_hello(ec, runner_->hello_resp_);
  47. if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
  48. logger_.trace("hello-op: error/canceled. Exiting ...");
  49. conn_->cancel(operation::run);
  50. self.complete(!!ec ? ec : asio::error::operation_aborted);
  51. return;
  52. }
  53. self.complete({});
  54. }
  55. }
  56. };
  57. template <class Runner, class Connection, class Logger>
  58. class runner_op {
  59. private:
  60. Runner* runner_ = nullptr;
  61. Connection* conn_ = nullptr;
  62. Logger logger_;
  63. asio::coroutine coro_{};
  64. public:
  65. runner_op(Runner* runner, Connection* conn, Logger l)
  66. : runner_{runner}
  67. , conn_{conn}
  68. , logger_{l}
  69. {}
  70. template <class Self>
  71. void operator()( Self& self
  72. , std::array<std::size_t, 3> order = {}
  73. , system::error_code ec0 = {}
  74. , system::error_code ec1 = {}
  75. , system::error_code ec2 = {}
  76. , std::size_t = 0)
  77. {
  78. BOOST_ASIO_CORO_REENTER (coro_)
  79. {
  80. BOOST_ASIO_CORO_YIELD
  81. asio::experimental::make_parallel_group(
  82. [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
  83. [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
  84. [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
  85. ).async_wait(
  86. asio::experimental::wait_for_all(),
  87. std::move(self));
  88. logger_.on_runner(ec0, ec1, ec2);
  89. if (is_cancelled(self)) {
  90. self.complete(asio::error::operation_aborted);
  91. return;
  92. }
  93. if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
  94. self.complete(ec0);
  95. return;
  96. }
  97. if (order[0] == 2 && !!ec2) {
  98. self.complete(ec2);
  99. return;
  100. }
  101. if (order[0] == 1 && ec1 == error::pong_timeout) {
  102. self.complete(ec1);
  103. return;
  104. }
  105. self.complete(ec0);
  106. }
  107. }
  108. };
  109. template <class Runner, class Connection, class Logger>
  110. struct run_all_op {
  111. Runner* runner_ = nullptr;
  112. Connection* conn_ = nullptr;
  113. Logger logger_;
  114. asio::coroutine coro_{};
  115. template <class Self>
  116. void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
  117. {
  118. BOOST_ASIO_CORO_REENTER (coro_)
  119. {
  120. BOOST_ASIO_CORO_YIELD
  121. runner_->resv_.async_resolve(std::move(self));
  122. logger_.on_resolve(ec, runner_->resv_.results());
  123. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  124. BOOST_ASIO_CORO_YIELD
  125. runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
  126. logger_.on_connect(ec, runner_->ctor_.endpoint());
  127. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  128. if (conn_->use_ssl()) {
  129. BOOST_ASIO_CORO_YIELD
  130. runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
  131. logger_.on_ssl_handshake(ec);
  132. BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
  133. }
  134. BOOST_ASIO_CORO_YIELD
  135. conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
  136. BOOST_REDIS_CHECK_OP0(;)
  137. self.complete(ec);
  138. }
  139. }
  140. };
  141. template <class Executor>
  142. class runner {
  143. public:
  144. runner(Executor ex, config cfg)
  145. : resv_{ex}
  146. , ctor_{ex}
  147. , hsher_{ex}
  148. , health_checker_{ex}
  149. , cfg_{cfg}
  150. { }
  151. std::size_t cancel(operation op)
  152. {
  153. resv_.cancel(op);
  154. ctor_.cancel(op);
  155. hsher_.cancel(op);
  156. health_checker_.cancel(op);
  157. return 0U;
  158. }
  159. void set_config(config const& cfg)
  160. {
  161. cfg_ = cfg;
  162. resv_.set_config(cfg);
  163. ctor_.set_config(cfg);
  164. hsher_.set_config(cfg);
  165. health_checker_.set_config(cfg);
  166. }
  167. template <class Connection, class Logger, class CompletionToken>
  168. auto async_run(Connection& conn, Logger l, CompletionToken token)
  169. {
  170. return asio::async_compose
  171. < CompletionToken
  172. , void(system::error_code)
  173. >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  174. }
  175. config const& get_config() const noexcept {return cfg_;}
  176. private:
  177. using resolver_type = resolver<Executor>;
  178. using connector_type = connector<Executor>;
  179. using handshaker_type = detail::handshaker<Executor>;
  180. using health_checker_type = health_checker<Executor>;
  181. using timer_type = typename connector_type::timer_type;
  182. template <class, class, class> friend struct run_all_op;
  183. template <class, class, class> friend class runner_op;
  184. template <class, class, class> friend struct hello_op;
  185. template <class Connection, class Logger, class CompletionToken>
  186. auto async_run_all(Connection& conn, Logger l, CompletionToken token)
  187. {
  188. return asio::async_compose
  189. < CompletionToken
  190. , void(system::error_code)
  191. >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  192. }
  193. template <class Connection, class Logger, class CompletionToken>
  194. auto async_hello(Connection& conn, Logger l, CompletionToken token)
  195. {
  196. return asio::async_compose
  197. < CompletionToken
  198. , void(system::error_code)
  199. >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
  200. }
  201. void add_hello()
  202. {
  203. if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
  204. hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password, "SETNAME", cfg_.clientname);
  205. else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
  206. hello_req_.push("HELLO", "3");
  207. else if (cfg_.clientname.empty())
  208. hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password);
  209. else
  210. hello_req_.push("HELLO", "3", "SETNAME", cfg_.clientname);
  211. if (cfg_.database_index && cfg_.database_index.value() != 0)
  212. hello_req_.push("SELECT", cfg_.database_index.value());
  213. }
  214. bool has_error_in_response() const noexcept
  215. {
  216. if (!hello_resp_.has_value())
  217. return true;
  218. auto f = [](auto const& e)
  219. {
  220. switch (e.data_type) {
  221. case resp3::type::simple_error:
  222. case resp3::type::blob_error: return true;
  223. default: return false;
  224. }
  225. };
  226. return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
  227. }
  228. resolver_type resv_;
  229. connector_type ctor_;
  230. handshaker_type hsher_;
  231. health_checker_type health_checker_;
  232. request hello_req_;
  233. generic_response hello_resp_;
  234. config cfg_;
  235. };
  236. } // boost::redis::detail
  237. #endif // BOOST_REDIS_RUNNER_HPP