QuantumLibrary
quantum_icoro_context.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_ICORO_CONTEXT_H
17 #define BLOOMBERG_QUANTUM_ICORO_CONTEXT_H
18 
19 #include <quantum/quantum_functions.h>
20 #include <quantum/interface/quantum_icoro_context_base.h>
21 #include <quantum/interface/quantum_icoro_future.h>
22 #include <map>
23 #include <vector>
24 
25 namespace Bloomberg {
26 namespace quantum {
27 
28 template <class RET>
29 class Context;
30 
31 //==============================================================================================
32 // interface ICoroContext
33 //==============================================================================================
// ICoroContext<RET>: exposes methods to manipulate the coroutine context.
// NOTE(review): this listing is an extract in which several original lines are absent
// (the leading listing numbers skip). In particular the declaration line itself
// (listing line 38) is missing, so the class key and any base class are not visible
// here. Where a member's return type or parameter lines are missing below, the
// comments quote the signature recorded in the reference index that follows the listing.
37 template <class RET>
39 {
// Shared-pointer handle to this interface.
41  using Ptr = std::shared_ptr<ICoroContext<RET>>;
// Context<RET> is forward-declared above; the reference index describes it as the
// concrete class representing a coroutine or a thread context.
42  using Impl = Context<RET>;
43 
// NOTE(review): the declaration belonging to this template head (listing line 50) is
// missing. Presumably it is `NonBufferRetType<V> get(ICoroSync::Ptr sync)`, which the
// reference index describes as: get the future value associated with this context.
49  template <class V = RET>
51 
// Get a reference to the future value associated with this context.
57  template <class V = RET>
58  const NonBufferRetType<V>& getRef(ICoroSync::Ptr sync) const;
59 
// NOTE(review): declaration (listing line 67) missing. Presumably
// `NonBufferRetType<OTHER_RET> getPrev()`: get the future value associated with the
// previous coroutine context in the continuation chain. TODO confirm against the header.
66  template <class OTHER_RET>
68 
// NOTE(review): declaration (listing line 75) missing. Presumably
// `const NonBufferRetType<OTHER_RET>& getPrevRef()`: get a reference to the future value
// associated with the previous coroutine context. TODO confirm against the header.
74  template <class OTHER_RET>
76 
// NOTE(review): declaration (listing line 88) missing. Presumably
// `NonBufferRetType<OTHER_RET> getAt(int num, ICoroSync::Ptr sync)`: get the future value
// from the 'num-th' continuation context. TODO confirm against the header.
87  template <class OTHER_RET>
89 
// Get a reference to the future value from the 'num-th' continuation context.
100  template <class OTHER_RET>
101  const NonBufferRetType<OTHER_RET>& getRefAt(int num, ICoroSync::Ptr sync) const;
102 
// Set the promised value associated with this context.
// The defaulted second template parameter (NonBufferType<RET,V>) constrains the overload;
// its definition is not visible in this extract.
108  template <class V, class = NonBufferType<RET,V>>
109  int set(V&& value);
110 
// Push a single value into the promise buffer.
// Constrained via the defaulted BufferType<RET,V> template parameter.
117  template <class V, class = BufferType<RET,V>>
118  void push(V&& value);
119 
// Pull a single value from the future buffer.
// isBufferClosed is an output parameter (passed by non-const reference); presumably it
// reports whether the buffer has been closed - TODO confirm.
127  template <class V = RET>
128  BufferRetType<V> pull(ICoroSync::Ptr sync, bool& isBufferClosed);
129 
// Close a promise buffer.
135  template <class V = RET, class = BufferRetType<V>>
136  int closeBuffer();
137 
// Returns the number of underlying coroutine threads as specified in the dispatcher
// constructor.
143  int getNumCoroutineThreads() const;
144 
// Returns the number of underlying IO threads as specified in the dispatcher constructor.
149  int getNumIoThreads() const;
150 
// Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by
// IQueue::QueueId::Any.
154  const std::pair<int, int>& getCoroQueueIdRangeForAny() const;
155 
156  //-----------------------------------------------------------------------------------------
157  // TASK CONTINUATIONS
158  //-----------------------------------------------------------------------------------------
169  //-----------------------------------------------------------------------------------------
170 
// Post a coroutine to run asynchronously.
// NOTE(review): the return-type line (listing line 185) is missing; the reference index
// gives the signature as `ICoroContext<OTHER_RET>::Ptr post(FUNC&& func, ARGS&&... args)`.
184  template <class OTHER_RET = int, class FUNC, class ... ARGS>
186  post(FUNC&& func, ARGS&&... args);
187 
// Overload of post() that additionally takes a queueId and an isHighPriority flag.
// NOTE(review): return-type line (listing line 208) missing; presumably the same as the
// overload above - TODO confirm.
207  template <class OTHER_RET = int, class FUNC, class ... ARGS>
209  post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
210 
// Posts a coroutine to run asynchronously.
// NOTE(review): return-type line (listing line 224) missing; the reference index gives
// `ICoroContext<OTHER_RET>::Ptr postFirst(FUNC&& func, ARGS&&... args)`.
223  template <class OTHER_RET = int, class FUNC, class ... ARGS>
225  postFirst(FUNC&& func, ARGS&&... args);
226 
// Overload of postFirst() that additionally takes a queueId and an isHighPriority flag.
// NOTE(review): return-type line (listing line 247) missing.
246  template <class OTHER_RET = int, class FUNC, class ... ARGS>
248  postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
249 
// Posts a coroutine to run asynchronously.
// NOTE(review): return-type line (listing line 265) missing; the reference index gives
// `ICoroContext<OTHER_RET>::Ptr then(FUNC&& func, ARGS&&... args)`.
264  template <class OTHER_RET = int, class FUNC, class ... ARGS>
266  then(FUNC&& func, ARGS&&... args);
267 
// Posts a coroutine to run asynchronously. This is the error handler for a continuation
// chain.
// NOTE(review): return-type line (listing line 286) missing; the reference index gives
// `ICoroContext<OTHER_RET>::Ptr onError(FUNC&& func, ARGS&&... args)`.
285  template <class OTHER_RET = int, class FUNC, class ... ARGS>
287  onError(FUNC&& func, ARGS&&... args);
288 
// NOTE(review): neither the return-type line (listing line 303) nor a description of
// finally() is visible in this extract.
302  template <class OTHER_RET = int, class FUNC, class ... ARGS>
304  finally(FUNC&& func, ARGS&&... args);
305 
// This is the last method in a continuation chain.
311  Ptr end();
312 
// Posts an IO method to run asynchronously on the IO thread pool.
// NOTE(review): return-type line (listing line 324) missing; the reference index gives
// `CoroFuturePtr<OTHER_RET> postAsyncIo(FUNC&& func, ARGS&&... args)`.
323  template <class OTHER_RET = int, class FUNC, class ... ARGS>
325  postAsyncIo(FUNC&& func, ARGS&&... args);
326 
// Overload of postAsyncIo() that additionally takes a queueId and an isHighPriority flag.
// NOTE(review): return-type line (listing line 344) missing.
343  template <class OTHER_RET = int, class FUNC, class ... ARGS>
345  postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
346 
// Applies the given unary function to all the elements in the range [first,last).
// The defaulted Traits::IsInputIterator<INPUT_IT> parameter restricts this overload to
// input iterators.
// NOTE(review): return-type line (listing line 359) missing; the reference index gives
// `ICoroContext<std::vector<OTHER_RET>>::Ptr`.
358  template <class OTHER_RET = int, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
360  forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
361 
// Overload of forEach() taking a start iterator and an element count instead of an
// iterator pair.
// NOTE(review): return-type line (listing line 374) missing.
373  template <class OTHER_RET = int, class INPUT_IT>
375  forEach(INPUT_IT first, size_t num, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
376 
// Applies the given unary function to all the elements in the range [first,last).
// NOTE(review): return-type line (listing line 390) missing; the reference index gives
// `ICoroContext<std::vector<std::vector<OTHER_RET>>>::Ptr`. The index description of how
// this differs from forEach() is truncated in this extract.
389  template <class OTHER_RET = int, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
391  forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
392 
// Overload of forEachBatch() taking a start iterator and an element count.
// NOTE(review): return-type line (listing line 406) missing.
405  template <class OTHER_RET = int, class INPUT_IT>
407  forEachBatch(INPUT_IT first, size_t num, Functions::ForEachFunc<OTHER_RET, INPUT_IT> func);
408 
// Implementation of map-reduce functionality.
// NOTE(review): listing lines 428-429 (end of the template parameter list and the return
// type) and 432-433 (remaining parameters) are missing. The reference index gives:
//   ICoroContext<std::map<KEY, REDUCED_TYPE>>::Ptr
//   mapReduce(INPUT_IT first, INPUT_IT last,
//             Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
//             Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer)
424  template <class KEY,
425  class MAPPED_TYPE,
426  class REDUCED_TYPE,
427  class INPUT_IT,
430  mapReduce(INPUT_IT first,
431  INPUT_IT last,
434 
// Overload of mapReduce() taking a start iterator and an element count.
// NOTE(review): listing lines 441 and 444-445 are missing; presumably the return type and
// the mapper/reducer parameters match the overload above - TODO confirm.
437  template <class KEY,
438  class MAPPED_TYPE,
439  class REDUCED_TYPE,
440  class INPUT_IT>
442  mapReduce(INPUT_IT first,
443  size_t num,
446 
// This version of mapReduce() runs both the mapper and the reducer functions in batches.
// NOTE(review): listing lines 455-456 and 459-460 are missing. The reference index gives:
//   ICoroContext<std::map<KEY, REDUCED_TYPE>>::Ptr
//   mapReduceBatch(INPUT_IT first, INPUT_IT last,
//                  Functions::MapFunc<KEY, MAPPED_TYPE, INPUT_IT> mapper,
//                  Functions::ReduceFunc<KEY, MAPPED_TYPE, REDUCED_TYPE> reducer)
451  template <class KEY,
452  class MAPPED_TYPE,
453  class REDUCED_TYPE,
454  class INPUT_IT,
457  mapReduceBatch(INPUT_IT first,
458  INPUT_IT last,
461 
// Overload of mapReduceBatch() taking a start iterator and an element count.
// NOTE(review): listing lines 468 and 471-472 are missing; presumably the return type and
// the mapper/reducer parameters match the overload above - TODO confirm.
464  template <class KEY,
465  class MAPPED_TYPE,
466  class REDUCED_TYPE,
467  class INPUT_IT>
469  mapReduceBatch(INPUT_IT first,
470  size_t num,
473 };
474 
475 template <class RET>
477 
478 template <class RET>
480 
481 }}
482 
483 #endif //BLOOMBERG_QUANTUM_ICORO_CONTEXT_H
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:218
Definition: quantum_buffer_impl.h:22
typename ICoroFuture< T >::Ptr CoroFuturePtr
Definition: quantum_icoro_future.h:72
std::shared_ptr< IContextBase > Ptr
Definition: quantum_icontext_base.h:35
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:287
const NonBufferRetType< V > & getRef(ICoroSync::Ptr sync) const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:146
ICoroContext< OTHER_RET >::Ptr postFirst(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
Exposes methods to manipulate the coroutine context.
Definition: quantum_icoro_context.h:38
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:212
std::shared_ptr< ICoroSync > Ptr
Definition: quantum_icoro_sync.h:36
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:224
CoroFuturePtr< OTHER_RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Posts an IO method to run asynchronously on the IO thread pool.
Definition: quantum_context_impl.h:295
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
std::enable_if_t<!Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > NonBufferRetType
Definition: quantum_traits.h:95
NonBufferRetType< OTHER_RET > getAt(int num, ICoroSync::Ptr sync)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:169
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
ICoroContext< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This is the error handler for a continuation chain and acts ...
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:206
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
ICoroContext< std::vector< OTHER_RET > >::Ptr forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
NonBufferRetType< V > get(ICoroSync::Ptr sync)
Get the future value associated with this context.
Definition: quantum_context_impl.h:139
const NonBufferRetType< OTHER_RET > & getRefAt(int num, ICoroSync::Ptr sync) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:176
const NonBufferRetType< OTHER_RET > & getPrevRef()
Get a reference to the future value associated with the previous coroutine context in the continuation ch...
Definition: quantum_context_impl.h:161
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:183
std::shared_ptr< ICoroContext< RET > > Ptr
Definition: quantum_icoro_context.h:41
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:191
std::enable_if_t< Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > BufferRetType
Definition: quantum_traits.h:93
NonBufferRetType< OTHER_RET > getPrev()
Get the future value associated with the previous coroutine context in the continuation chain.
Definition: quantum_context_impl.h:153
std::enable_if_t< std::is_convertible< typename std::iterator_traits< IT >::iterator_category, std::input_iterator_tag >::value > IsInputIterator
Definition: quantum_traits.h:62
BufferRetType< V > pull(ICoroSync::Ptr sync, bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:199
ICoroContext< OTHER_RET >::Ptr post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
ICoroContext< std::vector< std::vector< OTHER_RET > > >::Ptr forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs se...
Definition: quantum_icontext_base.h:25
ICoroContext< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
Exposes methods to manipulate the coroutine context, especially future wait methods.
Definition: quantum_icoro_context_base.h:32