QuantumLibrary
quantum_sequencer_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 #include <stdexcept>
23 
24 namespace Bloomberg {
25 namespace quantum {
26 
27 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
30  _dispatcher(dispatcher),
31  _controllerQueueId(configuration.getControlQueueId()),
32  _universalContext(),
33  _contexts(configuration.getBucketCount(),
34  configuration.getHash(),
35  configuration.getKeyEqual(),
36  configuration.getAllocator()),
37  _exceptionCallback(configuration.getExceptionCallback()),
38  _taskStats(std::make_shared<SequenceKeyStatisticsWriter>())
39 {
40  if (_controllerQueueId <= (int)IQueue::QueueId::Any || _controllerQueueId >= _dispatcher.getNumCoroutineThreads())
41  {
42  throw std::out_of_range("Allowed range is 0 <= controllerQueueId < _dispatcher.getNumCoroutineThreads()");
43  }
44 }
45 
46 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
47 template <class FUNC, class ... ARGS>
48 void
50  const SequenceKey& sequenceKey,
51  FUNC&& func,
52  ARGS&&... args)
53 {
54  _dispatcher.post<int>(_controllerQueueId,
55  false,
56  singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
57  nullptr,
59  false,
60  *this,
61  SequenceKey(sequenceKey),
62  std::forward<FUNC>(func),
63  std::forward<ARGS>(args)...);
64 }
65 
66 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
67 template <class FUNC, class ... ARGS>
68 void
70  void* opaque,
71  int queueId,
72  bool isHighPriority,
73  const SequenceKey& sequenceKey,
74  FUNC&& func,
75  ARGS&&... args)
76 {
77  if (queueId < (int)IQueue::QueueId::Any)
78  {
79  throw std::runtime_error("Invalid IO queue id");
80  }
81 
82  _dispatcher.post<int>(_controllerQueueId,
83  false,
84  singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
85  std::move(opaque),
86  std::move(queueId),
87  std::move(isHighPriority),
88  *this,
89  SequenceKey(sequenceKey),
90  std::forward<FUNC>(func),
91  std::forward<ARGS>(args)...);
92 }
93 
94 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
95 template <class FUNC, class ... ARGS>
96 void
98  const std::vector<SequenceKey>& sequenceKeys,
99  FUNC&& func,
100  ARGS&&... args)
101 {
102  _dispatcher.post<int>(_controllerQueueId,
103  false,
104  multiSequenceKeyTaskScheduler<FUNC, ARGS...>,
105  nullptr,
107  false,
108  *this,
109  std::vector<SequenceKey>(sequenceKeys),
110  std::forward<FUNC>(func),
111  std::forward<ARGS>(args)...);
112 }
113 
114 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
115 template <class FUNC, class ... ARGS>
116 void
118  void* opaque,
119  int queueId,
120  bool isHighPriority,
121  const std::vector<SequenceKey>& sequenceKeys,
122  FUNC&& func,
123  ARGS&&... args)
124 {
125  if (queueId < (int)IQueue::QueueId::Any)
126  {
127  throw std::runtime_error("Invalid IO queue id");
128  }
129  _dispatcher.post<int>(_controllerQueueId,
130  false,
131  multiSequenceKeyTaskScheduler<FUNC, ARGS...>,
132  std::move(opaque),
133  std::move(queueId),
134  std::move(isHighPriority),
135  *this,
136  std::vector<SequenceKey>(sequenceKeys),
137  std::forward<FUNC>(func),
138  std::forward<ARGS>(args)...);
139 }
140 
141 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
142 template <class FUNC, class ... ARGS>
143 void
145 {
146  _dispatcher.post<int>(_controllerQueueId,
147  false,
148  universalTaskScheduler<FUNC, ARGS...>,
149  nullptr,
151  false,
152  *this,
153  std::forward<FUNC>(func),
154  std::forward<ARGS>(args)...);
155 }
156 
157 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
158 template <class FUNC, class ... ARGS>
159 void
161  void* opaque,
162  int queueId,
163  bool isHighPriority,
164  FUNC&& func,
165  ARGS&&... args)
166 {
167  if (queueId < (int)IQueue::QueueId::Any)
168  {
169  throw std::runtime_error("Invalid IO queue id");
170  }
171  _dispatcher.post<int>(_controllerQueueId,
172  false,
173  universalTaskScheduler<FUNC, ARGS...>,
174  std::move(opaque),
175  std::move(queueId),
176  std::move(isHighPriority),
177  *this,
178  std::forward<FUNC>(func),
179  std::forward<ARGS>(args)...);
180 }
181 
182 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
183 size_t
185 {
186  auto trimFunc = [this](CoroContextPtr<size_t> ctx)->int
187  {
188  for (auto it = _contexts.begin(); it != _contexts.end();)
189  {
190  auto trimIt = it++;
191  if (canTrimContext(ctx, trimIt->second._context))
192  {
193  _contexts.erase(trimIt);
194  }
195  }
196  return ctx->set(_contexts.size());
197  };
198  return _dispatcher.post<size_t>(_controllerQueueId, true, std::move(trimFunc))->get();
199 }
200 
201 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
204 {
205  auto statsFunc = [this, sequenceKey](CoroContextPtr<SequenceKeyStatistics> ctx)->int
206  {
207  typename ContextMap::iterator ctxIt = _contexts.find(sequenceKey);
208  if (ctxIt == _contexts.end())
209  {
210  return ctx->set(SequenceKeyStatistics());
211  }
212  return ctx->set(SequenceKeyStatistics(*ctxIt->second._stats));
213  };
214  return _dispatcher.post<SequenceKeyStatistics>(_controllerQueueId, true, std::move(statsFunc))->get();
215 }
216 
217 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
220 {
221  return *_universalContext._stats;
222 }
223 
224 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
227 {
228  return *_taskStats;
229 }
230 
231 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
232 size_t
234 {
235  auto statsFunc = [this](CoroContextPtr<size_t> ctx)->int
236  {
237  return ctx->set(_contexts.size());
238  };
239  return _dispatcher.post<size_t>(_controllerQueueId, true, std::move(statsFunc))->get();
240 }
241 
242 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
243 template <class FUNC, class ... ARGS>
244 int
247  void* opaque,
248  Sequencer& sequencer,
249  SequenceKeyData&& dependent,
250  SequenceKeyData&& universalDependent,
251  FUNC&& func,
252  ARGS&&... args)
253 {
254  // wait until all the dependents are done
255  if (dependent._context)
256  {
257  dependent._context->wait(ctx);
258  }
259  if (universalDependent._context)
260  {
261  universalDependent._context->wait(ctx);
262  }
263  // update task stats
264  dependent._stats->decrementPendingTaskCount();
265  sequencer._taskStats->decrementPendingTaskCount();
266  callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
267  return 0;
268 }
269 
270 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
271 template <class FUNC, class ... ARGS>
272 int
273 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForDependents(
274  CoroContextPtr<int> ctx,
275  void* opaque,
276  Sequencer& sequencer,
277  std::vector<SequenceKeyData>&& dependents,
278  SequenceKeyData&& universalDependent,
279  FUNC&& func,
280  ARGS&&... args)
281 {
282  // wait until all the dependents are done
283  for (const auto& dependent : dependents)
284  {
285  if (dependent._context)
286  {
287  dependent._context->wait(ctx);
288  }
289  }
290  //wait until the universal dependent is done
291  if (universalDependent._context)
292  {
293  universalDependent._context->wait(ctx);
294  }
295  //update stats
296  for (const auto& dependent : dependents)
297  {
298  dependent._stats->decrementPendingTaskCount();
299  }
300  // update task stats
301  sequencer._taskStats->decrementPendingTaskCount();
302  callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
303  return 0;
304 }
305 
306 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
307 template <class FUNC, class ... ARGS>
308 int
309 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForUniversalDependent(
310  CoroContextPtr<int> ctx,
311  void* opaque,
312  Sequencer& sequencer,
313  std::vector<SequenceKeyData>&& dependents,
314  SequenceKeyData&& universalDependent,
315  FUNC&& func,
316  ARGS&&... args)
317 {
318  // wait until all the dependents are done
319  for (const auto& dependent : dependents)
320  {
321  if (dependent._context)
322  {
323  dependent._context->wait(ctx);
324  }
325  }
326  //wait until the universal dependent is done
327  if (universalDependent._context)
328  {
329  universalDependent._context->wait(ctx);
330  }
331  universalDependent._stats->decrementPendingTaskCount();
332  // update task stats
333  sequencer._taskStats->decrementPendingTaskCount();
334  callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
335  return 0;
336 }
337 
338 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
339 template <class FUNC, class ... ARGS>
340 int
341 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::singleSequenceKeyTaskScheduler(
342  CoroContextPtr<int> ctx,
343  void* opaque,
344  int queueId,
345  bool isHighPriority,
346  Sequencer& sequencer,
347  SequenceKey&& sequenceKey,
348  FUNC&& func,
349  ARGS&&... args)
350 {
351  // find the dependent
352  typename ContextMap::iterator contextIt = sequencer._contexts.find(sequenceKey);
353  if (contextIt == sequencer._contexts.end())
354  {
355  contextIt = sequencer._contexts.emplace(sequenceKey, SequenceKeyData()).first;
356  }
357  // update stats
358  contextIt->second._stats->incrementPostedTaskCount();
359  contextIt->second._stats->incrementPendingTaskCount();
360  // update task stats
361  sequencer._taskStats->incrementPostedTaskCount();
362  sequencer._taskStats->incrementPendingTaskCount();
363 
364  // save the context as the last for this sequenceKey
365  contextIt->second._context = ctx->post<int>(
366  std::move(queueId),
367  std::move(isHighPriority),
368  waitForTwoDependents<FUNC, ARGS...>,
369  std::move(opaque),
370  sequencer,
371  SequenceKeyData(contextIt->second),
372  SequenceKeyData(sequencer._universalContext),
373  std::forward<FUNC>(func),
374  std::forward<ARGS>(args)...);
375  return 0;
376 }
377 
378 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
379 template <class FUNC, class ... ARGS>
380 int
381 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::multiSequenceKeyTaskScheduler(
382  CoroContextPtr<int> ctx,
383  void* opaque,
384  int queueId,
385  bool isHighPriority,
386  Sequencer& sequencer,
387  std::vector<SequenceKey>&& sequenceKeys,
388  FUNC&& func,
389  ARGS&&... args)
390 {
391  // construct the dependent collection
392  std::vector<SequenceKeyData> dependents;
393  dependents.reserve(sequenceKeys.size());
394  dependents.push_back(sequencer._universalContext);
395  for (const SequenceKey& sequenceKey : sequenceKeys)
396  {
397  auto taskIt = sequencer._contexts.find(sequenceKey);
398  if (taskIt != sequencer._contexts.end())
399  {
400  // add the dependent and increment stats
401  taskIt->second._stats->incrementPostedTaskCount();
402  taskIt->second._stats->incrementPendingTaskCount();
403  dependents.emplace_back(taskIt->second);
404  }
405  }
406  // update task stats
407  sequencer._taskStats->incrementPostedTaskCount();
408  sequencer._taskStats->incrementPendingTaskCount();
409 
410  ICoroContextBasePtr newCtx = ctx->post<int>(
411  std::move(queueId),
412  std::move(isHighPriority),
413  waitForDependents<FUNC, ARGS...>,
414  std::move(opaque),
415  sequencer,
416  std::move(dependents),
417  SequenceKeyData(sequencer._universalContext),
418  std::forward<FUNC>(func),
419  std::forward<ARGS>(args)...);
420 
421  // save the context as the last for each sequenceKey
422  for (const SequenceKey& sequenceKey : sequenceKeys)
423  {
424  sequencer._contexts[sequenceKey]._context = newCtx;
425  }
426  return 0;
427 }
428 
429 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
430 template <class FUNC, class ... ARGS>
431 int
432 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::universalTaskScheduler(
433  CoroContextPtr<int> ctx,
434  void* opaque,
435  int queueId,
436  bool isHighPriority,
437  Sequencer& sequencer,
438  FUNC&& func,
439  ARGS&&... args)
440 {
441  // construct the dependent collection
442  std::vector<SequenceKeyData> dependents;
443  dependents.reserve(sequencer._contexts.size());
444  for (auto ctxIt = sequencer._contexts.begin(); ctxIt != sequencer._contexts.end(); ++ctxIt)
445  {
446  // check if the context still has a pending task
447  if (isPendingContext(ctx, ctxIt->second._context))
448  {
449  // we will need to wait on this context to finish its current running task
450  dependents.emplace_back(ctxIt->second);
451  }
452  }
453  // update the universal stats only
454  sequencer._universalContext._stats->incrementPostedTaskCount();
455  sequencer._universalContext._stats->incrementPendingTaskCount();
456  // update task stats
457  sequencer._taskStats->incrementPostedTaskCount();
458  sequencer._taskStats->incrementPendingTaskCount();
459 
460  // post the task and save the context as the last for the universal sequenceKey
461  sequencer._universalContext._context = ctx->post<int>(
462  std::move(queueId),
463  std::move(isHighPriority),
464  waitForUniversalDependent<FUNC, ARGS...>,
465  std::move(opaque),
466  sequencer,
467  std::move(dependents),
468  SequenceKeyData(sequencer._universalContext),
469  std::forward<FUNC>(func),
470  std::forward<ARGS>(args)...);
471  return 0;
472 }
473 
474 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
475 template <class FUNC, class ... ARGS>
476 void
477 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::callPosted(
478  CoroContextPtr<int> ctx,
479  void* opaque,
480  const Sequencer& sequencer,
481  FUNC&& func,
482  ARGS&&... args)
483 {
484  // make sure the final action is eventually called
485  try
486  {
487  std::forward<FUNC>(func)(ctx, std::forward<ARGS>(args)...);
488  }
489  catch(std::exception& ex)
490  {
491  if (sequencer._exceptionCallback)
492  {
493  sequencer._exceptionCallback(std::current_exception(), opaque);
494  }
495  }
496  catch(...)
497  {
498  if (sequencer._exceptionCallback)
499  {
500  sequencer._exceptionCallback(std::current_exception(), opaque);
501  }
502  }
503 }
504 
505 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
506 bool
507 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::canTrimContext(const ICoroContextBasePtr& ctx,
508  const ICoroContextBasePtr& ctxToValidate)
509 {
510  return !ctxToValidate || !ctxToValidate->valid() ||
511  ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::ready;
512 }
513 
514 template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
515 bool
516 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::isPendingContext(const ICoroContextBasePtr& ctx,
517  const ICoroContextBasePtr& ctxToValidate)
518 {
519  return ctxToValidate && ctxToValidate->valid() &&
520  ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout;
521 }
522 
523 
524 }}
Sequencer(Dispatcher &dispatcher, const Configuration &configuration=Configuration())
Constructor.
Definition: quantum_sequencer_impl.h:28
SequenceKeyStatistics getTaskStatistics()
Gets the sequencer statistics for all jobs.
Definition: quantum_sequencer_impl.h:226
Definition: quantum_buffer_impl.h:22
SequenceKeyStatistics getStatistics()
Gets the sequencer statistics for the 'universal key', a.k.a. posted via postAll() method.
Definition: quantum_sequencer_impl.h:219
Definition: quantum_sequence_key_statistics.h:72
size_t trimSequenceKeys()
Trims the sequence keys not used by the sequencer anymore.
Definition: quantum_sequencer_impl.h:184
void post(const SequenceKey &sequenceKey, FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:49
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Definition: quantum_stl_impl.h:23
Implementation of a configuration class for Sequencer.
Definition: quantum_sequencer_configuration.h:41
ICoroContextBase::Ptr ICoroContextBasePtr
Definition: quantum_icoro_context_base.h:79
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main e...
Definition: quantum_dispatcher.h:34
Implementation of a statistics collection for a SequenceKey in Sequencer.
Definition: quantum_sequence_key_statistics.h:32
Definition: quantum_sequencer_configuration_impl.h:28
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the constructor....
Definition: quantum_dispatcher_impl.h:267
size_t getSequenceKeyCount()
Gets the number of tracked sequence keys.
Definition: quantum_sequencer_impl.h:233
void postAll(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:144
Implementation of a key-based task sequencing with quantum.
Definition: quantum_sequencer.h:43