client.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file client.h
  3. /// Declaration of MQTT client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v2.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v20.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. *******************************************************************************/
  22. #ifndef __mqtt_client_h
  23. #define __mqtt_client_h
  24. #include "mqtt/async_client.h"
  25. #include <future>
  26. namespace mqtt {
  27. /////////////////////////////////////////////////////////////////////////////
  28. /**
  29. * Lightweight client for talking to an MQTT server using methods that block
  30. * until an operation completes.
  31. */
  32. class client : private callback
  33. {
  34. /** An arbitrary, but relatively long timeout */
  35. PAHO_MQTTPP_EXPORT static const std::chrono::seconds DFLT_TIMEOUT;
  36. /** The default quality of service */
  37. PAHO_MQTTPP_EXPORT static const int DFLT_QOS; // =1;
  38. /** The actual client */
  39. async_client cli_;
  40. /** The longest time to wait for an operation to complete. */
  41. std::chrono::milliseconds timeout_;
  42. /** Callback supplied by the user (if any) */
  43. callback* userCallback_;
  44. /**
  45. * Creates a shared pointer to an existing non-heap object.
  46. * The shared pointer is given a no-op deleter, so it will not try to
  47. * destroy the object when it goes out of scope. It is up to the caller
  48. * to ensure that the object remains in memory for as long as there may
  49. * be pointers to it.
  50. * @param val A value which may live anywhere in memory (stack,
  51. * file-scope, etc).
  52. * @return A shared pointer to the object.
  53. */
  54. template <typename T>
  55. std::shared_ptr<T> ptr(const T& val) {
  56. return std::shared_ptr<T>(const_cast<T*>(&val), [](T*){});
  57. }
  58. // User callbacks
  59. // Most are launched in a separate thread, for convenience, except
  60. // message_arrived, for performance.
  61. void connected(const string& cause) override {
  62. std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
  63. }
  64. void connection_lost(const string& cause) override {
  65. std::async(std::launch::async,
  66. &callback::connection_lost, userCallback_, cause).wait();
  67. }
  68. void message_arrived(const_message_ptr msg) override {
  69. userCallback_->message_arrived(msg);
  70. }
  71. void delivery_complete(delivery_token_ptr tok) override {
  72. std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok).wait();
  73. }
  74. /** Non-copyable */
  75. client() =delete;
  76. client(const async_client&) =delete;
  77. client& operator=(const async_client&) =delete;
  78. public:
  79. /** Smart pointer type for this object */
  80. using ptr_t = std::shared_ptr<client>;
  81. /** Type for a collection of QOS values */
  82. using qos_collection = async_client::qos_collection;
  83. /** Handler for updating connection data before an auto-reconnect. */
  84. using update_connection_handler = async_client::update_connection_handler;
  85. /**
  86. * Create a client that can be used to communicate with an MQTT server.
  87. * This allows the caller to specify a user-defined persistence object,
  88. * or use no persistence.
  89. * @param serverURI the address of the server to connect to, specified
  90. * as a URI.
  91. * @param clientId a client identifier that is unique on the server
  92. * being connected to
  93. * @param persistence The user persistence structure. If this is null,
  94. * then no persistence is used.
  95. */
  96. client(const string& serverURI, const string& clientId,
  97. iclient_persistence* persistence=nullptr);
  98. /**
  99. * Create an async_client that can be used to communicate with an MQTT
  100. * server.
  101. * This uses file-based persistence in the specified directory.
  102. * @param serverURI the address of the server to connect to, specified
  103. * as a URI.
  104. * @param clientId a client identifier that is unique on the server
  105. * being connected to
  106. * @param persistDir The directory to use for persistence data
  107. */
  108. client(const string& serverURI, const string& clientId,
  109. const string& persistDir);
  110. /**
  111. * Create a client that can be used to communicate with an MQTT server,
  112. * which allows for off-line message buffering.
  113. * This allows the caller to specify a user-defined persistence object,
  114. * or use no persistence.
  115. * @param serverURI the address of the server to connect to, specified
  116. * as a URI.
  117. * @param clientId a client identifier that is unique on the server
  118. * being connected to
  119. * @param maxBufferedMessages the maximum number of messages allowed to
  120. * be buffered while not connected
  121. * @param persistence The user persistence structure. If this is null,
  122. * then no persistence is used.
  123. */
  124. client(const string& serverURI, const string& clientId,
  125. int maxBufferedMessages,
  126. iclient_persistence* persistence=nullptr);
  127. /**
  128. * Create a client that can be used to communicate with an MQTT server,
  129. * which allows for off-line message buffering.
  130. * This uses file-based persistence in the specified directory.
  131. * @param serverURI the address of the server to connect to, specified
  132. * as a URI.
  133. * @param clientId a client identifier that is unique on the server
  134. * being connected to
  135. * @param maxBufferedMessages the maximum number of messages allowed to
  136. * be buffered while not connected
  137. * @param persistDir The directory to use for persistence data
  138. */
  139. client(const string& serverURI, const string& clientId,
  140. int maxBufferedMessages, const string& persistDir);
  141. /**
  142. * Create an async_client that can be used to communicate with an MQTT
  143. * server, which allows for off-line message buffering.
  144. * This allows the caller to specify a user-defined persistence object,
  145. * or use no persistence.
  146. * @param serverURI the address of the server to connect to, specified
  147. * as a URI.
  148. * @param clientId a client identifier that is unique on the server
  149. * being connected to
  150. * @param opts The create options
  151. * @param persistence The user persistence structure. If this is null,
  152. * then no persistence is used.
  153. */
  154. client(const string& serverURI, const string& clientId,
  155. const create_options& opts,
  156. iclient_persistence* persistence=nullptr);
  157. /**
  158. * Virtual destructor
  159. */
  160. virtual ~client() {}
  161. /**
  162. * Connects to an MQTT server using the default options.
  163. */
  164. virtual connect_response connect();
  165. /**
  166. * Connects to an MQTT server using the specified options.
  167. * @param opts
  168. */
  169. virtual connect_response connect(connect_options opts);
  170. /**
  171. * Reconnects the client using options from the previous connect.
  172. * The client must have previously called connect() for this to work.
  173. */
  174. virtual connect_response reconnect();
  175. /**
  176. * Disconnects from the server.
  177. */
  178. virtual void disconnect();
  179. /**
  180. * Disconnects from the server.
  181. * @param timeoutMS the amount of time in milliseconds to allow for
  182. * existing work to finish before disconnecting. A value
  183. * of zero or less means the client will not quiesce.
  184. */
  185. virtual void disconnect(int timeoutMS);
  186. /**
  187. * Disconnects from the server.
  188. * @param to the amount of time in milliseconds to allow for
  189. * existing work to finish before disconnecting. A value
  190. * of zero or less means the client will not quiesce.
  191. */
  192. template <class Rep, class Period>
  193. void disconnect(const std::chrono::duration<Rep, Period>& to) {
  194. disconnect((int) to_milliseconds_count(to));
  195. }
  196. /**
  197. * Gets the client ID used by this client.
  198. * @return The client ID used by this client.
  199. */
  200. virtual string get_client_id() const { return cli_.get_client_id(); }
  201. /**
  202. * Gets the address of the server used by this client.
  203. * @return The address of the server used by this client, as a URI.
  204. */
  205. virtual string get_server_uri() const { return cli_.get_server_uri(); }
  206. /**
  207. * Return the maximum time to wait for an action to complete.
  208. * @return int
  209. */
  210. virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
  211. /**
  212. * Gets a copy of the connect options that were last used in a request
  213. * to connect to the broker.
  214. * @returns The last connect options that were used.
  215. */
  216. connect_options get_connect_options() const {
  217. return cli_.get_connect_options();
  218. }
  219. /**
  220. * Get a topic object which can be used to publish messages on this
  221. * client.
  222. * @param top The topic name
  223. * @param qos The Quality of Service for the topic
  224. * @param retained Whether the published messages set the retain flag.
  225. * @return A topic attached to this client.
  226. */
  227. virtual topic get_topic(const string& top, int qos=message::DFLT_QOS,
  228. bool retained=message::DFLT_RETAINED) {
  229. return topic(cli_, top, qos, retained);
  230. }
  231. /**
  232. * Determines if this client is currently connected to the server.
  233. * @return @em true if the client is currently connected, @em false if
  234. * not.
  235. */
  236. virtual bool is_connected() const { return cli_.is_connected(); }
  237. /**
  238. * Sets a callback to allow the application to update the connection
  239. * data on automatic reconnects.
  240. * @param cb The callback functor to register with the library.
  241. */
  242. void set_update_connection_handler(update_connection_handler cb) {
  243. cli_.set_update_connection_handler(cb);
  244. }
  245. /**
  246. * Publishes a message to a topic on the server and return once it is
  247. * delivered.
  248. * @param top The topic to publish
  249. * @param payload The data to publish
  250. * @param n The size in bytes of the data
  251. * @param qos The QoS for message delivery
  252. * @param retained Whether the broker should retain the message
  253. */
  254. virtual void publish(string_ref top, const void* payload, size_t n,
  255. int qos, bool retained) {
  256. if (!cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_))
  257. throw timeout_error();
  258. }
  259. /**
  260. * Publishes a message to a topic on the server and return once it is
  261. * delivered.
  262. * @param top The topic to publish
  263. * @param payload The data to publish
  264. * @param n The size in bytes of the data
  265. */
  266. virtual void publish(string_ref top, const void* payload, size_t n) {
  267. if (!cli_.publish(std::move(top), payload, n)->wait_for(timeout_))
  268. throw timeout_error();
  269. }
  270. /**
  271. * Publishes a message to a topic on the server.
  272. * @param msg The message
  273. */
  274. virtual void publish(const_message_ptr msg) {
  275. if (!cli_.publish(msg)->wait_for(timeout_))
  276. throw timeout_error();
  277. }
  278. /**
  279. * Publishes a message to a topic on the server.
  280. * This version will not timeout since that could leave the library with
  281. * a reference to memory that could disappear while the library is still
  282. * using it.
  283. * @param msg The message
  284. */
  285. virtual void publish(const message& msg) {
  286. cli_.publish(ptr(msg))->wait();
  287. }
  288. /**
  289. * Sets the callback listener to use for events that happen
  290. * asynchronously.
  291. * @param cb The callback functions
  292. */
  293. virtual void set_callback(callback& cb);
  294. /**
  295. * Set the maximum time to wait for an action to complete.
  296. * @param timeoutMS The timeout in milliseconds
  297. */
  298. virtual void set_timeout(int timeoutMS) {
  299. timeout_ = std::chrono::milliseconds(timeoutMS);
  300. }
  301. /**
  302. * Set the maximum time to wait for an action to complete.
  303. * @param to The timeout as a std::chrono duration.
  304. */
  305. template <class Rep, class Period>
  306. void set_timeout(const std::chrono::duration<Rep, Period>& to) {
  307. timeout_ = to_milliseconds(to);
  308. }
  309. /**
  310. * Subscribe to a topic, which may include wildcards using a QoS of 1.
  311. * @param topicFilter
  312. * @param props The MQTT v5 properties.
  313. * @param opts The MQTT v5 subscribe options for the topic
  314. * @return The "subscribe" response from the server.
  315. */
  316. virtual subscribe_response subscribe(const string& topicFilter,
  317. const subscribe_options& opts=subscribe_options(),
  318. const properties& props=properties());
  319. /**
  320. * Subscribe to a topic, which may include wildcards.
  321. * @param topicFilter A single topic to subscribe
  322. * @param qos The QoS of the subscription
  323. * @param opts The MQTT v5 subscribe options for the topic
  324. * @param props The MQTT v5 properties.
  325. * @return The "subscribe" response from the server.
  326. */
  327. virtual subscribe_response subscribe(const string& topicFilter, int qos,
  328. const subscribe_options& opts=subscribe_options(),
  329. const properties& props=properties());
  330. /**
  331. * Subscribes to a one or more topics, which may include wildcards using
  332. * a QoS of 1.
  333. * @param topicFilters A set of topics to subscribe
  334. * @param opts The MQTT v5 subscribe options (one for each topic)
  335. * @param props The MQTT v5 properties.
  336. * @return The "subscribe" response from the server.
  337. */
  338. virtual subscribe_response subscribe(const string_collection& topicFilters,
  339. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  340. const properties& props=properties());
  341. /**
  342. * Subscribes to multiple topics, each of which may include wildcards.
  343. * @param topicFilters A collection of topics to subscribe
  344. * @param qos A collection of QoS for each topic
  345. * @param opts The MQTT v5 subscribe options (one for each topic)
  346. * @param props The MQTT v5 properties.
  347. * @return The "subscribe" response from the server.
  348. */
  349. virtual subscribe_response subscribe(const string_collection& topicFilters,
  350. const qos_collection& qos,
  351. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  352. const properties& props=properties());
  353. /**
  354. * Requests the server unsubscribe the client from a topic.
  355. * @param topicFilter A single topic to unsubscribe.
  356. * @param props The MQTT v5 properties.
  357. * @return The "unsubscribe" response from the server.
  358. */
  359. virtual unsubscribe_response unsubscribe(const string& topicFilter,
  360. const properties& props=properties());
  361. /**
  362. * Requests the server unsubscribe the client from one or more topics.
  363. * @param topicFilters A collection of topics to unsubscribe.
  364. * @param props The MQTT v5 properties.
  365. * @return The "unsubscribe" response from the server.
  366. */
  367. virtual unsubscribe_response unsubscribe(const string_collection& topicFilters,
  368. const properties& props=properties());
  369. /**
  370. * Start consuming messages.
  371. * This initializes the client to receive messages through a queue that
  372. * can be read synchronously.
  373. */
  374. virtual void start_consuming() { cli_.start_consuming(); }
  375. /**
  376. * Stop consuming messages.
  377. * This shuts down the internal callback and discards any unread
  378. * messages.
  379. */
  380. virtual void stop_consuming() { cli_.stop_consuming(); }
  381. /**
  382. * Read the next message from the queue.
  383. * This blocks until a new message arrives.
  384. * @return The message and topic.
  385. */
  386. virtual const_message_ptr consume_message() { return cli_.consume_message(); }
  387. /**
  388. * Try to read the next message from the queue without blocking.
  389. * @param msg Pointer to the value to receive the message
  390. * @return @em true is a message was read, @em false if no message was
  391. * available.
  392. */
  393. virtual bool try_consume_message(const_message_ptr* msg) {
  394. return cli_.try_consume_message(msg);
  395. }
  396. /**
  397. * Waits a limited time for a message to arrive.
  398. * @param msg Pointer to the value to receive the message
  399. * @param relTime The maximum amount of time to wait for a message.
  400. * @return @em true if a message was read, @em false if a timeout
  401. * occurred.
  402. */
  403. template <typename Rep, class Period>
  404. bool try_consume_message_for(const_message_ptr* msg,
  405. const std::chrono::duration<Rep, Period>& relTime) {
  406. return cli_.try_consume_message_for(msg, relTime);
  407. }
  408. /**
  409. * Waits until a specific time for a message to occur.
  410. * @param msg Pointer to the value to receive the message
  411. * @param absTime The time point to wait until, before timing out.
  412. * @return @em true if a message was read, @em false if a timeout
  413. * occurred.
  414. */
  415. template <class Clock, class Duration>
  416. bool try_consume_message_until(const_message_ptr* msg,
  417. const std::chrono::time_point<Clock,Duration>& absTime) {
  418. return cli_.try_consume_message_until(msg, absTime);
  419. }
  420. };
  421. /** Smart/shared pointer to an MQTT synchronous client object */
  422. using client_ptr = client::ptr_t;
  423. /////////////////////////////////////////////////////////////////////////////
  424. // end namespace mqtt
  425. }
  426. #endif // __mqtt_client_h