QuantumLibrary
quantum_condition_variable_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 namespace Bloomberg {
23 namespace quantum {
24 
25 thread_local static std::atomic_int s_threadSignal{-1}; //thread specific (non-coroutine)
26 
27 inline
29  _destroyed(false)
30 {}
31 
32 inline
34 {
35  Mutex::Guard lock(_thisLock);
36  _destroyed = true;
37 }
38 
39 inline
41 {
42  //LOCKED OR UNLOCKED SCOPE
43  Mutex::Guard lock(_thisLock);
44  if (_waiters.empty())
45  {
46  return;
47  }
48  (*_waiters.front()) = 1;
49  _waiters.pop_front();
50 }
51 
52 inline
54 {
55  //LOCKED OR UNLOCKED SCOPE
56  Mutex::Guard lock(_thisLock);
57  for (auto&& waiter : _waiters)
58  {
59  (*waiter) = 1;
60  }
61  _waiters.clear();
62 }
63 
64 inline
66 {
67  waitImpl(YieldingThread(), mutex, s_threadSignal);
68 }
69 
70 inline
72 {
73  waitImpl(sync->getYieldHandle(), mutex, sync->signal());
74 }
75 
76 template <class PREDICATE>
78  PREDICATE predicate)
79 {
80  waitImpl(YieldingThread(), mutex, predicate, s_threadSignal);
81 }
82 
83 template <class PREDICATE>
85  Mutex& mutex,
86  PREDICATE predicate)
87 {
88  waitImpl(sync->getYieldHandle(), mutex, predicate, sync->signal());
89 }
90 
91 template <class REP, class PERIOD>
93  const std::chrono::duration<REP, PERIOD>& time)
94 {
95  return waitForImpl(YieldingThread(), mutex, time, s_threadSignal);
96 }
97 
98 template <class REP, class PERIOD>
100  Mutex& mutex,
101  const std::chrono::duration<REP, PERIOD>& time)
102 {
103  return waitForImpl(sync->getYieldHandle(), mutex, time, sync->signal());
104 }
105 
106 template <class REP, class PERIOD, class PREDICATE>
108  const std::chrono::duration<REP, PERIOD>& time,
109  PREDICATE predicate)
110 {
111  return waitForImpl(YieldingThread(), mutex, time, predicate, s_threadSignal);
112 }
113 
114 template <class REP, class PERIOD, class PREDICATE>
116  Mutex& mutex,
117  const std::chrono::duration<REP, PERIOD>& time,
118  PREDICATE predicate)
119 {
120  return waitForImpl(sync->getYieldHandle(), mutex, time, predicate, sync->signal());
121 }
122 
123 template <class YIELDING>
124 void ConditionVariable::waitImpl(YIELDING&& yield,
125  Mutex& mutex,
126  std::atomic_int& signal)
127 {
128  {//========= LOCKED SCOPE =========
129  Mutex::Guard lock(_thisLock);
130  if (_destroyed)
131  {
132  return; //don't release the mutex
133  }
134  signal = 0; //clear signal flag
135  _waiters.push_back(&signal);
136  }
137  //========= UNLOCKED SCOPE =========
138  Mutex::ReverseGuard unlock(mutex);
139  while ((signal == 0) && !_destroyed)
140  {
141  yield();
142  }
143  signal = -1; //reset
144 }
145 
146 template <class YIELDING, class PREDICATE>
147 void ConditionVariable::waitImpl(YIELDING&& yield,
148  Mutex& mutex,
149  PREDICATE predicate,
150  std::atomic_int& signal)
151 {
152  while (!predicate() && !_destroyed)
153  {
154  waitImpl(std::forward<YIELDING>(yield), mutex, signal);
155  }
156 }
157 
158 template <class YIELDING, class REP, class PERIOD>
159 bool ConditionVariable::waitForImpl(YIELDING&& yield,
160  Mutex& mutex,
161  std::chrono::duration<REP, PERIOD>& time,
162  std::atomic_int& signal)
163 {
164  {//========= LOCKED SCOPE =========
165  Mutex::Guard lock(_thisLock);
166  if (_destroyed)
167  {
168  return true; //don't release the mutex
169  }
170  if (time == std::chrono::duration<REP, PERIOD>::zero())
171  {
172  return false; //timeout
173  }
174  signal = 0; //clear signal flag
175  _waiters.push_back(&signal);
176  }
177  //========= UNLOCKED SCOPE =========
178  Mutex::ReverseGuard unlock(mutex);
179  auto start = std::chrono::high_resolution_clock::now();
180  auto elapsed = std::chrono::duration<REP, PERIOD>::zero();
181  bool timeout = false;
182 
183  //wait until signalled or times out
184  while ((signal == 0) && !_destroyed)
185  {
186  yield();
187  elapsed = std::chrono::duration_cast<std::chrono::duration<REP, PERIOD>>(std::chrono::high_resolution_clock::now() - start);
188  if (elapsed >= time)
189  {
190  timeout = true;
191  break; //expired time
192  }
193  }
194 
195  signal = -1; //reset signal flag
196 
197  //adjust duration or set to zero if nothing remains
198  time = timeout ? std::chrono::duration<REP, PERIOD>::zero() : time - elapsed;
199  return !timeout;
200 }
201 
202 template <class YIELDING, class REP, class PERIOD, class PREDICATE>
203 bool ConditionVariable::waitForImpl(YIELDING&& yield,
204  Mutex& mutex,
205  const std::chrono::duration<REP, PERIOD>& time,
206  PREDICATE predicate,
207  std::atomic_int& signal)
208 {
209  if (time > std::chrono::duration<REP, PERIOD>(0)) {
210  auto duration = time;
211  while (!predicate() && !_destroyed)
212  {
213  if (!waitForImpl(std::forward<YIELDING>(yield), mutex, duration, signal))
214  {
215  //timeout
216  return predicate();
217  }
218  }
219  }
220  return true; //duration has not yet expired
221 }
222 
223 }}
void notifyOne()
Notify one waiting thread or coroutine.
Definition: quantum_condition_variable_impl.h:40
Definition: quantum_buffer_impl.h:22
void notifyAll()
Notify all waiting threads and coroutines.
Definition: quantum_condition_variable_impl.h:53
std::shared_ptr< ICoroSync > Ptr
Definition: quantum_icoro_sync.h:36
~ConditionVariable()
Destructor.
Definition: quantum_condition_variable_impl.h:33
YieldingThreadDuration< std::chrono::microseconds > YieldingThread
Definition: quantum_yielding_thread.h:57
Opposite form of RAII-style mechanism for mutex ownership. Releases a mutex on construction and acqui...
ConditionVariable()
Default constructor.
Definition: quantum_condition_variable_impl.h:28
RAII-style mechanism for mutex ownership. Acquires a mutex on construction and releases it inside the...
Definition: quantum_mutex.h:73
bool waitFor(Mutex &mutex, const std::chrono::duration< REP, PERIOD > &time)
Block the current thread until the condition is signalled via notifyOne() or notifyAll() or until 'ti...
Definition: quantum_condition_variable_impl.h:92
void wait(Mutex &mutex)
Block the current thread until the condition is signalled via notifyOne() or notifyAll().
Definition: quantum_condition_variable_impl.h:65