QuantumLibrary
quantum_dispatcher_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 #include <quantum/util/quantum_util.h>
23 
24 namespace Bloomberg {
25 namespace quantum {
26 
// Constructs a dispatcher from explicit thread settings. All three arguments are
// forwarded unchanged to the underlying `_dispatcher` member.
//   numCoroutineThreads        - number of coroutine threads
//   numIoThreads               - number of IO threads
//   pinCoroutineThreadsToCores - forwarded as-is; presumably enables core pinning (TODO confirm)
// The declaration of this overload is marked DEPRECATED.
27 inline
28 Dispatcher::Dispatcher(int numCoroutineThreads,
29  int numIoThreads,
30  bool pinCoroutineThreadsToCores) :
31  _dispatcher(numCoroutineThreads, numIoThreads, pinCoroutineThreadsToCores),
32  _drain(false), // posting is allowed until drain() sets this flag
33  _terminated ATOMIC_FLAG_INIT // flag starts clear, so the first terminate() call does the work
34 {}
35 
// Constructor initializer list with an empty body: `config` is handed to the
// underlying `_dispatcher`, draining starts disabled and the terminated flag
// starts clear.
// NOTE(review): the signature line (listing line 37) is absent here; presumably
// this is the configuration-based Dispatcher constructor -- confirm against the header.
36 inline
38  _dispatcher(config),
39  _drain(false),
40  _terminated ATOMIC_FLAG_INIT
41 {}
42 
// NOTE(review): the signature line (listing line 44) is absent here; presumably
// this is Dispatcher::~Dispatcher() -- confirm against the header.
// First waits for every queue to become empty, then shuts the dispatcher down.
43 inline
45 {
46  drain(); // default timeout is zero, which drain() treats as "no timeout"
47  terminate(); // no-op if terminate() has already run
48 }
49 
50 template <class RET, class FUNC, class ... ARGS>
51 ThreadContextPtr<RET>
52 Dispatcher::post(FUNC&& func,
53  ARGS&&... args)
54 {
55  return postImpl<RET>((int)IQueue::QueueId::Any, false, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
56 }
57 
// Overload of post() that lets the caller choose the coroutine queue and the
// priority. Forwards everything to postImpl() as a standalone task
// (ITask::Type::Standalone); postImpl() throws std::runtime_error while the
// dispatcher is draining or when `queueId` is below IQueue::QueueId::Any.
// NOTE(review): the return-type line (listing line 59) is absent here; the
// sibling overload returns ThreadContextPtr<RET>.
58 template <class RET, class FUNC, class ... ARGS>
60 Dispatcher::post(int queueId,
61  bool isHighPriority,
62  FUNC&& func,
63  ARGS&&... args)
64 {
65  return postImpl<RET>(queueId, isHighPriority, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
66 }
67 
// postFirst(): post the first coroutine in a continuation chain.
// Uses no queue preference (IQueue::QueueId::Any) and normal priority, and tags
// the task as ITask::Type::First. postImpl() does not enqueue tasks of that type
// itself; presumably the chain is scheduled later -- TODO confirm.
// NOTE(review): listing lines 69-70 (return type and start of the signature) are absent here.
68 template <class RET, class FUNC, class ... ARGS>
71  ARGS&&... args)
72 {
73  return postImpl<RET>((int)IQueue::QueueId::Any, false, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
74 }
75 
// Queue/priority-aware variant of the function above: forwards `queueId` and
// `isHighPriority` to postImpl() and tags the task as ITask::Type::First.
// NOTE(review): listing lines 77-78 (return type and function name) are absent
// here; presumably this is the postFirst(queueId, ...) overload -- confirm.
76 template <class RET, class FUNC, class ... ARGS>
79  bool isHighPriority,
80  FUNC&& func,
81  ARGS&&... args)
82 {
83  return postImpl<RET>(queueId, isHighPriority, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
84 }
85 
// postAsyncIo(): post a blocking IO (or long running) task to run asynchronously
// on the IO thread pool. Uses no queue preference (IQueue::QueueId::Any) and
// normal priority; the work is done by postAsyncIoImpl().
// NOTE(review): listing lines 87-88 (return type and start of the signature) are absent here.
86 template <class RET, class FUNC, class ... ARGS>
89  ARGS&&... args)
90 {
91  return postAsyncIoImpl<RET>((int)IQueue::QueueId::Any, false, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
92 }
93 
// Queue/priority-aware variant of the IO post: forwards `queueId`,
// `isHighPriority`, the callable and its arguments to postAsyncIoImpl().
// NOTE(review): listing lines 95-96 (return type and function name) are absent
// here; presumably this is the postAsyncIo(queueId, ...) overload -- confirm.
94 template <class RET, class FUNC, class ... ARGS>
97  bool isHighPriority,
98  FUNC&& func,
99  ARGS&&... args)
100 {
101  return postAsyncIoImpl<RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
102 }
103 
// forEach() over an iterator range [first, last): applies the given unary
// function to all elements in the range.
// The range is converted to (first, element count) with std::distance and the
// call is delegated to the count-based forEach overload; `func` is moved along.
// NOTE(review): listing lines 105 and 108 (return type and the `func` parameter) are absent here.
104 template <class RET, class INPUT_IT, class>
106 Dispatcher::forEach(INPUT_IT first,
107  INPUT_IT last,
109 {
110  return forEach<RET>(first, std::distance(first, last), std::move(func));
111 }
112 
// forEach() over `num` elements starting at `first`.
// Posts Util::forEachCoro<RET, INPUT_IT> as a coroutine whose result type is
// std::vector<RET>. The iterator and the count are passed as fresh copies and
// `func` is moved into a Functions::ForEachFunc object.
// NOTE(review): listing lines 114 and 117 (return type and the `func` parameter) are absent here.
113 template <class RET, class INPUT_IT>
115 Dispatcher::forEach(INPUT_IT first,
116  size_t num,
118 {
119  return post<std::vector<RET>>(Util::forEachCoro<RET, INPUT_IT>,
120  INPUT_IT{first},
121  size_t{num},
122  Functions::ForEachFunc<RET, INPUT_IT>{std::move(func)});
123 }
124 
// forEachBatch() over an iterator range: the batched version of forEach().
// Converts the range to (first, element count) with std::distance and delegates
// to the count-based forEachBatch overload; `func` is moved along.
// NOTE(review): listing lines 126-127 and 129 (return type, function name/first
// parameter and the `func` parameter) are absent here.
125 template <class RET, class INPUT_IT, class>
128  INPUT_IT last,
130 {
131  return forEachBatch<RET>(first, std::distance(first, last), std::move(func));
132 }
133 
// Count-based batched for-each: posts Util::forEachBatchCoro<RET, INPUT_IT> as a
// coroutine whose result type is std::vector<std::vector<RET>>. The iterator and
// count are passed as fresh copies and `func` is moved into a Functions::ForEachFunc.
// NOTE(review): listing lines 135-136, 138 and 144 are absent here, including the
// final argument of the post() call; presumably this is the
// forEachBatch(first, num, func) overload -- confirm.
134 template <class RET, class INPUT_IT>
137  size_t num,
139 {
140  return post<std::vector<std::vector<RET>>>(Util::forEachBatchCoro<RET, INPUT_IT>,
141  INPUT_IT{first},
142  size_t{num},
143  Functions::ForEachFunc<RET, INPUT_IT>{std::move(func)},
145 }
146 
// mapReduce() over an iterator range [first, last).
// Converts the range to (first, element count) with std::distance and delegates
// to the count-based mapReduce overload; `mapper` and `reducer` are moved along.
// NOTE(review): listing lines 152 and 155-156 (return type and the
// mapper/reducer parameters) are absent here.
147 template <class KEY,
148  class MAPPED_TYPE,
149  class REDUCED_TYPE,
150  class INPUT_IT,
151  class>
153 Dispatcher::mapReduce(INPUT_IT first,
154  INPUT_IT last,
157 {
158  return mapReduce(first, std::distance(first, last), std::move(mapper), std::move(reducer));
159 }
160 
// mapReduce() over `num` elements starting at `first`.
// Posts Util::mapReduceCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT> as a
// coroutine whose result type is std::map<KEY, REDUCED_TYPE>. The iterator and
// the count are passed as fresh copies.
// NOTE(review): listing lines 165, 168-169 and 175-176 are absent here (return
// type, the mapper/reducer parameters and the trailing post() arguments).
161 template <class KEY,
162  class MAPPED_TYPE,
163  class REDUCED_TYPE,
164  class INPUT_IT>
166 Dispatcher::mapReduce(INPUT_IT first,
167  size_t num,
170 {
171  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
172  return post<ReducerOutput>(Util::mapReduceCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
173  INPUT_IT{first},
174  size_t{num},
177 }
178 
// mapReduceBatch() over an iterator range: the variant of mapReduce() that runs
// the mapper and the reducer in batches.
// Converts the range to (first, element count) with std::distance and delegates
// to the count-based mapReduceBatch overload; `mapper` and `reducer` are moved along.
// NOTE(review): listing lines 184-185 and 187-188 (return type, function
// name/first parameter and the mapper/reducer parameters) are absent here.
179 template <class KEY,
180  class MAPPED_TYPE,
181  class REDUCED_TYPE,
182  class INPUT_IT,
183  class>
186  INPUT_IT last,
189 {
190  return mapReduceBatch(first, std::distance(first, last), std::move(mapper), std::move(reducer));
191 }
192 
// Count-based batched map-reduce: posts
// Util::mapReduceBatchCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT> as a
// coroutine whose result type is std::map<KEY, REDUCED_TYPE>. The iterator and
// the count are passed as fresh copies.
// NOTE(review): listing lines 197-198, 200-201 and 207-208 are absent here
// (return type, function name, mapper/reducer parameters and the trailing post()
// arguments); presumably this is the mapReduceBatch(first, num, ...) overload -- confirm.
193 template <class KEY,
194  class MAPPED_TYPE,
195  class REDUCED_TYPE,
196  class INPUT_IT>
199  size_t num,
202 {
203  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
204  return post<ReducerOutput>(Util::mapReduceBatchCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
205  INPUT_IT{first},
206  size_t{num},
209 }
210 
// terminate(): shuts down the underlying dispatcher.
// NOTE(review): the signature line (listing line 212) is absent here.
211 inline
213 {
// test_and_set() returns the previous flag value, so only the first caller sees
// `false` and forwards the call; every later call does nothing.
214  if (!_terminated.test_and_set())
215  {
216  _dispatcher.terminate();
217  }
218 }
219 
// size(): returns the total number of queued tasks for the specified queue type
// and queue id, as reported by the underlying `_dispatcher`.
// NOTE(review): the line with the return type and first parameter (listing line 221) is absent here.
220 inline
222  int queueId) const
223 {
224  return _dispatcher.size(type, queueId);
225 }
226 
// empty(): reports whether the specified queue type and queue id have no running
// tasks, as reported by the underlying `_dispatcher`.
// NOTE(review): the line with the return type and first parameter (listing line 228) is absent here.
227 inline
229  int queueId) const
230 {
231  return _dispatcher.empty(type, queueId);
232 }
233 
234 inline
235 void Dispatcher::drain(std::chrono::milliseconds timeout)
236 {
237  _drain = true;
238 
239  auto start = std::chrono::high_resolution_clock::now();
240 
241  //wait until all queues have completed their work
242  YieldingThread yield;
243  while (!empty())
244  {
245  yield();
246 
247  //check remaining time
248  if (timeout != std::chrono::milliseconds::zero())
249  {
250  auto present = std::chrono::high_resolution_clock::now();
251  if (std::chrono::duration_cast<std::chrono::milliseconds>(present-start) > timeout)
252  {
253  //timeout reached
254  break;
255  }
256  }
257  }
258 
259 #ifdef __QUANTUM_PRINT_DEBUG
260  std::lock_guard<std::mutex> guard(Util::LogMutex());
261  std::cout << "All queues have drained." << std::endl;
262 #endif
263  _drain = false;
264 }
265 
// getNumCoroutineThreads(): returns the number of underlying coroutine threads,
// as reported by `_dispatcher`.
// NOTE(review): the signature line (listing line 267) is absent here.
266 inline
268 {
269  return _dispatcher.getNumCoroutineThreads();
270 }
271 
// getNumIoThreads(): returns the number of underlying IO threads, as reported by
// `_dispatcher`.
// NOTE(review): the signature line (listing line 273) is absent here.
272 inline
274 {
275  return _dispatcher.getNumIoThreads();
276 }
277 
278 inline
279 const std::pair<int, int>& Dispatcher::getCoroQueueIdRangeForAny() const
280 {
281  return _dispatcher.getCoroQueueIdRangeForAny();
282 }
283 
// stats(): returns a statistics object for the specified queue type and queue
// id, as produced by the underlying `_dispatcher`.
// NOTE(review): the line with the return type and first parameter (listing line 285) is absent here.
284 inline
286  int queueId)
287 {
288  return _dispatcher.stats(type, queueId);
289 }
290 
// resetStats(): resets all coroutine and IO queue counters by forwarding to the
// underlying `_dispatcher`.
// NOTE(review): the signature line (listing line 292) is absent here.
291 inline
293 {
294  _dispatcher.resetStats();
295 }
296 
// Common implementation behind post() and postFirst().
// Builds a Context<RET> bound to `_dispatcher` and a Task that wraps the callable
// and its arguments, links the two, and returns the context as an
// IThreadContext<RET> pointer.
//   queueId        - target coroutine queue; values below IQueue::QueueId::Any are rejected
//   isHighPriority - passed through to the Task
//   type           - task type; only ITask::Type::Standalone tasks are enqueued here
//   func, args     - callable and its arguments (perfectly forwarded into the Task)
// Throws std::runtime_error when the dispatcher is draining or `queueId` is invalid.
// NOTE(review): listing lines 298 (return type) and 314 (second argument of the
// ContextPtr construction, presumably the Context deleter) are absent here.
297 template <class RET, class FUNC, class ... ARGS>
299 Dispatcher::postImpl(int queueId,
300  bool isHighPriority,
301  ITask::Type type,
302  FUNC&& func,
303  ARGS&&... args)
304 {
// drain() sets this flag while it waits for the queues to empty.
305  if (_drain)
306  {
307  throw std::runtime_error("Posting is disabled");
308  }
309  if (queueId < (int)IQueue::QueueId::Any)
310  {
311  throw std::runtime_error("Invalid coroutine queue id");
312  }
313  auto ctx = ContextPtr<RET>(new Context<RET>(_dispatcher),
315  auto task = Task::Ptr(new Task(ctx,
316  queueId,
317  isHighPriority,
318  type,
319  std::forward<FUNC>(func),
320  std::forward<ARGS>(args)...),
321  Task::deleter);
322  ctx->setTask(task);
// Tasks of any other type (e.g. ITask::Type::First) are created but not enqueued
// by this function; presumably they are scheduled later by the caller -- TODO confirm.
323  if (type == ITask::Type::Standalone)
324  {
325  _dispatcher.post(task);
326  }
327  return std::static_pointer_cast<IThreadContext<RET>>(ctx);
328 }
329 
// Common implementation behind the postAsyncIo() overloads.
// Creates a Promise<RET>, wraps the callable and its arguments in an IoTask bound
// to that promise, hands the task to `_dispatcher.postAsyncIo()` and returns the
// thread future obtained from the promise.
//   queueId        - target IO queue; values below IQueue::QueueId::Any are rejected
//   isHighPriority - passed through to the IoTask
//   func, args     - callable and its arguments (perfectly forwarded into the IoTask)
// Throws std::runtime_error when the dispatcher is draining or `queueId` is invalid.
// NOTE(review): listing line 351 (second argument of the IoTask::Ptr
// construction, presumably the IoTask deleter) is absent here.
330 template <class RET, class FUNC, class ... ARGS>
331 ThreadFuturePtr<RET>
332 Dispatcher::postAsyncIoImpl(int queueId,
333  bool isHighPriority,
334  FUNC&& func,
335  ARGS&&... args)
336 {
// drain() sets this flag while it waits for the queues to empty.
337  if (_drain)
338  {
339  throw std::runtime_error("Posting is disabled");
340  }
341  if (queueId < (int)IQueue::QueueId::Any)
342  {
343  throw std::runtime_error("Invalid IO queue id");
344  }
345  auto promise = PromisePtr<RET>(new Promise<RET>(), Promise<RET>::deleter);
346  auto task = IoTask::Ptr(new IoTask(promise,
347  queueId,
348  isHighPriority,
349  std::forward<FUNC>(func),
350  std::forward<ARGS>(args)...),
352  _dispatcher.postAsyncIo(task);
353  return promise->getIThreadFuture();
354 }
355 
356 }}
void drain(std::chrono::milliseconds timeout=std::chrono::milliseconds::zero())
Drains all queues on this dispatcher object.
Definition: quantum_dispatcher_impl.h:235
QueueType
Definition: quantum_iqueue.h:37
ThreadContextPtr< std::vector< std::vector< RET > > > forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
The batched version of forEach(). This function applies the given unary function to all the elements ...
Definition: quantum_dispatcher_impl.h:127
Definition: quantum_buffer_impl.h:22
QueueStatistics stats(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All)
Returns a statistics object for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:285
ThreadContextPtr< RET > post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_dispatcher_impl.h:52
QueueStatistics stats(IQueue::QueueType type, int queueId)
Definition: quantum_dispatcher_core_impl.h:240
void terminate() final
Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will ...
Definition: quantum_dispatcher_impl.h:212
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
Definition: quantum_dispatcher_impl.h:153
void terminate() final
Terminates the object.
Definition: quantum_dispatcher_core_impl.h:82
ThreadFuturePtr< RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Post a blocking IO (or long running) task to run asynchronously on the IO thread pool.
Definition: quantum_dispatcher_impl.h:88
Type
Definition: quantum_itask.h:37
static void deleter(Promise< T > *p)
Definition: quantum_promise_impl.h:207
int getNumIoThreads() const
Definition: quantum_dispatcher_core_impl.h:423
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_dispatcher_impl.h:185
static void deleter(IoTask *p)
Definition: quantum_io_task_impl.h:137
ThreadContextPtr< RET > postFirst(FUNC &&func, ARGS &&... args)
Post the first coroutine in a continuation chain to run asynchronously.
Definition: quantum_dispatcher_impl.h:70
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
size_t size(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:102
bool empty(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:121
Definition: quantum_configuration.h:31
bool empty(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Checks whether the queues selected by the specified type and queue id are empty (i.e. there are no running tasks).
Definition: quantum_dispatcher_impl.h:228
std::shared_ptr< IoTask > Ptr
Definition: quantum_io_task.h:37
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the constructor....
Definition: quantum_dispatcher_impl.h:267
void resetStats()
Resets all coroutine and IO queue counters.
Definition: quantum_dispatcher_impl.h:292
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Definition: quantum_dispatcher_core_impl.h:429
ThreadContextPtr< std::vector< RET > > forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
Definition: quantum_dispatcher_impl.h:106
static void deleter(Context< RET > *p)
Definition: quantum_context_impl.h:1148
void postAsyncIo(IoTask::Ptr task)
Definition: quantum_dispatcher_core_impl.h:373
typename IThreadFuture< T >::Ptr ThreadFuturePtr
Definition: quantum_ithread_future.h:69
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the constructor.
Definition: quantum_dispatcher_impl.h:273
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any when us...
Definition: quantum_dispatcher_impl.h:279
void post(Task::Ptr task)
Definition: quantum_dispatcher_core_impl.h:332
static void deleter(Task *p)
Definition: quantum_task_impl.h:182
DEPRECATED Dispatcher(int numCoroutineThreads=-1, int numIoThreads=5, bool pinCoroutineThreadsToCores=false)
Constructor.
Definition: quantum_dispatcher_impl.h:28
typename IThreadContext< RET >::Ptr ThreadContextPtr
Definition: quantum_ithread_context.h:242
This class provides the same functionality as a coroutine yield when called from a thread context.
Definition: quantum_yielding_thread.h:33
void resetStats()
Definition: quantum_dispatcher_core_impl.h:315
int getNumCoroutineThreads() const
Definition: quantum_dispatcher_core_impl.h:417
size_t size(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Returns the total number of queued tasks for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:221