QuantumLibrary
quantum_dispatcher.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_DISPATCHER_H
17 #define BLOOMBERG_QUANTUM_DISPATCHER_H
18 
19 #include <quantum/quantum_context.h>
20 #include <quantum/quantum_configuration.h>
21 #include <quantum/quantum_macros.h>
22 #include <iterator>
23 #include <chrono>
24 
25 namespace Bloomberg {
26 namespace quantum {
27 
28 //==============================================================================================
29 // class Dispatcher
30 //==============================================================================================
34 class Dispatcher : public ITerminate
35 {
36 public:
38 
48  DEPRECATED Dispatcher(int numCoroutineThreads = -1,
49  int numIoThreads = 5,
50  bool pinCoroutineThreadsToCores = false);
51 
54  explicit Dispatcher(const Configuration& config);
55 
59  ~Dispatcher();
60 
74  template <class RET = int, class FUNC, class ... ARGS>
76  post(FUNC&& func, ARGS&&... args);
77 
94  template <class RET = int, class FUNC, class ... ARGS>
96  post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
97 
109  template <class RET = int, class FUNC, class ... ARGS>
111  postFirst(FUNC&& func, ARGS&&... args);
112 
129  template <class RET = int, class FUNC, class ... ARGS>
131  postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
132 
143  template <class RET = int, class FUNC, class ... ARGS>
145  postAsyncIo(FUNC&& func, ARGS&&... args);
146 
161  template <class RET = int, class FUNC, class ... ARGS>
163  postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
164 
176  template <class RET = int, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
178  forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<RET, INPUT_IT> func);
179 
182  template <class RET = int, class INPUT_IT>
184  forEach(INPUT_IT first, size_t num, Functions::ForEachFunc<RET, INPUT_IT> func);
185 
193  template <class RET = int, class INPUT_IT, class = Traits::IsInputIterator<INPUT_IT>>
195  forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc<RET, INPUT_IT> func);
196 
199  template <class RET = int, class INPUT_IT>
201  forEachBatch(INPUT_IT first, size_t num, Functions::ForEachFunc<RET, INPUT_IT> func);
202 
218  template <class KEY,
219  class MAPPED_TYPE,
220  class REDUCED_TYPE,
221  class INPUT_IT,
224  mapReduce(INPUT_IT first,
225  INPUT_IT last,
228 
231  template <class KEY,
232  class MAPPED_TYPE,
233  class REDUCED_TYPE,
234  class INPUT_IT>
236  mapReduce(INPUT_IT first,
237  size_t num,
240 
245  template <class KEY,
246  class MAPPED_TYPE,
247  class REDUCED_TYPE,
248  class INPUT_IT,
251  mapReduceBatch(INPUT_IT first,
252  INPUT_IT last,
255 
258  template <class KEY,
259  class MAPPED_TYPE,
260  class REDUCED_TYPE,
261  class INPUT_IT>
263  mapReduceBatch(INPUT_IT first,
264  size_t num,
267 
271  void terminate() final;
272 
280  size_t size(IQueue::QueueType type = IQueue::QueueType::All,
281  int queueId = (int)IQueue::QueueId::All) const;
282 
290  bool empty(IQueue::QueueType type = IQueue::QueueType::All,
291  int queueId = (int)IQueue::QueueId::All) const;
292 
297  void drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero());
298 
304  int getNumCoroutineThreads() const;
305 
310  int getNumIoThreads() const;
311 
315  const std::pair<int, int>& getCoroQueueIdRangeForAny() const;
316 
324  QueueStatistics stats(IQueue::QueueType type = IQueue::QueueType::All,
325  int queueId = (int)IQueue::QueueId::All);
326 
328  void resetStats();
329 
330 private:
331  template <class RET, class FUNC, class ... ARGS>
332  ThreadContextPtr<RET>
333  postImpl(int queueId, bool isHighPriority, ITask::Type type, FUNC&& func, ARGS&&... args);
334 
335  template <class RET, class FUNC, class ... ARGS>
336  ThreadFuturePtr<RET>
337  postAsyncIoImpl(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
338 
339  //Members
340  DispatcherCore _dispatcher;
341  bool _drain;
342  std::atomic_flag _terminated;
343 };
344 
345 using TaskDispatcher = Dispatcher; //alias
346 
347 }}
348 
349 #include <quantum/impl/quantum_dispatcher_impl.h>
350 
351 #endif //BLOOMBERG_QUANTUM_DISPATCHER_H
void drain(std::chrono::milliseconds timeout=std::chrono::milliseconds::zero())
Drains all queues on this dispatcher object.
Definition: quantum_dispatcher_impl.h:235
ThreadContextPtr< std::vector< std::vector< RET > > > forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
The batched version of forEach(). This function applies the given unary function to all the elements ...
Definition: quantum_dispatcher_impl.h:127
Definition: quantum_buffer_impl.h:22
QueueStatistics stats(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All)
Returns a statistics object for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:285
ThreadContextPtr< RET > post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_dispatcher_impl.h:52
Class implementing the dispatching logic unto worker threads. Used for both coroutines and IO tasks.
Definition: quantum_dispatcher_core.h:45
void terminate() final
Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will ...
Definition: quantum_dispatcher_impl.h:212
Definition: quantum_stl_impl.h:23
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
Definition: quantum_dispatcher_impl.h:153
ThreadFuturePtr< RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Post a blocking IO (or long running) task to run asynchronously on the IO thread pool.
Definition: quantum_dispatcher_impl.h:88
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_dispatcher_impl.h:185
ThreadContextPtr< RET > postFirst(FUNC &&func, ARGS &&... args)
Post the first coroutine in a continuation chain to run asynchronously.
Definition: quantum_dispatcher_impl.h:70
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main e...
Definition: quantum_dispatcher.h:34
Interface to a task queue. For internal use only.
Definition: quantum_iqueue.h:33
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
Definition: quantum_configuration.h:31
bool empty(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Check if the specified type and queue id is empty (i.e. there are no running tasks)
Definition: quantum_dispatcher_impl.h:228
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the constructor....
Definition: quantum_dispatcher_impl.h:267
void resetStats()
Resets all coroutine and IO queue counters.
Definition: quantum_dispatcher_impl.h:292
Definition: quantum_icontext_base.h:26
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
ThreadContextPtr< std::vector< RET > > forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
Definition: quantum_dispatcher_impl.h:106
typename IThreadFuture< T >::Ptr ThreadFuturePtr
Definition: quantum_ithread_future.h:69
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the constructor.
Definition: quantum_dispatcher_impl.h:273
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any when us...
Definition: quantum_dispatcher_impl.h:279
std::enable_if_t< std::is_convertible< typename std::iterator_traits< IT >::iterator_category, std::input_iterator_tag >::value > IsInputIterator
Definition: quantum_traits.h:62
Interface to a task. For internal use only.
Definition: quantum_itask.h:32
DEPRECATED Dispatcher(int numCoroutineThreads=-1, int numIoThreads=5, bool pinCoroutineThreadsToCores=false)
Constructor.
Definition: quantum_dispatcher_impl.h:28
typename IThreadContext< RET >::Ptr ThreadContextPtr
Definition: quantum_ithread_context.h:242
Configuration parameters for the Quantum library.
size_t size(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Returns the total number of queued tasks for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:221