QuantumLibrary
quantum_context.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_CONTEXT_H
17 #define BLOOMBERG_QUANTUM_CONTEXT_H
18 
19 #include <quantum/quantum_promise.h>
20 #include <quantum/quantum_task.h>
21 #include <quantum/quantum_io_task.h>
22 #include <quantum/quantum_dispatcher_core.h>
23 #include <quantum/quantum_traits.h>
24 #include <iterator>
25 
26 namespace Bloomberg {
27 namespace quantum {
28 
29 //==============================================================================================
30 // class Context
31 //==============================================================================================
/// Concrete class representing a coroutine or a thread context.
///
/// Implements the thread-side interface (IThreadContext<RET>), the
/// coroutine-side interface (ICoroContext<RET>) and ITaskAccessor in a single
/// object. It also derives from std::enable_shared_from_this, so instances are
/// presumably always owned through the `Ptr` alias below — confirm in the
/// implementation header.
///
/// @tparam RET Type of the future value associated with this context. The
///             push()/pull()/closeBuffer() members are only enabled for
///             buffer types, set()/get()/getRef() only for non-buffer types
///             (see the BufferType / NonBufferType constraints).
35 template <class RET>
36 class Context : public IThreadContext<RET>,
37  public ICoroContext<RET>,
38  public ITaskAccessor,
39  public std::enable_shared_from_this<Context<RET>>
40 {
    // All constructors are private (see the private section), so only this
    // class and the friends listed here can create Context objects.
41  friend struct Util;
42  friend class Task;
43  friend class Dispatcher;
    // Lets Context<A> reach the private parts of Context<B>; needed by the
    // converting constructor declared in the private section.
44  template <class OTHER_RET> friend class Context;
45 
46 public:
    /// Shared-ownership handle to a context of this type.
47  using Ptr = std::shared_ptr<Context<RET>>;
50 
51  //===================================
52  // D'TOR
53  //===================================
54  ~Context();
55 
56  //===================================
57  // ITERMINATE
58  //===================================
    /// Terminates the object.
59  void terminate() final;
60 
61  //===================================
62  // ITASKACCESSOR
63  //===================================
    /// Associates a task with this context / returns the associated task.
64  void setTask(ITask::Ptr task) final;
65  ITask::Ptr getTask() const final;
    // NOTE(review): semantics of the two state queries below are defined in
    // quantum_context_impl.h; 'updateTimer' presumably refreshes the stored
    // sleep bookkeeping (_sleepDuration/_sleepTimestamp) — confirm there.
66  bool isBlocked() const final;
67  bool isSleeping(bool updateTimer = false) final;
68 
69  //===================================
70  // ICONTEXTBASE
71  //===================================
    /// Determines if the future object associated with this context has a
    /// valid shared state.
72  bool valid() const final;
    /// Determines if the future object associated with the 'num'-th
    /// continuation context is still valid.
73  bool validAt(int num) const final;
    /// Set an exception in the promise associated with the current
    /// IThreadContext or ICoroContext.
74  int setException(std::exception_ptr ex) final;
75 
76  //===================================
77  // ITHREADCONTEXTBASE
78  //===================================
    /// Waits for the future in the 'num-th' continuation context to be ready.
79  void waitAt(int num) const final;
    /// Same as waitAt(), but waits for a maximum of 'timeMs' milliseconds.
80  std::future_status waitForAt(int num, std::chrono::milliseconds timeMs) const final;
    /// Waits for the future associated with this context to be ready.
81  void wait() const final;
    /// Waits for the future associated with this context to be ready for a
    /// maximum of 'timeMs' milliseconds.
82  std::future_status waitFor(std::chrono::milliseconds timeMs) const final;
    /// Wait for all the futures in the continuation chain to be ready.
83  void waitAll() const final;
84 
85  //===================================
86  // ITHREADCONTEXT
87  //===================================
    /// Get the future value associated with this context.
88  template <class V = RET>
89  NonBufferRetType<V> get();
    /// Get a reference to the future value associated with this context.
90  template <class V = RET>
91  const NonBufferRetType<V>& getRef() const;
    /// Set the promised value associated with this context.
    /// Only participates when RET is not a buffer and V is convertible to RET.
92  template <class V, class = NonBufferType<RET,V>>
93  int set(V&& value);
    /// Push a single value into the promise buffer.
    /// Only participates when RET is a buffer type accepting V.
94  template <class V, class = BufferType<RET,V>>
95  void push(V&& value);
    /// Pull a single value from the future buffer.
    /// @param isBufferClosed Output flag; presumably set when the buffer has
    ///        been closed — confirm in the implementation header.
96  template <class V = RET>
97  BufferRetType<V> pull(bool& isBufferClosed);
    /// Get the future value from the 'num-th' continuation context.
98  template <class OTHER_RET>
99  NonBufferRetType<OTHER_RET> getAt(int num);
    /// Get a reference to the future value from the 'num-th' continuation
    /// context.
100  template <class OTHER_RET>
101  const NonBufferRetType<OTHER_RET>& getRefAt(int num) const;
102 
103  //===================================
104  // ICOROCONTEXTBASE
105  //===================================
    // Coroutine-side counterparts of the wait functions above. Each takes an
    // additional ICoroSync::Ptr; ICoroSync is the interface that facilitates
    // 'implicit' coroutine yielding, so these presumably yield instead of
    // blocking the thread — confirm in the implementation header.
106  void waitAt(int num, ICoroSync::Ptr sync) const final;
107  std::future_status waitForAt(int num, ICoroSync::Ptr sync, std::chrono::milliseconds timeMs) const final;
108  void wait(ICoroSync::Ptr sync) const final;
109  std::future_status waitFor(ICoroSync::Ptr sync, std::chrono::milliseconds timeMs) const final;
110  void waitAll(ICoroSync::Ptr sync) const final;
111 
112  //===================================
113  // ICOROCONTEXT
114  //===================================
    // Coroutine-side counterparts of get/getRef/set/push/pull/getAt/getRefAt,
    // each taking the ICoroSync::Ptr of the calling coroutine.
115  template <class V = RET>
116  NonBufferRetType<V> get(ICoroSync::Ptr sync);
117  template <class V = RET>
118  const NonBufferRetType<V>& getRef(ICoroSync::Ptr sync) const;
119  template <class V, class = NonBufferType<RET,V>>
120  int set(ICoroSync::Ptr sync, V&& value);
121  template <class V, class = BufferType<RET,V>>
122  void push(ICoroSync::Ptr sync, V&& value);
123  template <class V = RET>
124  BufferRetType<V> pull(ICoroSync::Ptr sync, bool& isBufferClosed);
125  template <class OTHER_RET>
126  NonBufferRetType<OTHER_RET> getAt(int num, ICoroSync::Ptr sync);
127  template <class OTHER_RET>
128  const NonBufferRetType<OTHER_RET>& getRefAt(int num, ICoroSync::Ptr sync) const;
    /// Get the future value associated with the previous coroutine context in
    /// the continuation chain.
129  template <class OTHER_RET>
130  NonBufferRetType<OTHER_RET> getPrev(ICoroSync::Ptr sync);
    /// Get a reference to the future value associated with the previous
    /// coroutine context in the continuation chain.
131  template <class OTHER_RET>
132  const NonBufferRetType<OTHER_RET>& getPrevRef(ICoroSync::Ptr sync);
133 
134  //===================================
135  // ICOROSYNC
136  //===================================
    /// Sets the underlying boost::coroutine object so that it can be yielded on.
137  void setYieldHandle(Traits::Yield& yield) final;
    /// Retrieve the underlying boost::coroutine object.
138  Traits::Yield& getYieldHandle() final;
    /// Explicitly yields this coroutine context.
139  void yield() final;
    /// Accessor to the underlying synchronization variable.
140  std::atomic_int& signal() final;
    /// Sleeps the coroutine associated with this context for at least
    /// 'timeMs' milliseconds or 'timeUs' microseconds.
141  void sleep(const std::chrono::milliseconds& timeMs) final;
142  void sleep(const std::chrono::microseconds& timeUs) final;
143 
144  //===================================
145  // MISC IMPLEMENTATIONS
146  //===================================
    /// Close a promise buffer. Only participates when V is a buffer type.
147  template <class V = RET, class = BufferRetType<V>>
148  int closeBuffer();
    /// Returns the number of underlying coroutine threads as specified in the
    /// dispatcher constructor.
149  int getNumCoroutineThreads() const;
    /// Returns the number of underlying IO threads as specified in the
    /// dispatcher constructor.
150  int getNumIoThreads() const;
    /// Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered
    /// by IQueue::QueueId::Any.
151  const std::pair<int, int>& getCoroQueueIdRangeForAny() const;
152 
153 
154  //===================================
155  // TASK CONTINUATIONS
156  //===================================
    /// Post a coroutine to run asynchronously.
    /// @tparam OTHER_RET Value type of the context returned for the new task.
157  template <class OTHER_RET, class FUNC, class ... ARGS>
158  typename Context<OTHER_RET>::Ptr
159  post(FUNC&& func, ARGS&&... args);
160 
    /// Overload of post() taking an explicit queue id and a priority flag.
161  template <class OTHER_RET, class FUNC, class ... ARGS>
162  typename Context<OTHER_RET>::Ptr
163  post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
164 
    /// Posts a coroutine to run asynchronously.
    // NOTE(review): by its name this starts a continuation chain (closed by
    // end()) — confirm against the interface documentation.
165  template <class OTHER_RET, class FUNC, class ... ARGS>
166  typename Context<OTHER_RET>::Ptr
167  postFirst(FUNC&& func, ARGS&&... args);
168 
    /// Overload of postFirst() taking an explicit queue id and a priority flag.
169  template <class OTHER_RET, class FUNC, class ... ARGS>
170  typename Context<OTHER_RET>::Ptr
171  postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
172 
    /// Posts a coroutine to run asynchronously.
173  template <class OTHER_RET, class FUNC, class ... ARGS>
174  typename Context<OTHER_RET>::Ptr
175  then(FUNC&& func, ARGS&&... args);
176 
    /// Posts a coroutine to run asynchronously. This is the error handler for
    /// a continuation chain.
177  template <class OTHER_RET, class FUNC, class ... ARGS>
178  typename Context<OTHER_RET>::Ptr
179  onError(FUNC&& func, ARGS&&... args);
180 
    /// Posts a coroutine to run asynchronously. This coroutine is always
    /// guaranteed to run.
181  template <class OTHER_RET, class FUNC, class ... ARGS>
182  typename Context<OTHER_RET>::Ptr
183  finally(FUNC&& func, ARGS&&... args);
184 
    /// This is the last method in a continuation chain.
185  Ptr end();
186 
187  //===================================
188  // BLOCKING IO
189  //===================================
    /// Posts an IO method to run asynchronously on the IO thread pool.
190  template <class OTHER_RET, class FUNC, class ... ARGS>
191  CoroFuturePtr<OTHER_RET>
192  postAsyncIo(FUNC&& func, ARGS&&... args);
193 
    /// Overload of postAsyncIo() taking an explicit queue id and a priority flag.
194  template <class OTHER_RET, class FUNC, class ... ARGS>
195  CoroFuturePtr<OTHER_RET>
196  postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
197 
198  //===================================
199  // FOR EACH
200  //===================================
    /// Applies the given unary function to all the elements in the range
    /// [first,last). The returned context holds one OTHER_RET per element.
201  template <class OTHER_RET, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
202  typename Context<std::vector<OTHER_RET>>::Ptr
203  forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
204 
    /// Overload of forEach() taking a start iterator and an element count
    /// instead of an iterator pair.
205  template <class OTHER_RET, class INPUT_IT>
206  typename Context<std::vector<OTHER_RET>>::Ptr
207  forEach(INPUT_IT first, size_t num, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
208 
    /// Applies the given unary function to all the elements in the range
    /// [first,last); results come back grouped as a vector of vectors.
    // NOTE(review): how elements are split into batches is decided in the
    // implementation header — confirm there.
209  template <class OTHER_RET, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
210  typename Context<std::vector<std::vector<OTHER_RET>>>::Ptr
211  forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
212 
    /// Overload of forEachBatch() taking a start iterator and an element count.
213  template <class OTHER_RET, class INPUT_IT>
214  typename Context<std::vector<std::vector<OTHER_RET>>>::Ptr
215  forEachBatch(INPUT_IT first, size_t num, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
216 
217  //===================================
218  // MAP REDUCE
219  //===================================
    /// Implementation of map-reduce functionality over the range [first,last).
    /// The returned context holds a std::map from KEY to REDUCED_TYPE.
220  template <class KEY,
221  class MAPPED_TYPE,
222  class REDUCED_TYPE,
223  class INPUT_IT,
224  class = Traits::IsInputIterator<INPUT_IT>>
225  typename Context<std::map<KEY, REDUCED_TYPE>>::Ptr
226  mapReduce(INPUT_IT first,
227  INPUT_IT last,
228  Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
229  Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer);
230 
    /// Overload of mapReduce() taking a start iterator and an element count.
231  template <class KEY,
232  class MAPPED_TYPE,
233  class REDUCED_TYPE,
234  class INPUT_IT>
235  typename Context<std::map<KEY, REDUCED_TYPE>>::Ptr
236  mapReduce(INPUT_IT first,
237  size_t num,
238  Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
239  Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer);
240 
    /// This version of mapReduce() runs both the mapper and the reducer
    /// functions in batches.
241  template <class KEY,
242  class MAPPED_TYPE,
243  class REDUCED_TYPE,
244  class INPUT_IT,
245  class = Traits::IsInputIterator<INPUT_IT>>
246  typename Context<std::map<KEY, REDUCED_TYPE>>::Ptr
247  mapReduceBatch(INPUT_IT first,
248  INPUT_IT last,
249  Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
250  Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer);
251 
    /// Overload of mapReduceBatch() taking a start iterator and an element count.
252  template <class KEY,
253  class MAPPED_TYPE,
254  class REDUCED_TYPE,
255  class INPUT_IT>
256  typename Context<std::map<KEY, REDUCED_TYPE>>::Ptr
257  mapReduceBatch(INPUT_IT first,
258  size_t num,
259  Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
260  Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer);
261 
262  //===================================
263  // NEW / DELETE
264  //===================================
    // Class-specific allocation and deallocation functions.
    // NOTE(review): 'deleter' has the shape of a custom deleter for Ptr and
    // the allocation strategy lives in quantum_context_impl.h — confirm there.
265  static void* operator new(size_t size);
266  static void operator delete(void* p);
267  static void deleter(Context<RET>* p);
268 
269 private:
    // Construction is restricted to this class and its friends.
270  explicit Context(DispatcherCore& dispatcher);
271 
    // Builds a context from one with a different value type; relies on the
    // cross-instantiation friend declaration at the top of the class.
272  template <class OTHER_RET>
273  Context(Context<OTHER_RET>& other);
274 
275  Context(IContextBase& other);
276 
    // Shared implementations behind the public continuation / post / IO
    // entry points; they additionally take the task type and/or the queue
    // parameters.
277  template <class OTHER_RET, class FUNC, class ... ARGS>
278  typename Context<OTHER_RET>::Ptr
279  thenImpl(ITask::Type type, FUNC&& func, ARGS&&... args);
280 
281  template <class OTHER_RET, class FUNC, class ... ARGS>
282  typename Context<OTHER_RET>::Ptr
283  postImpl(int queueId, bool isHighPriority, ITask::Type type, FUNC&& func, ARGS&&... args);
284 
285  template <class OTHER_RET, class FUNC, class ... ARGS>
286  CoroFuturePtr<OTHER_RET>
287  postAsyncIoImpl(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
288 
    // NOTE(review): presumably maps a continuation position 'num' to an index
    // into _promises — confirm in quantum_context_impl.h.
289  int index(int num) const;
290 
291  void validateTaskType(ITask::Type type) const; //throws
292 
293  void validateContext(ICoroSync::Ptr sync) const; //throws
294 
295  //Members
    // Declaration order is significant (it fixes initialization order and
    // object layout), so do not reorder these.
    // NOTE(review): the roles below are inferred from names and accessor
    // signatures only — confirm in quantum_context_impl.h.
    //   _task           : task exposed via setTask()/getTask()
    //   _promises       : promises of the continuation chain
    //   _dispatcher     : non-owning pointer to the dispatcher core
    //   _terminated     : flag used by terminate()
    //   _signal         : variable returned by signal()
    //   _yield          : handle exposed via setYieldHandle()/getYieldHandle()
    //   _sleepDuration / _sleepTimestamp : bookkeeping for sleep()/isSleeping()
296  ITask::Ptr _task;
297  std::vector<IPromiseBase::Ptr> _promises;
298  DispatcherCore* _dispatcher;
299  std::atomic_flag _terminated;
300  std::atomic_int _signal;
301  Traits::Yield* _yield;
302  std::chrono::microseconds _sleepDuration;
303  std::chrono::high_resolution_clock::time_point _sleepTimestamp;
304 };
305 
/// Convenience alias for Context<RET>::Ptr, i.e. the std::shared_ptr type
/// used to hold a Context<RET>.
306 template <class RET>
307 using ContextPtr = typename Context<RET>::Ptr;
308 
309 }}
310 
311 #include <quantum/interface/quantum_icontext.h>
312 #include <quantum/impl/quantum_context_impl.h>
313 
314 #endif //BLOOMBERG_QUANTUM_CONTEXT_H
BufferRetType< V > pull(bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:868
Context< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_buffer_impl.h:22
std::enable_if_t< Traits::IsBuffer< T >::value &&!std::is_same< std::decay_t< V >, T >::value &&std::is_convertible< std::decay_t< V >, typename Traits::IsBuffer< T >::Type >::value > BufferType
Definition: quantum_traits.h:89
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:1027
typename ICoroFuture< T >::Ptr CoroFuturePtr
Definition: quantum_icoro_future.h:72
bool valid() const final
Determines if the future object associated with this context has a valid shared state with the corres...
Definition: quantum_context_impl.h:474
std::shared_ptr< IContextBase > Ptr
Definition: quantum_icontext_base.h:35
std::enable_if_t<!Traits::IsBuffer< T >::value &&std::is_convertible< std::decay_t< V >, T >::value > NonBufferType
Definition: quantum_traits.h:91
CoroFuturePtr< OTHER_RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Posts an IO method to run asynchronously on the IO thread pool.
Definition: quantum_context_impl.h:697
const NonBufferRetType< OTHER_RET > & getRefAt(int num) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:896
Exposes methods to manipulate the coroutine context.
Definition: quantum_icoro_context.h:38
Class implementing the dispatching logic onto worker threads. Used for both coroutines and IO tasks.
Definition: quantum_dispatcher_core.h:45
Utility to bind a user callable function onto a coroutine or an IO task.
Definition: quantum_util.h:45
~Context()
Definition: quantum_context_impl.h:450
Context< std::vector< OTHER_RET > >::Ptr forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
Provides an interface to facilitate 'implicit' coroutine yielding within other primitives such as mut...
Definition: quantum_icoro_sync.h:34
Definition: quantum_stl_impl.h:23
void sleep(const std::chrono::milliseconds &timeMs) final
Sleeps the coroutine associated with this context for at least 'timeMs' milliseconds or 'timeUs' micr...
Definition: quantum_context_impl.h:618
ITask::Ptr getTask() const final
Definition: quantum_context_impl.h:587
Context< OTHER_RET >::Ptr postFirst(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:847
Runnable object representing a coroutine.
Definition: quantum_task.h:40
std::shared_ptr< Context< RET > > Ptr
Definition: quantum_context.h:47
void waitAt(int num) const final
Waits for the future in the 'num-th' continuation context to be ready.
Definition: quantum_context_impl.h:916
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main e...
Definition: quantum_dispatcher.h:34
std::enable_if_t<!Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > NonBufferRetType
Definition: quantum_traits.h:95
Context< OTHER_RET >::Ptr finally(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This coroutine is always guaranteed to run.
bool isBlocked() const final
Definition: quantum_context_impl.h:486
typename Context< RET >::Ptr ContextPtr
Definition: quantum_context.h:307
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
void waitAll() const final
Wait for all the futures in the continuation chain to be ready.
Definition: quantum_context_impl.h:940
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:1021
This interface exposes shared functionality between IThreadContext and ICoroContext.
Definition: quantum_icontext_base.h:33
void terminate() final
Terminates the object.
Definition: quantum_context_impl.h:456
void setYieldHandle(Traits::Yield &yield) final
Sets the underlying boost::coroutine object so that it can be yielded on.
Definition: quantum_context_impl.h:593
Context< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
void yield() final
Explicitly yields this coroutine context.
Definition: quantum_context_impl.h:606
Context< std::vector< std::vector< OTHER_RET > > >::Ptr forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs se...
Context< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This is the error handler for a continuation chain and acts ...
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:685
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:1015
const NonBufferRetType< OTHER_RET > & getPrevRef()
Get a reference to the future value associated with the previous coroutine context in the continuation ch...
Definition: quantum_context_impl.h:161
void wait() const final
Waits for the future associated with this context to be ready.
Definition: quantum_context_impl.h:928
int setException(std::exception_ptr ex) final
Set an exception in the promise associated with the current IThreadContext or ICoroContext.
Definition: quantum_context_impl.h:480
static void deleter(Context< RET > *p)
Definition: quantum_context_impl.h:1148
bool validAt(int num) const final
Determines if the future object associated with the 'num'-th continuation context is still valid.
Definition: quantum_context_impl.h:468
std::enable_if_t< Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > BufferRetType
Definition: quantum_traits.h:93
NonBufferRetType< OTHER_RET > getPrev()
Get the future value associated with the previous coroutine context in the continuation chain.
Definition: quantum_context_impl.h:153
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:882
Context< OTHER_RET >::Ptr post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Exposes methods to manipulate and access a promise.
Definition: quantum_ipromise_base.h:32
std::future_status waitFor(std::chrono::milliseconds timeMs) const final
Waits for the future associated with this context to be ready for a maximum of 'timeMs' milliseconds.
Definition: quantum_context_impl.h:934
Exposes methods to manipulate the thread context.
Definition: quantum_ithread_context.h:37
bool isSleeping(bool updateTimer=false) final
Definition: quantum_context_impl.h:492
Interface to a task. For internal use only.
Definition: quantum_itask.h:32
std::atomic_int & signal() final
Accessor to the underlying synchronization variable.
Definition: quantum_context_impl.h:612
NonBufferRetType< V > get()
Get the future value associated with this context.
Definition: quantum_context_impl.h:903
Contains typedefs for various functions.
Definition: quantum_functions.h:31
NonBufferRetType< OTHER_RET > getAt(int num)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:889
Configuration parameters for the Quantum library.
Traits::Yield & getYieldHandle() final
Retrieve the underlying boost::coroutine object.
Definition: quantum_context_impl.h:599
std::future_status waitForAt(int num, std::chrono::milliseconds timeMs) const final
Waits for the future in the 'num-th' continuation context to be ready for a maximum of 'timeMs' milli...
Definition: quantum_context_impl.h:922
Contains definitions for various traits used by this library. For internal use only.
Definition: quantum_traits.h:55
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:854
Context< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
void setTask(ITask::Ptr task) final
Definition: quantum_context_impl.h:581
const NonBufferRetType< V > & getRef() const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:910