thread_queue.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file thread_queue.h
  3. /// Implementation of the template class 'thread_queue', a thread-safe,
  4. /// blocking queue for passing data between threads, safe for use with smart
  5. /// pointers.
  6. /// @date 09-Jan-2017
  7. /////////////////////////////////////////////////////////////////////////////
  8. /*******************************************************************************
  9. * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
  10. *
  11. * All rights reserved. This program and the accompanying materials
  12. * are made available under the terms of the Eclipse Public License v2.0
  13. * and Eclipse Distribution License v1.0 which accompany this distribution.
  14. *
  15. * The Eclipse Public License is available at
  16. * http://www.eclipse.org/legal/epl-v20.html
  17. * and the Eclipse Distribution License is available at
  18. * http://www.eclipse.org/org/documents/edl-v10.php.
  19. *
  20. * Contributors:
  21. * Frank Pagliughi - initial implementation and documentation
  22. *******************************************************************************/
  23. #ifndef __mqtt_thread_queue_h
  24. #define __mqtt_thread_queue_h
  25. #include <thread>
  26. #include <mutex>
  27. #include <condition_variable>
  28. #include <limits>
  29. #include <deque>
  30. #include <queue>
  31. #include <algorithm>
  32. namespace mqtt {
  33. /////////////////////////////////////////////////////////////////////////////
  34. /**
  35. * A thread-safe queue for inter-thread communication.
  36. *
  37. * This is a locking queue with blocking operations. The get() operations
  38. * can always block on an empty queue, but have variations for non-blocking
  39. * (try_get) and bounded-time blocking (try_get_for, try_get_until).
  40. * @par
  41. * The default queue has a capacity that is unbounded in the practical
  42. * sense, limited by available memory. In this mode the object will not
  43. * block when placing values into the queue. A capacity can bet set with the
  44. * constructor or, at any time later by calling the @ref capacity(size_type)
  45. * method. Using this latter method, the capacity can be set to an amount
  46. * smaller than the current size of the queue. In that case all put's to the
  47. * queue will block until the number of items are removed from the queue to
  48. * bring the size below the new capacity.
  49. * @par
  50. * Note that the queue uses move semantics to place items into the queue and
  51. * remove items from the queue. This means that the type, T, of the data
  52. * held by the queue only needs to follow move semantics; not copy
  53. * semantics. In addition, this means that copies of the value will @em not
  54. * be left in the queue. This is especially useful when creating queues of
  55. * shared pointers, as the "dead" part of the queue will not hold onto a
  56. * reference count after the item has been removed from the queue.
  57. *
  58. * @param T The type of the items to be held in the queue.
  59. * @param Container The type of the underlying container to use. It must
  60. * support back(), front(), push_back(), pop_front().
  61. */
  62. template <typename T, class Container=std::deque<T>>
  63. class thread_queue
  64. {
  65. public:
  66. /** The underlying container type to use for the queue. */
  67. using container_type = Container;
  68. /** The type of items to be held in the queue. */
  69. using value_type = T;
  70. /** The type used to specify number of items in the container. */
  71. using size_type = typename Container::size_type;
  72. /** The maximum capacity of the queue. */
  73. static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
  74. private:
  75. /** Object lock */
  76. mutable std::mutex lock_;
  77. /** Condition get signaled when item added to empty queue */
  78. std::condition_variable notEmptyCond_;
  79. /** Condition gets signaled then item removed from full queue */
  80. std::condition_variable notFullCond_;
  81. /** The capacity of the queue */
  82. size_type cap_;
  83. /** The actual STL container to hold data */
  84. std::queue<T,Container> que_;
  85. /** Simple, scope-based lock guard */
  86. using guard = std::lock_guard<std::mutex>;
  87. /** General purpose guard */
  88. using unique_guard = std::unique_lock<std::mutex>;
  89. public:
  90. /**
  91. * Constructs a queue with the maximum capacity.
  92. */
  93. thread_queue() : cap_(MAX_CAPACITY) {}
  94. /**
  95. * Constructs a queue with the specified capacity.
  96. * @param cap The maximum number of items that can be placed in the
  97. * queue. The minimum capacity is 1.
  98. */
  99. explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
  100. /**
  101. * Determine if the queue is empty.
  102. * @return @em true if there are no elements in the queue, @em false if
  103. * there are any items in the queue.
  104. */
  105. bool empty() const {
  106. guard g(lock_);
  107. return que_.empty();
  108. }
  109. /**
  110. * Gets the capacity of the queue.
  111. * @return The maximum number of elements before the queue is full.
  112. */
  113. size_type capacity() const {
  114. guard g(lock_);
  115. return cap_;
  116. }
  117. /**
  118. * Sets the capacity of the queue.
  119. * Note that the capacity can be set to a value smaller than the current
  120. * size of the queue. In that event, all calls to put() will block until
  121. * a sufficient number
  122. */
  123. void capacity(size_type cap) {
  124. guard g(lock_);
  125. cap_ = cap;
  126. }
  127. /**
  128. * Gets the number of items in the queue.
  129. * @return The number of items in the queue.
  130. */
  131. size_type size() const {
  132. guard g(lock_);
  133. return que_.size();
  134. }
  135. /**
  136. * Put an item into the queue.
  137. * If the queue is full, this will block the caller until items are
  138. * removed bringing the size less than the capacity.
  139. * @param val The value to add to the queue.
  140. */
  141. void put(value_type val) {
  142. unique_guard g(lock_);
  143. notFullCond_.wait(g, [this]{return que_.size() < cap_;});
  144. que_.emplace(std::move(val));
  145. g.unlock();
  146. notEmptyCond_.notify_one();
  147. }
  148. /**
  149. * Non-blocking attempt to place an item into the queue.
  150. * @param val The value to add to the queue.
  151. * @return @em true if the item was added to the queue, @em false if the
  152. * item was not added because the queue is currently full.
  153. */
  154. bool try_put(value_type val) {
  155. unique_guard g(lock_);
  156. if (que_.size() >= cap_)
  157. return false;
  158. que_.emplace(std::move(val));
  159. g.unlock();
  160. notEmptyCond_.notify_one();
  161. return true;
  162. }
  163. /**
  164. * Attempt to place an item in the queue with a bounded wait.
  165. * This will attempt to place the value in the queue, but if it is full,
  166. * it will wait up to the specified time duration before timing out.
  167. * @param val The value to add to the queue.
  168. * @param relTime The amount of time to wait until timing out.
  169. * @return @em true if the value was added to the queue, @em false if a
  170. * timeout occurred.
  171. */
  172. template <typename Rep, class Period>
  173. bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
  174. unique_guard g(lock_);
  175. if (!notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;}))
  176. return false;
  177. que_.emplace(std::move(val));
  178. g.unlock();
  179. notEmptyCond_.notify_one();
  180. return true;
  181. }
  182. /**
  183. * Attempt to place an item in the queue with a bounded wait to an
  184. * absolute time point.
  185. * This will attempt to place the value in the queue, but if it is full,
  186. * it will wait up until the specified time before timing out.
  187. * @param val The value to add to the queue.
  188. * @param absTime The absolute time to wait to before timing out.
  189. * @return @em true if the value was added to the queue, @em false if a
  190. * timeout occurred.
  191. */
  192. template <class Clock, class Duration>
  193. bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
  194. unique_guard g(lock_);
  195. if (!notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;}))
  196. return false;
  197. que_.emplace(std::move(val));
  198. g.unlock();
  199. notEmptyCond_.notify_one();
  200. return true;
  201. }
  202. /**
  203. * Retrieve a value from the queue.
  204. * If the queue is empty, this will block indefinitely until a value is
  205. * added to the queue by another thread,
  206. * @param val Pointer to a variable to receive the value.
  207. */
  208. void get(value_type* val) {
  209. if (!val)
  210. return;
  211. unique_guard g(lock_);
  212. notEmptyCond_.wait(g, [this]{return !que_.empty();});
  213. *val = std::move(que_.front());
  214. que_.pop();
  215. g.unlock();
  216. notFullCond_.notify_one();
  217. }
  218. /**
  219. * Retrieve a value from the queue.
  220. * If the queue is empty, this will block indefinitely until a value is
  221. * added to the queue by another thread,
  222. * @return The value removed from the queue
  223. */
  224. value_type get() {
  225. unique_guard g(lock_);
  226. notEmptyCond_.wait(g, [this]{return !que_.empty();});
  227. value_type val = std::move(que_.front());
  228. que_.pop();
  229. g.unlock();
  230. notFullCond_.notify_one();
  231. return val;
  232. }
  233. /**
  234. * Attempts to remove a value from the queue without blocking.
  235. * If the queue is currently empty, this will return immediately with a
  236. * failure, otherwise it will get the next value and return it.
  237. * @param val Pointer to a variable to receive the value.
  238. * @return @em true if a value was removed from the queue, @em false if
  239. * the queue is empty.
  240. */
  241. bool try_get(value_type* val) {
  242. if (!val)
  243. return false;
  244. unique_guard g(lock_);
  245. if (que_.empty())
  246. return false;
  247. *val = std::move(que_.front());
  248. que_.pop();
  249. g.unlock();
  250. notFullCond_.notify_one();
  251. return true;
  252. }
  253. /**
  254. * Attempt to remove an item from the queue for a bounded amount of time.
  255. * This will retrieve the next item from the queue. If the queue is
  256. * empty, it will wait the specified amount of time for an item to arrive
  257. * before timing out.
  258. * @param val Pointer to a variable to receive the value.
  259. * @param relTime The amount of time to wait until timing out.
  260. * @return @em true if the value was removed the queue, @em false if a
  261. * timeout occurred.
  262. */
  263. template <typename Rep, class Period>
  264. bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
  265. if (!val)
  266. return false;
  267. unique_guard g(lock_);
  268. if (!notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
  269. return false;
  270. *val = std::move(que_.front());
  271. que_.pop();
  272. g.unlock();
  273. notFullCond_.notify_one();
  274. return true;
  275. }
  276. /**
  277. * Attempt to remove an item from the queue for a bounded amount of time.
  278. * This will retrieve the next item from the queue. If the queue is
  279. * empty, it will wait until the specified time for an item to arrive
  280. * before timing out.
  281. * @param val Pointer to a variable to receive the value.
  282. * @param absTime The absolute time to wait to before timing out.
  283. * @return @em true if the value was removed from the queue, @em false
  284. * if a timeout occurred.
  285. */
  286. template <class Clock, class Duration>
  287. bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
  288. if (!val)
  289. return false;
  290. unique_guard g(lock_);
  291. if (!notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
  292. return false;
  293. *val = std::move(que_.front());
  294. que_.pop();
  295. g.unlock();
  296. notFullCond_.notify_one();
  297. return true;
  298. }
  299. };
  300. /////////////////////////////////////////////////////////////////////////////
  301. // end namespace mqtt
  302. }
  303. #endif // __mqtt_thread_queue_h