async_client.h 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file async_client.h
  3. /// Declaration of MQTT async_client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2024 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v2.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v20.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. * Frank Pagliughi - MQTT v5 support
  22. *******************************************************************************/
  23. #ifndef __mqtt_async_client_h
  24. #define __mqtt_async_client_h
  25. #include "MQTTAsync.h"
  26. #include "mqtt/types.h"
  27. #include "mqtt/token.h"
  28. #include "mqtt/create_options.h"
  29. #include "mqtt/string_collection.h"
  30. #include "mqtt/delivery_token.h"
  31. #include "mqtt/iclient_persistence.h"
  32. #include "mqtt/iaction_listener.h"
  33. #include "mqtt/properties.h"
  34. #include "mqtt/exception.h"
  35. #include "mqtt/message.h"
  36. #include "mqtt/callback.h"
  37. #include "mqtt/thread_queue.h"
  38. #include "mqtt/iasync_client.h"
  39. #include <vector>
  40. #include <list>
  41. #include <memory>
  42. #include <tuple>
  43. #include <functional>
  44. #include <stdexcept>
  45. namespace mqtt {
  46. // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
  47. // clashed with #define's from other libraries and will be removed at the
  48. // next major version upgrade.
  49. #if defined(PAHO_MQTTPP_VERSIONS)
  50. /** The version number for the client library. */
  51. const uint32_t PAHO_MQTTPP_VERSION = 0x01040000;
  52. /** The version string for the client library */
  53. const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.4.0");
  54. /** Copyright notice for the client library */
  55. const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi");
  56. #else
  57. /** The version number for the client library. */
  58. const uint32_t VERSION = 0x01040000;
  59. /** The version string for the client library */
  60. const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.4.0");
  61. /** Copyright notice for the client library */
  62. const string COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi");
  63. #endif
  64. /////////////////////////////////////////////////////////////////////////////
  65. /**
  66. * Client for talking to an MQTT server using non-blocking
  67. * methods that allow an operation to run in the background.
  68. *
  69. * The location of the server is specified as a URI string with the
  70. * following schemas supported to specify the type and security used for the
  71. * connection:
  72. * @li @em "mqtt://" - A standard (insecure) connection over TCP. (Also,
  73. * "tcp://")
  74. * @li @em "mqtts://" - A secure connection using SSL/TLS sockets. (Also
  75. * "ssl://")
  76. * @li @em "ws://" - A standard (insecure) WebSocket connection.
  77. * @li @em "wss:// - A secure websocket connection using SSL/TLS.
  78. *
  79. * The secure connection types assume that the library was built with
  80. * OpenSSL support, otherwise requesting a secure connection will result in
  81. * an error.
  82. *
  83. * The communication methods of this class - `connect()`, `publish()`,
  84. * `subscribe()`, etc. - are all asynchronous. They create the request for
  85. * the server, but return immediately, before a response is received back
  86. * from the server.
  87. *
  88. * These methods return a `Token` to the caller which is akin to a C++
  89. * std::future. The caller can keep the Token, then use it later to block
  90. * until the asynchronous operation is complete and retrieve the result of
  91. * the operation, including any response from the server.
  92. *
  93. * Alternately, the application can choose to set callbacks to be fired when
  94. * each operation completes. This can be used to create an event-driven
  95. * architecture, but is more complex in that it forces the user to avoid any
  96. * blocking operations and manually handle thread synchronization (since
  97. * the callbacks run in a separate thread managed by the library).
  98. */
  99. class async_client : public virtual iasync_client
  100. {
  101. public:
  102. /** Smart/shared pointer for an object of this class */
  103. using ptr_t = std::shared_ptr<async_client>;
  104. /** Type for a thread-safe queue to consume messages synchronously */
  105. using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
  106. /** Handler type for registering an individual message callback */
  107. using message_handler = std::function<void(const_message_ptr)>;
  108. /** Handler type for when a connection is made or lost */
  109. using connection_handler = std::function<void(const string& cause)>;
  110. /** Handler type for when a disconnect packet is received */
  111. using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
  112. /** Handler for updating connection data before an auto-reconnect. */
  113. using update_connection_handler = std::function<bool(connect_data&)>;
  114. private:
  115. /** Lock guard type for this class */
  116. using guard = std::unique_lock<std::mutex>;
  117. /** Unique lock type for this class */
  118. using unique_lock = std::unique_lock<std::mutex>;
  119. /** Object monitor mutex */
  120. mutable std::mutex lock_;
  121. /** The underlying C-lib client. */
  122. MQTTAsync cli_;
  123. /** The server URI string. */
  124. string serverURI_;
  125. /** The client ID string that we provided to the server. */
  126. string clientId_;
  127. /** The MQTT protocol version we're connected at */
  128. int mqttVersion_;
  129. /** A user persistence wrapper (if any) */
  130. std::unique_ptr<MQTTClient_persistence> persist_;
  131. /** Callback supplied by the user (if any) */
  132. callback* userCallback_;
  133. /** Connection handler */
  134. connection_handler connHandler_;
  135. /** Connection lost handler */
  136. connection_handler connLostHandler_;
  137. /** Disconnected handler */
  138. disconnected_handler disconnectedHandler_;
  139. /** Update connect data/options */
  140. update_connection_handler updateConnectionHandler_;
  141. /** Message handler */
  142. message_handler msgHandler_;
  143. /** Cached options from the last connect */
  144. connect_options connOpts_;
  145. /** Copy of connect token (for re-connects) */
  146. token_ptr connTok_;
  147. /** A list of tokens that are in play */
  148. std::list<token_ptr> pendingTokens_;
  149. /** A list of delivery tokens that are in play */
  150. std::list<delivery_token_ptr> pendingDeliveryTokens_;
  151. /** A queue of messages for consumer API */
  152. consumer_queue_type que_;
  153. /** Callbacks from the C library */
  154. static void on_connected(void* context, char* cause);
  155. static void on_connection_lost(void *context, char *cause);
  156. static void on_disconnected(void* context, MQTTProperties* cprops,
  157. MQTTReasonCodes reasonCode);
  158. static int on_message_arrived(void* context, char* topicName, int topicLen,
  159. MQTTAsync_message* msg);
  160. static void on_delivery_complete(void* context, MQTTAsync_token tok);
  161. static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
  162. /** Manage internal list of active tokens */
  163. friend class token;
  164. virtual void add_token(token_ptr tok);
  165. virtual void add_token(delivery_token_ptr tok);
  166. virtual void remove_token(token* tok) override;
  167. virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
  168. void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
  169. /** Non-copyable */
  170. async_client() =delete;
  171. async_client(const async_client&) =delete;
  172. async_client& operator=(const async_client&) =delete;
  173. /** Checks a function return code and throws on error. */
  174. static void check_ret(int rc) {
  175. if (rc != MQTTASYNC_SUCCESS)
  176. throw exception(rc);
  177. }
  178. public:
  179. /**
  180. * Create an async_client that can be used to communicate with an MQTT
  181. * server.
  182. * This uses file-based persistence in the specified directory.
  183. * @param serverURI the address of the server to connect to, specified
  184. * as a URI.
  185. * @param clientId a client identifier that is unique on the server
  186. * being connected to
  187. * @param persistDir The directory to use for persistence data
  188. * @throw exception if an argument is invalid
  189. */
  190. async_client(const string& serverURI, const string& clientId,
  191. const string& persistDir);
  192. /**
  193. * Create an async_client that can be used to communicate with an MQTT
  194. * server.
  195. * This allows the caller to specify a user-defined persistence object,
  196. * or use no persistence.
  197. * @param serverURI the address of the server to connect to, specified
  198. * as a URI.
  199. * @param clientId a client identifier that is unique on the server
  200. * being connected to
  201. * @param persistence The user persistence structure. If this is null,
  202. * then no persistence is used.
  203. * @throw exception if an argument is invalid
  204. */
  205. async_client(const string& serverURI, const string& clientId,
  206. iclient_persistence* persistence=nullptr);
  207. /**
  208. * Create an async_client that can be used to communicate with an MQTT
  209. * server, which allows for off-line message buffering.
  210. * This uses file-based persistence in the specified directory.
  211. * @param serverURI the address of the server to connect to, specified
  212. * as a URI.
  213. * @param clientId a client identifier that is unique on the server
  214. * being connected to
  215. * @param maxBufferedMessages the maximum number of messages allowed to
  216. * be buffered while not connected
  217. * @param persistDir The directory to use for persistence data
  218. * @throw exception if an argument is invalid
  219. */
  220. async_client(const string& serverURI, const string& clientId,
  221. int maxBufferedMessages, const string& persistDir);
  222. /**
  223. * Create an async_client that can be used to communicate with an MQTT
  224. * server, which allows for off-line message buffering.
  225. * This allows the caller to specify a user-defined persistence object,
  226. * or use no persistence.
  227. * @param serverURI the address of the server to connect to, specified
  228. * as a URI.
  229. * @param clientId a client identifier that is unique on the server
  230. * being connected to
  231. * @param maxBufferedMessages the maximum number of messages allowed to
  232. * be buffered while not connected
  233. * @param persistence The user persistence structure. If this is null,
  234. * then no persistence is used.
  235. * @throw exception if an argument is invalid
  236. */
  237. async_client(const string& serverURI, const string& clientId,
  238. int maxBufferedMessages,
  239. iclient_persistence* persistence=nullptr);
  240. /**
  241. * Create an async_client that can be used to communicate with an MQTT
  242. * server, which allows for off-line message buffering.
  243. * This uses file-based persistence in the specified directory.
  244. * @param serverURI the address of the server to connect to, specified
  245. * as a URI.
  246. * @param clientId a client identifier that is unique on the server
  247. * being connected to
  248. * @param opts The create options
  249. * @param persistDir The directory to use for persistence data
  250. * @throw exception if an argument is invalid
  251. */
  252. async_client(const string& serverURI, const string& clientId,
  253. const create_options& opts, const string& persistDir);
  254. /**
  255. * Create an async_client that can be used to communicate with an MQTT
  256. * server, which allows for off-line message buffering.
  257. * This allows the caller to specify a user-defined persistence object,
  258. * or use no persistence.
  259. * @param serverURI the address of the server to connect to, specified
  260. * as a URI.
  261. * @param clientId a client identifier that is unique on the server
  262. * being connected to
  263. * @param opts The create options
  264. * @param persistence The user persistence structure. If this is null,
  265. * then no persistence is used.
  266. * @throw exception if an argument is invalid
  267. */
  268. async_client(const string& serverURI, const string& clientId,
  269. const create_options& opts,
  270. iclient_persistence* persistence=nullptr);
  271. /**
  272. * Destructor
  273. */
  274. ~async_client() override;
  275. /**
  276. * Sets a callback listener to use for events that happen
  277. * asynchronously.
  278. * @param cb callback receiver which will be invoked for certain
  279. * asynchronous events
  280. */
  281. void set_callback(callback& cb) override;
  282. /**
  283. * Stops callbacks.
  284. * This is not normally called by the application. It should be used
  285. * cautiously as it may cause the application to lose messages.
  286. */
  287. void disable_callbacks() override;
  288. /**
  289. * Callback for when a connection is made.
  290. * @param cb Callback functor for when the connection is made.
  291. */
  292. void set_connected_handler(connection_handler cb) /*override*/;
  293. /**
  294. * Callback for when a connection is lost.
  295. * @param cb Callback functor for when the connection is lost.
  296. */
  297. void set_connection_lost_handler(connection_handler cb) /*override*/;
  298. /**
  299. * Callback for when a disconnect packet is received from the server.
  300. * @param cb Callback for when the disconnect packet is received.
  301. */
  302. void set_disconnected_handler(disconnected_handler cb) /*override*/;
  303. /**
  304. * Sets the callback for when a message arrives from the broker.
  305. * Note that the application can only have one message handler which can
  306. * be installed individually using this method, or installled as a
  307. * listener object.
  308. * @param cb The callback functor to register with the library.
  309. */
  310. void set_message_callback(message_handler cb) /*override*/;
  311. /**
  312. * Sets a callback to allow the application to update the connection
  313. * data on automatic reconnects.
  314. * @param cb The callback functor to register with the library.
  315. */
  316. void set_update_connection_handler(update_connection_handler cb);
  317. /**
  318. * Connects to an MQTT server using the default options.
  319. * @return token used to track and wait for the connect to complete. The
  320. * token will be passed to any callback that has been set.
  321. * @throw exception for non security related problems
  322. * @throw security_exception for security related problems
  323. */
  324. token_ptr connect() override;
  325. /**
  326. * Connects to an MQTT server using the provided connect options.
  327. * @param options a set of connection parameters that override the
  328. * defaults.
  329. * @return token used to track and wait for the connect to complete. The
  330. * token will be passed to any callback that has been set.
  331. * @throw exception for non security related problems
  332. * @throw security_exception for security related problems
  333. */
  334. token_ptr connect(connect_options options) override;
  335. /**
  336. * Connects to an MQTT server using the specified options.
  337. * @param options a set of connection parameters that override the
  338. * defaults.
  339. * @param userContext optional object used to pass context to the
  340. * callback. Use @em nullptr if not required.
  341. * @param cb callback listener that will be notified when the connect
  342. * completes.
  343. * @return token used to track and wait for the connect to complete. The
  344. * token will be passed to any callback that has been set.
  345. * @throw exception for non security related problems
  346. * @throw security_exception for security related problems
  347. */
  348. token_ptr connect(connect_options options, void* userContext,
  349. iaction_listener& cb) override;
  350. /**
  351. *
  352. * @param userContext optional object used to pass context to the
  353. * callback. Use @em nullptr if not required.
  354. * @param cb callback listener that will be notified when the connect
  355. * completes.
  356. * @return token used to track and wait for the connect to complete. The
  357. * token will be passed to any callback that has been set.
  358. * @throw exception for non security related problems
  359. * @throw security_exception for security related problems
  360. */
  361. token_ptr connect(void* userContext, iaction_listener& cb) override {
  362. return connect(connect_options{}, userContext, cb);
  363. }
  364. /**
  365. * Reconnects the client using options from the previous connect.
  366. * The client must have previously called connect() for this to work.
  367. * @return token used to track the progress of the reconnect.
  368. */
  369. token_ptr reconnect() override;
  370. /**
  371. * Disconnects from the server.
  372. * @return token used to track and wait for the disconnect to complete.
  373. * The token will be passed to any callback that has been set.
  374. * @throw exception for problems encountered while disconnecting
  375. */
  376. token_ptr disconnect() override { return disconnect(disconnect_options()); }
  377. /**
  378. * Disconnects from the server.
  379. * @param opts Options for disconnecting.
  380. * @return token used to track and wait for the disconnect to complete.
  381. * The token will be passed to any callback that has been set.
  382. * @throw exception for problems encountered while disconnecting
  383. */
  384. token_ptr disconnect(disconnect_options opts) override;
  385. /**
  386. * Disconnects from the server.
  387. * @param timeout the amount of time in milliseconds to allow for
  388. * existing work to finish before disconnecting. A value
  389. * of zero or less means the client will not quiesce.
  390. * @return Token used to track and wait for disconnect to complete. The
  391. * token will be passed to the callback methods if a callback is
  392. * set.
  393. * @throw exception for problems encountered while disconnecting
  394. */
  395. token_ptr disconnect(int timeout) override {
  396. return disconnect(disconnect_options(timeout));
  397. }
  398. /**
  399. * Disconnects from the server.
  400. * @param timeout the amount of time in milliseconds to allow for
  401. * existing work to finish before disconnecting. A value
  402. * of zero or less means the client will not quiesce.
  403. * @return Token used to track and wait for disconnect to complete. The
  404. * token will be passed to the callback methods if a callback is
  405. * set.
  406. * @throw exception for problems encountered while disconnecting
  407. */
  408. template <class Rep, class Period>
  409. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
  410. // TODO: check range
  411. return disconnect((int) to_milliseconds_count(timeout));
  412. }
  413. /**
  414. * Disconnects from the server.
  415. * @param timeout the amount of time in milliseconds to allow for
  416. * existing work to finish before disconnecting. A value
  417. * of zero or less means the client will not quiesce.
  418. * @param userContext optional object used to pass context to the
  419. * callback. Use @em nullptr if not required.
  420. * @param cb callback listener that will be notified when the disconnect
  421. * completes.
  422. * @return token_ptr Token used to track and wait for disconnect to
  423. * complete. The token will be passed to the callback methods if
  424. * a callback is set.
  425. * @throw exception for problems encountered while disconnecting
  426. */
  427. token_ptr disconnect(int timeout, void* userContext,
  428. iaction_listener& cb) override;
  429. /**
  430. * Disconnects from the server.
  431. * @param timeout the amount of time in milliseconds to allow for
  432. * existing work to finish before disconnecting. A value
  433. * of zero or less means the client will not quiesce.
  434. * @param userContext optional object used to pass context to the
  435. * callback. Use @em nullptr if not required.
  436. * @param cb callback listener that will be notified when the disconnect
  437. * completes.
  438. * @return token_ptr Token used to track and wait for disconnect to
  439. * complete. The token will be passed to the callback methods if
  440. * a callback is set.
  441. * @throw exception for problems encountered while disconnecting
  442. */
  443. template <class Rep, class Period>
  444. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
  445. void* userContext, iaction_listener& cb) {
  446. // TODO: check range
  447. return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
  448. }
  449. /**
  450. * Disconnects from the server.
  451. * @param userContext optional object used to pass context to the
  452. * callback. Use @em nullptr if not required.
  453. * @param cb callback listener that will be notified when the disconnect
  454. * completes.
  455. * @return token_ptr Token used to track and wait for disconnect to
  456. * complete. The token will be passed to the callback methods if
  457. * a callback is set.
  458. * @throw exception for problems encountered while disconnecting
  459. */
  460. token_ptr disconnect(void* userContext, iaction_listener& cb) override {
  461. return disconnect(0L, userContext, cb);
  462. }
  463. /**
  464. * Returns the delivery token for the specified message ID.
  465. * @return delivery_token
  466. */
  467. delivery_token_ptr get_pending_delivery_token(int msgID) const override;
  468. /**
  469. * Returns the delivery tokens for any outstanding publish operations.
  470. * @return delivery_token[]
  471. */
  472. std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
  473. /**
  474. * Returns the client ID used by this client.
  475. * @return The client ID used by this client.
  476. */
  477. string get_client_id() const override { return clientId_; }
  478. /**
  479. * Returns the address of the server used by this client.
  480. * @return The server's address, as a URI String.
  481. */
  482. string get_server_uri() const override { return serverURI_; }
  483. /**
  484. * Gets the MQTT version used by the client.
  485. * @return The MQTT version used by the client
  486. * @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
  487. * that fails, fall back to 3.1
  488. * @li MQTTVERSION_3_1 (3) = only try version 3.1
  489. * @li MQTTVERSION_3_1_1 (4) = only try version 3.1.1
  490. * @li MQTTVERSION_5 (5) = only try version 5
  491. */
  492. int mqtt_version() const noexcept { return mqttVersion_; }
  493. /**
  494. * Gets a copy of the connect options that were last used in a request
  495. * to connect to the broker.
  496. * @returns The last connect options that were used.
  497. */
  498. connect_options get_connect_options() const {
  499. guard g(lock_);
  500. return connOpts_;
  501. }
  502. /**
  503. * Determines if this client is currently connected to the server.
  504. * @return true if connected, false otherwise.
  505. */
  506. bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
  507. /**
  508. * Publishes a message to a topic on the server
  509. * @param topic The topic to deliver the message to
  510. * @param payload the bytes to use as the message payload
  511. * @param n the number of bytes in the payload
  512. * @param qos the Quality of Service to deliver the message at. Valid
  513. * values are 0, 1 or 2.
  514. * @param retained whether or not this message should be retained by the
  515. * server.
  516. * @return token used to track and wait for the publish to complete. The
  517. * token will be passed to callback methods if set.
  518. */
  519. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
  520. int qos, bool retained) override;
  521. /**
  522. * Publishes a message to a topic on the server
  523. * @param topic The topic to deliver the message to
  524. * @param payload the bytes to use as the message payload
  525. * @param n the number of bytes in the payload
  526. * @return token used to track and wait for the publish to complete. The
  527. * token will be passed to callback methods if set.
  528. */
  529. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
  530. return publish(std::move(topic), payload, n,
  531. message::DFLT_QOS, message::DFLT_RETAINED);
  532. }
  533. /**
  534. * Publishes a message to a topic on the server
  535. * @param topic The topic to deliver the message to
  536. * @param payload the bytes to use as the message payload
  537. * @param qos the Quality of Service to deliver the message at. Valid
  538. * values are 0, 1 or 2.
  539. * @param retained whether or not this message should be retained by the
  540. * server.
  541. * @return token used to track and wait for the publish to complete. The
  542. * token will be passed to callback methods if set.
  543. */
  544. delivery_token_ptr publish(string_ref topic, binary_ref payload,
  545. int qos, bool retained) override;
  546. /**
  547. * Publishes a message to a topic on the server
  548. * @param topic The topic to deliver the message to
  549. * @param payload the bytes to use as the message payload
  550. * @return token used to track and wait for the publish to complete. The
  551. * token will be passed to callback methods if set.
  552. */
  553. delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
  554. return publish(std::move(topic), std::move(payload),
  555. message::DFLT_QOS, message::DFLT_RETAINED);
  556. }
  557. /**
  558. * Publishes a message to a topic on the server
  559. * @param topic The topic to deliver the message to
  560. * @param payload the bytes to use as the message payload
  561. * @param n the number of bytes in the payload
  562. * @param qos the Quality of Service to deliver the message at. Valid
  563. * values are 0, 1 or 2.
  564. * @param retained whether or not this message should be retained by the
  565. * server.
  566. * @param userContext optional object used to pass context to the
  567. * callback. Use @em nullptr if not required.
  568. * @param cb
  569. * @return token used to track and wait for the publish to complete. The
  570. * token will be passed to callback methods if set.
  571. */
  572. delivery_token_ptr publish(string_ref topic,
  573. const void* payload, size_t n,
  574. int qos, bool retained,
  575. void* userContext, iaction_listener& cb) override;
  576. /**
  577. * Publishes a message to a topic on the server Takes an Message
  578. * message and delivers it to the server at the requested quality of
  579. * service.
  580. * @param msg the message to deliver to the server
  581. * @return token used to track and wait for the publish to complete. The
  582. * token will be passed to callback methods if set.
  583. */
  584. delivery_token_ptr publish(const_message_ptr msg) override;
  585. /**
  586. * Publishes a message to a topic on the server.
  587. * @param msg the message to deliver to the server
  588. * @param userContext optional object used to pass context to the
  589. * callback. Use @em nullptr if not required.
  590. * @param cb callback optional listener that will be notified when message
  591. * delivery has completed to the requested quality of
  592. * service
  593. * @return token used to track and wait for the publish to complete. The
  594. * token will be passed to callback methods if set.
  595. */
  596. delivery_token_ptr publish(const_message_ptr msg,
  597. void* userContext, iaction_listener& cb) override;
  598. /**
  599. * Subscribe to a topic, which may include wildcards.
  600. * @param topicFilter the topic to subscribe to, which can include
  601. * wildcards.
  602. * @param qos The quality of service for the subscription
  603. * @param opts The MQTT v5 subscribe options for the topic
  604. * @param props The MQTT v5 properties.
  605. * @return token used to track and wait for the subscribe to complete.
  606. * The token will be passed to callback methods if set.
  607. */
  608. token_ptr subscribe(const string& topicFilter, int qos,
  609. const subscribe_options& opts=subscribe_options(),
  610. const properties& props=properties()) override;
  611. /**
  612. * Subscribe to a topic, which may include wildcards.
  613. * @param topicFilter the topic to subscribe to, which can include
  614. * wildcards.
  615. * @param qos the maximum quality of service at which to subscribe.
  616. * Messages published at a lower quality of service will be
  617. * received at the published QoS. Messages published at a
  618. * higher quality of service will be received using the QoS
  619. * specified on the subscribe.
  620. * @param userContext optional object used to pass context to the
  621. * callback. Use @em nullptr if not required.
  622. * @param cb listener that will be notified when subscribe has completed
  623. * @param opts The MQTT v5 subscribe options for the topic
  624. * @param props The MQTT v5 properties.
  625. * @return token used to track and wait for the subscribe to complete.
  626. * The token will be passed to callback methods if set.
  627. */
  628. token_ptr subscribe(const string& topicFilter, int qos,
  629. void* userContext, iaction_listener& cb,
  630. const subscribe_options& opts=subscribe_options(),
  631. const properties& props=properties()) override;
  632. /**
  633. * Subscribe to multiple topics, each of which may include wildcards.
  634. * @param topicFilters
  635. * @param qos the maximum quality of service at which to subscribe.
  636. * Messages published at a lower quality of service will be
  637. * received at the published QoS. Messages published at a
  638. * higher quality of service will be received using the QoS
  639. * specified on the subscribe.
  640. * @param opts The MQTT v5 subscribe options (one for each topic)
  641. * @param props The MQTT v5 properties.
  642. * @return token used to track and wait for the subscribe to complete.
  643. * The token will be passed to callback methods if set.
  644. */
  645. token_ptr subscribe(const_string_collection_ptr topicFilters,
  646. const qos_collection& qos,
  647. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  648. const properties& props=properties()) override;
  649. /**
  650. * Subscribes to multiple topics, each of which may include wildcards.
  651. * @param topicFilters
  652. * @param qos the maximum quality of service at which to subscribe.
  653. * Messages published at a lower quality of service will be
  654. * received at the published QoS. Messages published at a
  655. * higher quality of service will be received using the QoS
  656. * specified on the subscribe.
  657. * @param userContext optional object used to pass context to the
  658. * callback. Use @em nullptr if not required.
  659. * @param cb listener that will be notified when subscribe has completed
  660. * @param opts The MQTT v5 subscribe options (one for each topic)
  661. * @param props The MQTT v5 properties.
  662. * @return token used to track and wait for the subscribe to complete.
  663. * The token will be passed to callback methods if set.
  664. */
  665. token_ptr subscribe(const_string_collection_ptr topicFilters,
  666. const qos_collection& qos,
  667. void* userContext, iaction_listener& cb,
  668. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  669. const properties& props=properties()) override;
  670. /**
  671. * Requests the server unsubscribe the client from a topic.
  672. * @param topicFilter the topic to unsubscribe from. It must match a
  673. * topicFilter specified on an earlier subscribe.
  674. * @param props The MQTT v5 properties.
  675. * @return token used to track and wait for the unsubscribe to complete.
  676. * The token will be passed to callback methods if set.
  677. */
  678. token_ptr unsubscribe(const string& topicFilter,
  679. const properties& props=properties()) override;
  680. /**
  681. * Requests the server unsubscribe the client from one or more topics.
  682. * @param topicFilters one or more topics to unsubscribe from. Each
  683. * topicFilter must match one specified on an
  684. * earlier subscribe.
  685. * @param props The MQTT v5 properties.
  686. * @return token used to track and wait for the unsubscribe to complete.
  687. * The token will be passed to callback methods if set.
  688. */
  689. token_ptr unsubscribe(const_string_collection_ptr topicFilters,
  690. const properties& props=properties()) override;
  691. /**
  692. * Requests the server unsubscribe the client from one or more topics.
  693. * @param topicFilters
  694. * @param userContext optional object used to pass context to the
  695. * callback. Use @em nullptr if not required.
  696. * @param cb listener that will be notified when unsubscribe has
  697. * completed
  698. * @param props The MQTT v5 properties.
  699. * @return token used to track and wait for the unsubscribe to complete.
  700. * The token will be passed to callback methods if set.
  701. */
  702. token_ptr unsubscribe(const_string_collection_ptr topicFilters,
  703. void* userContext, iaction_listener& cb,
  704. const properties& props=properties()) override;
  705. /**
  706. * Requests the server unsubscribe the client from a topics.
  707. * @param topicFilter the topic to unsubscribe from. It must match a
  708. * topicFilter specified on an earlier subscribe.
  709. * @param userContext optional object used to pass context to the
  710. * callback. Use @em nullptr if not required.
  711. * @param cb listener that will be notified when unsubscribe has
  712. * completed
  713. * @param props The MQTT v5 properties.
  714. * @return token used to track and wait for the unsubscribe to complete.
  715. * The token will be passed to callback methods if set.
  716. */
  717. token_ptr unsubscribe(const string& topicFilter,
  718. void* userContext, iaction_listener& cb,
  719. const properties& props=properties()) override;
  720. /**
  721. * Start consuming messages.
  722. * This initializes the client to receive messages through a queue that
  723. * can be read synchronously.
  724. */
  725. void start_consuming() override;
  726. /**
  727. * Stop consuming messages.
  728. * This shuts down the internal callback and discards any unread
  729. * messages.
  730. */
  731. void stop_consuming() override;
  732. /**
  733. * Read the next message from the queue.
  734. * This blocks until a new message arrives.
  735. * @return The message and topic.
  736. */
  737. const_message_ptr consume_message() override { return que_->get(); }
  738. /**
  739. * Try to read the next message from the queue without blocking.
  740. * @param msg Pointer to the value to receive the message
  741. * @return @em true is a message was read, @em false if no message was
  742. * available.
  743. */
  744. bool try_consume_message(const_message_ptr* msg) override {
  745. return que_->try_get(msg);
  746. }
  747. /**
  748. * Waits a limited time for a message to arrive.
  749. * @param msg Pointer to the value to receive the message
  750. * @param relTime The maximum amount of time to wait for a message.
  751. * @return @em true if a message was read, @em false if a timeout
  752. * occurred.
  753. */
  754. template <typename Rep, class Period>
  755. bool try_consume_message_for(const_message_ptr* msg,
  756. const std::chrono::duration<Rep, Period>& relTime) {
  757. return que_->try_get_for(msg, relTime);
  758. }
  759. /**
  760. * Waits a limited time for a message to arrive.
  761. * @param relTime The maximum amount of time to wait for a message.
  762. * @return A shared pointer to the message that was received. It will be
  763. * empty on timeout.
  764. */
  765. template <typename Rep, class Period>
  766. const_message_ptr try_consume_message_for(const std::chrono::duration<Rep, Period>& relTime) {
  767. const_message_ptr msg;
  768. que_->try_get_for(&msg, relTime);
  769. return msg;
  770. }
  771. /**
  772. * Waits until a specific time for a message to appear.
  773. * @param msg Pointer to the value to receive the message
  774. * @param absTime The time point to wait until, before timing out.
  775. * @return @em true if a message was read, @em false if a timeout
  776. * occurred.
  777. */
  778. template <class Clock, class Duration>
  779. bool try_consume_message_until(const_message_ptr* msg,
  780. const std::chrono::time_point<Clock,Duration>& absTime) {
  781. return que_->try_get_until(msg, absTime);
  782. }
  783. /**
  784. * Waits until a specific time for a message to appear.
  785. * @param absTime The time point to wait until, before timing out.
  786. * @return The message, if read, an empty pointer if not.
  787. */
  788. template <class Clock, class Duration>
  789. const_message_ptr try_consume_message_until(const std::chrono::time_point<Clock,Duration>& absTime) {
  790. const_message_ptr msg;
  791. que_->try_get_until(&msg, absTime);
  792. return msg;
  793. }
  794. };
  795. /** Smart/shared pointer to an asynchronous MQTT client object */
  796. using async_client_ptr = async_client::ptr_t;
  797. /////////////////////////////////////////////////////////////////////////////
  798. // end namespace mqtt
  799. }
  800. #endif // __mqtt_async_client_h