123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- #ifndef __mqtt_thread_queue_h
- #define __mqtt_thread_queue_h
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <limits>
- #include <deque>
- #include <queue>
- #include <algorithm>
- namespace mqtt {
- template <typename T, class Container=std::deque<T>>
- class thread_queue
- {
- public:
-
- using container_type = Container;
-
- using value_type = T;
-
- using size_type = typename Container::size_type;
-
- static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
- private:
-
- mutable std::mutex lock_;
-
- std::condition_variable notEmptyCond_;
-
- std::condition_variable notFullCond_;
-
- size_type cap_;
-
- std::queue<T,Container> que_;
-
- using guard = std::lock_guard<std::mutex>;
-
- using unique_guard = std::unique_lock<std::mutex>;
- public:
-
- thread_queue() : cap_(MAX_CAPACITY) {}
-
- explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
-
- bool empty() const {
- guard g(lock_);
- return que_.empty();
- }
-
- size_type capacity() const {
- guard g(lock_);
- return cap_;
- }
-
- void capacity(size_type cap) {
- guard g(lock_);
- cap_ = cap;
- }
-
- size_type size() const {
- guard g(lock_);
- return que_.size();
- }
-
- void put(value_type val) {
- unique_guard g(lock_);
- notFullCond_.wait(g, [this]{return que_.size() < cap_;});
- que_.emplace(std::move(val));
- g.unlock();
- notEmptyCond_.notify_one();
- }
-
- bool try_put(value_type val) {
- unique_guard g(lock_);
- if (que_.size() >= cap_)
- return false;
- que_.emplace(std::move(val));
- g.unlock();
- notEmptyCond_.notify_one();
- return true;
- }
-
- template <typename Rep, class Period>
- bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
- unique_guard g(lock_);
- if (!notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;}))
- return false;
- que_.emplace(std::move(val));
- g.unlock();
- notEmptyCond_.notify_one();
- return true;
- }
-
- template <class Clock, class Duration>
- bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
- unique_guard g(lock_);
- if (!notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;}))
- return false;
- que_.emplace(std::move(val));
- g.unlock();
- notEmptyCond_.notify_one();
- return true;
- }
-
- void get(value_type* val) {
- if (!val)
- return;
- unique_guard g(lock_);
- notEmptyCond_.wait(g, [this]{return !que_.empty();});
- *val = std::move(que_.front());
- que_.pop();
- g.unlock();
- notFullCond_.notify_one();
- }
-
- value_type get() {
- unique_guard g(lock_);
- notEmptyCond_.wait(g, [this]{return !que_.empty();});
- value_type val = std::move(que_.front());
- que_.pop();
- g.unlock();
- notFullCond_.notify_one();
- return val;
- }
-
- bool try_get(value_type* val) {
- if (!val)
- return false;
- unique_guard g(lock_);
- if (que_.empty())
- return false;
- *val = std::move(que_.front());
- que_.pop();
- g.unlock();
- notFullCond_.notify_one();
- return true;
- }
-
- template <typename Rep, class Period>
- bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
- if (!val)
- return false;
- unique_guard g(lock_);
- if (!notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
- return false;
- *val = std::move(que_.front());
- que_.pop();
- g.unlock();
- notFullCond_.notify_one();
- return true;
- }
-
- template <class Clock, class Duration>
- bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
- if (!val)
- return false;
- unique_guard g(lock_);
- if (!notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
- return false;
- *val = std::move(que_.front());
- que_.pop();
- g.unlock();
- notFullCond_.notify_one();
- return true;
- }
- };
- }
- #endif
|