QuantumLibrary
quantum_shared_state_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 namespace Bloomberg {
23 namespace quantum {
24 
25 //==============================================================================================
26 // class SharedState
27 //==============================================================================================
28 template <class T>
29 SharedState<T>::SharedState() :
31  _value(T())
32 {
33 }
34 
35 template <class T>
36 template <class V>
37 int SharedState<T>::set(V&& value)
38 {
39  {
40  //========= LOCKED SCOPE =========
41  Mutex::Guard lock(_mutex);
42  if (_state != FutureState::PromiseNotSatisfied)
43  {
44  ThrowFutureException(_state);
45  }
46  _value = std::forward<V>(value);
47  _state = FutureState::PromiseAlreadySatisfied;
48  }
49  _cond.notifyAll();
50  return 0;
51 }
52 
53 template <class T>
54 template <class V>
55 int SharedState<T>::set(ICoroSync::Ptr sync, V&& value)
56 {
57  {
58  //========= LOCKED SCOPE =========
59  Mutex::Guard lock(sync, _mutex);
60  if (_state != FutureState::PromiseNotSatisfied)
61  {
62  ThrowFutureException(_state);
63  }
64  _value = std::forward<V>(value);
65  _state = FutureState::PromiseAlreadySatisfied;
66  }
67  _cond.notifyAll();
68  return 0;
69 }
70 
71 template <class T>
73 {
74  //========= LOCKED SCOPE =========
75  Mutex::Guard lock(_mutex);
76  conditionWait();
77  _state = FutureState::FutureAlreadyRetrieved;
78  return std::move(_value);
79 }
80 
81 template <class T>
82 const T& SharedState<T>::getRef() const
83 {
84  //========= LOCKED SCOPE =========
85  Mutex::Guard lock(_mutex);
86  conditionWait();
87  return _value;
88 }
89 
90 template <class T>
92 {
93  //========= LOCKED SCOPE =========
94  Mutex::Guard lock(sync, _mutex);
95  conditionWait(sync);
96  _state = FutureState::FutureAlreadyRetrieved;
97  return std::move(_value);
98 }
99 
100 template <class T>
102 {
103  //========= LOCKED SCOPE =========
104  Mutex::Guard lock(sync, _mutex);
105  conditionWait(sync);
106  return _value;
107 }
108 
109 template <class T>
111 {
112  {//========= LOCKED SCOPE =========
113  Mutex::Guard lock(_mutex);
114  if (_state == FutureState::PromiseNotSatisfied)
115  {
116  _state = FutureState::BrokenPromise;
117  }
118  }
119  _cond.notifyAll();
120 }
121 
122 template <class T>
124 {
125  //========= LOCKED SCOPE =========
126  Mutex::Guard lock(_mutex);
127  _cond.wait(_mutex, [this]()->bool
128  {
129  return stateHasChanged();
130  });
131 }
132 
133 template <class T>
135 {
136  //========= LOCKED SCOPE =========
137  Mutex::Guard lock(sync, _mutex);
138  _cond.wait(sync, _mutex, [this]()->bool
139  {
140  return stateHasChanged();
141  });
142 }
143 
144 template <class T>
145 template<class REP, class PERIOD>
146 std::future_status SharedState<T>::waitFor(const std::chrono::duration<REP, PERIOD> &time) const
147 {
148  //========= LOCKED SCOPE =========
149  Mutex::Guard lock(_mutex);
150  _cond.waitFor(_mutex, time, [this]()->bool
151  {
152  return stateHasChanged();
153  });
154  return _state == FutureState::PromiseNotSatisfied ? std::future_status::timeout : std::future_status::ready;
155 }
156 
157 template <class T>
158 template<class REP, class PERIOD>
159 std::future_status SharedState<T>::waitFor(ICoroSync::Ptr sync,
160  const std::chrono::duration<REP, PERIOD> &time) const
161 {
162  //========= LOCKED SCOPE =========
163  Mutex::Guard lock(sync, _mutex);
164  _cond.waitFor(sync, _mutex, time, [this]()->bool
165  {
166  return stateHasChanged();
167  });
168  return _state == FutureState::PromiseNotSatisfied ? std::future_status::timeout : std::future_status::ready;
169 }
170 
171 template <class T>
172 int SharedState<T>::setException(std::exception_ptr ex)
173 {
174  {//========= LOCKED SCOPE =========
175  Mutex::Guard lock(_mutex);
176  _exception = ex;
177  }
178  _cond.notifyAll();
179  return -1;
180 }
181 
182 template <class T>
184  std::exception_ptr ex)
185 {
186  {//========= LOCKED SCOPE =========
187  Mutex::Guard lock(sync, _mutex);
188  _exception = ex;
189  }
190  _cond.notifyAll();
191  return -1;
192 }
193 
194 template <class T>
196 {
197  _cond.wait(_mutex, [this]()->bool
198  {
199  return stateHasChanged();
200  });
201  checkPromiseState();
202 }
203 
204 template <class T>
205 void SharedState<T>::conditionWait(ICoroSync::Ptr sync) const
206 {
207  _cond.wait(sync, _mutex, [this]()->bool
208  {
209  return stateHasChanged();
210  });
211  checkPromiseState();
212 }
213 
214 template <class T>
215 void SharedState<T>::checkPromiseState() const
216 {
217  if (_exception)
218  {
219  std::rethrow_exception(_exception);
220  }
221  if ((_state == FutureState::BrokenPromise) || (_state == FutureState::FutureAlreadyRetrieved))
222  {
223  ThrowFutureException(_state);
224  }
225 }
226 
227 template <class T>
228 bool SharedState<T>::stateHasChanged() const
229 {
230  return (_state != FutureState::PromiseNotSatisfied) || (_exception != nullptr);
231 }
232 
233 //==============================================================================================
234 // class SharedState<Buffer> (partial specialization)
235 //==============================================================================================
236 template <class T>
237 SharedState<Buffer<T>>::SharedState() :
239 {
240 }
241 
242 template <class T>
243 void SharedState<Buffer<T>>::breakPromise()
244 {
245  {//========= LOCKED SCOPE =========
246  Mutex::Guard lock(_mutex);
247  if ((_state == FutureState::PromiseNotSatisfied) ||
248  (_state == FutureState::BufferingData))
249  {
251  }
252  }
253  _cond.notifyAll();
254 }
255 
256 template <class T>
257 void SharedState<Buffer<T>>::wait() const
258 {
259  if (!_reader.empty())
260  {
261  return; //there is still data available
262  }
263  //========= LOCKED SCOPE =========
264  Mutex::Guard lock(_mutex);
265  _cond.wait(_mutex, [this]()->bool
266  {
267  BufferStatus status = _writer.empty() ?
269  return stateHasChanged(status);
270  });
271 }
272 
273 template <class T>
274 void SharedState<Buffer<T>>::wait(ICoroSync::Ptr sync) const
275 {
276  if (!_reader.empty())
277  {
278  return; //there is still data available
279  }
280  //========= LOCKED SCOPE =========
281  Mutex::Guard lock(sync, _mutex);
282  _cond.wait(sync, _mutex, [this]()->bool
283  {
284  BufferStatus status = _writer.empty() ?
286  return stateHasChanged(status);
287  });
288 }
289 
290 template <class T>
291 template<class REP, class PERIOD>
292 std::future_status SharedState<Buffer<T>>::waitFor(const std::chrono::duration<REP, PERIOD> &time) const
293 {
294  if (!_reader.empty())
295  {
296  return std::future_status::ready; //there is still data available
297  }
298  //========= LOCKED SCOPE =========
299  Mutex::Guard lock(_mutex);
300  _cond.waitFor(_mutex, time, [this]()->bool
301  {
302  BufferStatus status = _writer.empty() ?
304  return stateHasChanged(status);
305  });
306  return (_writer.empty() && !_writer.isClosed()) ? std::future_status::timeout : std::future_status::ready;
307 }
308 
309 template <class T>
310 template<class REP, class PERIOD>
311 std::future_status SharedState<Buffer<T>>::waitFor(ICoroSync::Ptr sync,
312  const std::chrono::duration<REP, PERIOD> &time) const
313 {
314  if (!_reader.empty())
315  {
316  return std::future_status::ready; //there is still data available
317  }
318  //========= LOCKED SCOPE =========
319  Mutex::Guard lock(sync, _mutex);
320  _cond.waitFor(sync, _mutex, time, [this]()->bool
321  {
322  BufferStatus status = _writer.empty() ?
324  return stateHasChanged(status);
325  });
326  return (_writer.empty() && !_writer.isClosed()) ? std::future_status::timeout : std::future_status::ready;
327 }
328 
329 template <class T>
330 int SharedState<Buffer<T>>::setException(std::exception_ptr ex)
331 {
332  {//========= LOCKED SCOPE =========
333  Mutex::Guard lock(_mutex);
334  _exception = ex;
335  }
336  _cond.notifyAll();
337  return -1;
338 }
339 
340 template <class T>
341 int SharedState<Buffer<T>>::setException(ICoroSync::Ptr sync,
342  std::exception_ptr ex)
343 {
344  {//========= LOCKED SCOPE =========
345  Mutex::Guard lock(sync, _mutex);
346  _exception = ex;
347  }
348  _cond.notifyAll();
349  return -1;
350 }
351 
352 template <class T>
353 template <class V>
354 void SharedState<Buffer<T>>::push(V&& value)
355 {
356  {//========= LOCKED SCOPE =========
357  Mutex::Guard lock(_mutex);
358  if ((_state != FutureState::PromiseNotSatisfied) && (_state != FutureState::BufferingData))
359  {
360  ThrowFutureException(_state);
361  }
362  BufferStatus status = _writer.push(std::forward<V>(value));
363  if (status == BufferStatus::Closed)
364  {
366  }
368  }
369  _cond.notifyAll();
370 }
371 
372 template <class T>
373 template <class V>
374 void SharedState<Buffer<T>>::push(ICoroSync::Ptr sync, V&& value)
375 {
376  {//========= LOCKED SCOPE =========
377  Mutex::Guard lock(sync, _mutex);
378  if ((_state != FutureState::PromiseNotSatisfied) && (_state != FutureState::BufferingData))
379  {
380  ThrowFutureException(_state);
381  }
382  BufferStatus status = _writer.push(std::forward<V>(value));
383  if (status == BufferStatus::Closed)
384  {
386  }
388  }
389  _cond.notifyAll();
390 }
391 
392 template <class T>
393 T SharedState<Buffer<T>>::pull(bool& isBufferClosed)
394 {
395  T out{};
396  if (!_reader.empty())
397  {
398  T out{};
399  _reader.pull(out);
400  return out;
401  }
402  {//========= LOCKED SCOPE =========
403  Mutex::Guard lock(_mutex);
404  _cond.wait(_mutex, [this]()->bool
405  {
406  BufferStatus status = _writer.empty() ?
408  bool changed = stateHasChanged(status);
409  if (changed) {
410  // Move the writer to the reader for consumption
411  _reader = std::move(_writer);
412  }
413  return changed;
414  });
415  }
416  isBufferClosed = _reader.empty() && _reader.isClosed();
417  if (isBufferClosed) {
418  //Mark the future as fully retrieved
420  return out;
421  }
422  _reader.pull(out);
423  checkPromiseState();
424  return out;
425 }
426 
427 template <class T>
428 T SharedState<Buffer<T>>::pull(ICoroSync::Ptr sync, bool& isBufferClosed)
429 {
430  T out{};
431  if (!_reader.empty())
432  {
433  T out{};
434  _reader.pull(out);
435  return out;
436  }
437  {//========= LOCKED SCOPE =========
438  Mutex::Guard lock(_mutex);
439  _cond.wait(sync, _mutex, [this]()->bool
440  {
441  BufferStatus status = _writer.empty() ?
443  bool changed = stateHasChanged(status);
444  if (changed) {
445  // Move the writer to the reader for consumption
446  _reader = std::move(_writer);
447  }
448  return changed;
449  });
450  }
451  isBufferClosed = _reader.empty() && _reader.isClosed();
452  if (isBufferClosed) {
453  //Mark the future as fully retrieved
455  return out;
456  }
457  _reader.pull(out);
458  checkPromiseState();
459  return out;
460 }
461 
462 template <class T>
463 int SharedState<Buffer<T>>::closeBuffer()
464 {
465  {//========= LOCKED SCOPE =========
466  Mutex::Guard lock(_mutex);
467  if ((_state == FutureState::PromiseNotSatisfied) || (_state == FutureState::BufferingData))
468  {
469  _state = FutureState::BufferClosed;
470  }
471  _writer.close();
472  }
473  _cond.notifyAll();
474  return 0;
475 }
476 
477 template <class T>
478 void SharedState<Buffer<T>>::checkPromiseState() const
479 {
480  if (_exception)
481  {
482  std::rethrow_exception(_exception);
483  }
485  {
486  ThrowFutureException(_state);
487  }
488 }
489 
490 template <class T>
491 bool SharedState<Buffer<T>>::stateHasChanged(BufferStatus status) const
492 {
493  return ((status == BufferStatus::DataPosted) || (status == BufferStatus::DataReceived) || (status == BufferStatus::Closed)) ||
495  (_exception != nullptr);
496 }
497 
498 }}
Buffer is closed. Push operations are not allowed. Pull operations are allowed until buffer is emtpy.
Definition: quantum_buffer_impl.h:22
Shared state used between a Promise and a Future to exchange values.
Definition: quantum_shared_state.h:38
Data has been successfully read from the buffer.
Data has been successfully written to the buffer.
Promise could not be fulfilled.
std::shared_ptr< ICoroSync > Ptr
Definition: quantum_icoro_sync.h:36
Buffered future is being streamed.
void wait() const
Definition: quantum_shared_state_impl.h:123
Buffer is empty and more data is on the way.
Future value has been consumed. In the case of a buffer, no pulling is allowed.
void ThrowFutureException(FutureState state)
Definition: quantum_future_state.h:130
FutureState
Represents the internal state of a future/promise pair. Modeled after std::future_errc.
Definition: quantum_future_state.h:31
Definition: quantum_mutex.h:73
Buffer is closed for pushing data. Data can still be pulled.
BufferStatus
Defines the result of the operation on the buffer object.
Definition: quantum_buffer.h:32