condition.hpp 10 KB
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  11. #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  12. #ifndef BOOST_CONFIG_HPP
  13. # include <boost/config.hpp>
  14. #endif
  15. #
  16. #if defined(BOOST_HAS_PRAGMA_ONCE)
  17. # pragma once
  18. #endif
  19. #include <boost/interprocess/detail/config_begin.hpp>
  20. #include <boost/interprocess/detail/workaround.hpp>
  21. #include <boost/interprocess/sync/cv_status.hpp>
  22. #include <boost/interprocess/sync/spin/mutex.hpp>
  23. #include <boost/interprocess/detail/atomic.hpp>
  24. #include <boost/interprocess/sync/scoped_lock.hpp>
  25. #include <boost/interprocess/exceptions.hpp>
  26. #include <boost/interprocess/detail/os_thread_functions.hpp>
  27. #include <boost/interprocess/detail/timed_utils.hpp>
  28. #include <boost/interprocess/sync/spin/wait.hpp>
  29. #include <boost/move/utility_core.hpp>
  30. #include <boost/cstdint.hpp>
  31. namespace boost {
  32. namespace interprocess {
  33. namespace ipcdetail {
  34. class spin_condition
  35. {
  36. spin_condition(const spin_condition &);
  37. spin_condition &operator=(const spin_condition &);
  38. public:
  39. spin_condition()
  40. {
  41. //Note that this class is initialized to zero.
  42. //So zeroed memory can be interpreted as an initialized
  43. //condition variable
  44. m_command = SLEEP;
  45. m_num_waiters = 0;
  46. }
  47. ~spin_condition()
  48. {
  49. //Notify all waiting threads
  50. //to allow POSIX semantics on condition destruction
  51. this->notify_all();
  52. }
  53. void notify_one()
  54. { this->notify(NOTIFY_ONE); }
  55. void notify_all()
  56. { this->notify(NOTIFY_ALL); }
  57. template <typename L>
  58. void wait(L& lock)
  59. {
  60. if (!lock)
  61. throw lock_exception();
  62. this->do_timed_wait_impl<false>(0, *lock.mutex());
  63. }
  64. template <typename L, typename Pr>
  65. void wait(L& lock, Pr pred)
  66. {
  67. if (!lock)
  68. throw lock_exception();
  69. while (!pred())
  70. this->do_timed_wait_impl<false>(0, *lock.mutex());
  71. }
  72. template <typename L, typename TimePoint>
  73. bool timed_wait(L& lock, const TimePoint &abs_time)
  74. {
  75. if (!lock)
  76. throw lock_exception();
  77. //Handle infinity absolute time here to avoid complications in do_timed_wait
  78. if(is_pos_infinity(abs_time)){
  79. this->wait(lock);
  80. return true;
  81. }
  82. return this->do_timed_wait_impl<true>(abs_time, *lock.mutex());
  83. }
  84. template <typename L, typename TimePoint, typename Pr>
  85. bool timed_wait(L& lock, const TimePoint &abs_time, Pr pred)
  86. {
  87. if (!lock)
  88. throw lock_exception();
  89. //Handle infinity absolute time here to avoid complications in do_timed_wait
  90. if(is_pos_infinity(abs_time)){
  91. this->wait(lock, pred);
  92. return true;
  93. }
  94. while (!pred()){
  95. if (!this->do_timed_wait_impl<true>(abs_time, *lock.mutex()))
  96. return pred();
  97. }
  98. return true;
  99. }
  100. template <typename L, class TimePoint>
  101. cv_status wait_until(L& lock, const TimePoint &abs_time)
  102. { return this->timed_wait(lock, abs_time) ? cv_status::no_timeout : cv_status::timeout; }
  103. template <typename L, class TimePoint, typename Pr>
  104. bool wait_until(L& lock, const TimePoint &abs_time, Pr pred)
  105. { return this->timed_wait(lock, abs_time, pred); }
  106. template <typename L, class Duration>
  107. cv_status wait_for(L& lock, const Duration &dur)
  108. { return this->wait_until(lock, duration_to_ustime(dur)); }
  109. template <typename L, class Duration, typename Pr>
  110. bool wait_for(L& lock, const Duration &dur, Pr pred)
  111. { return this->wait_until(lock, duration_to_ustime(dur), pred); }
  112. private:
  113. template<bool TimeoutEnabled, class InterprocessMutex, class TimePoint>
  114. bool do_timed_wait_impl(const TimePoint &abs_time, InterprocessMutex &mut)
  115. {
  116. typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
  117. //The enter mutex guarantees that while executing a notification,
  118. //no other thread can execute the do_timed_wait method.
  119. {
  120. //---------------------------------------------------------------
  121. InternalLock lock;
  122. get_lock(bool_<TimeoutEnabled>(), m_enter_mut, lock, abs_time);
  123. if(!lock)
  124. return false;
  125. //---------------------------------------------------------------
  126. //We increment the waiting thread count protected so that it will be
  127. //always constant when another thread enters the notification logic.
  128. //The increment marks this thread as "waiting on spin_condition"
  129. atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
  130. //We unlock the external mutex atomically with the increment
  131. mut.unlock();
  132. }
  133. //By default, we suppose that no timeout has happened
  134. bool timed_out = false, unlock_enter_mut= false;
  135. //Loop until a notification indicates that the thread should
  136. //exit or timeout occurs
  137. while(1){
  138. //The thread sleeps/spins until a spin_condition commands a notification
  139. //Notification occurred, we will lock the checking mutex so that
  140. spin_wait swait;
  141. while(atomic_read32(&m_command) == SLEEP){
  142. swait.yield();
  143. //Check for timeout
  144. if(TimeoutEnabled){
  145. TimePoint now = get_now<TimePoint>(bool_<TimeoutEnabled>());
  146. if(now >= abs_time){
  147. //If we can lock the mutex it means that no notification
  148. //is being executed in this spin_condition variable
  149. timed_out = m_enter_mut.try_lock();
  150. //If locking fails, indicates that another thread is executing
  151. //notification, so we play the notification game
  152. if(!timed_out){
  153. //There is an ongoing notification, we will try again later
  154. continue;
  155. }
  156. //No notification in execution, since enter mutex is locked.
  157. //We will execute time-out logic, so we will decrement count,
  158. //release the enter mutex and return false.
  159. break;
  160. }
  161. }
  162. }
  163. //If a timeout occurred, the mutex will not execute checking logic
  164. if(TimeoutEnabled && timed_out){
  165. //Decrement wait count
  166. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  167. unlock_enter_mut = true;
  168. break;
  169. }
  170. else{
  171. boost::uint32_t result = atomic_cas32
  172. (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
  173. if(result == SLEEP){
  174. //Other thread has been notified and since it was a NOTIFY one
  175. //command, this thread must sleep again
  176. continue;
  177. }
  178. else if(result == NOTIFY_ONE){
  179. //If it was a NOTIFY_ONE command, only this thread should
  180. //exit. This thread has atomically marked command as sleep before
  181. //so no other thread will exit.
  182. //Decrement wait count.
  183. unlock_enter_mut = true;
  184. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  185. break;
  186. }
  187. else{
  188. //If it is a NOTIFY_ALL command, all threads should return
  189. //from do_timed_wait function. Decrement wait count.
  190. unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  191. //Check if this is the last thread of notify_all waiters
  192. //Only the last thread will release the mutex
  193. if(unlock_enter_mut){
  194. atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
  195. }
  196. break;
  197. }
  198. }
  199. }
  200. //Unlock the enter mutex if it is a single notification, if this is
  201. //the last notified thread in a notify_all or a timeout has occurred
  202. if(unlock_enter_mut){
  203. m_enter_mut.unlock();
  204. }
  205. //Lock external again before returning from the method
  206. mut.lock();
  207. return !timed_out;
  208. }
  209. template <class TimePoint>
  210. static TimePoint get_now(bool_<true>)
  211. { return microsec_clock<TimePoint>::universal_time(); }
  212. template <class TimePoint>
  213. static TimePoint get_now(bool_<false>)
  214. { return TimePoint(); }
  215. template <class Mutex, class Lock, class TimePoint>
  216. static void get_lock(bool_<true>, Mutex &m, Lock &lck, const TimePoint &abs_time)
  217. {
  218. Lock dummy(m, abs_time);
  219. lck = boost::move(dummy);
  220. }
  221. template <class Mutex, class Lock, class TimePoint>
  222. static void get_lock(bool_<false>, Mutex &m, Lock &lck, const TimePoint &)
  223. {
  224. Lock dummy(m);
  225. lck = boost::move(dummy);
  226. }
  227. void notify(boost::uint32_t command)
  228. {
  229. //This mutex guarantees that no other thread can enter to the
  230. //do_timed_wait method logic, so that thread count will be
  231. //constant until the function writes a NOTIFY_ALL command.
  232. //It also guarantees that no other notification can be signaled
  233. //on this spin_condition before this one ends
  234. m_enter_mut.lock();
  235. //Return if there are no waiters
  236. if(!atomic_read32(&m_num_waiters)) {
  237. m_enter_mut.unlock();
  238. return;
  239. }
  240. //Notify that all threads should execute wait logic
  241. spin_wait swait;
  242. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
  243. swait.yield();
  244. }
  245. //The enter mutex will rest locked until the last waiting thread unlocks it
  246. }
  247. enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
  248. spin_mutex m_enter_mut;
  249. volatile boost::uint32_t m_command;
  250. volatile boost::uint32_t m_num_waiters;
  251. };
  252. } //namespace ipcdetail
  253. } //namespace interprocess
  254. } //namespace boost
  255. #include <boost/interprocess/detail/config_end.hpp>
  256. #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP