27 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
30 _dispatcher(dispatcher),
31 _controllerQueueId(configuration.getControlQueueId()),
33 _contexts(configuration.getBucketCount(),
34 configuration.getHash(),
35 configuration.getKeyEqual(),
36 configuration.getAllocator()),
37 _exceptionCallback(configuration.getExceptionCallback()),
42 throw std::out_of_range(
"Allowed range is 0 <= controllerQueueId < _dispatcher.getNumCoroutineThreads()");
46 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
47 template <
class FUNC,
class ... ARGS>
50 const SequenceKey& sequenceKey,
54 _dispatcher.post<
int>(_controllerQueueId,
56 singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
61 SequenceKey(sequenceKey),
62 std::forward<FUNC>(func),
63 std::forward<ARGS>(args)...);
66 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
67 template <
class FUNC,
class ... ARGS>
73 const SequenceKey& sequenceKey,
79 throw std::runtime_error(
"Invalid IO queue id");
82 _dispatcher.post<
int>(_controllerQueueId,
84 singleSequenceKeyTaskScheduler<FUNC, ARGS...>,
87 std::move(isHighPriority),
89 SequenceKey(sequenceKey),
90 std::forward<FUNC>(func),
91 std::forward<ARGS>(args)...);
94 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
95 template <
class FUNC,
class ... ARGS>
98 const std::vector<SequenceKey>& sequenceKeys,
102 _dispatcher.post<
int>(_controllerQueueId,
104 multiSequenceKeyTaskScheduler<FUNC, ARGS...>,
109 std::vector<SequenceKey>(sequenceKeys),
110 std::forward<FUNC>(func),
111 std::forward<ARGS>(args)...);
114 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
115 template <
class FUNC,
class ... ARGS>
121 const std::vector<SequenceKey>& sequenceKeys,
127 throw std::runtime_error(
"Invalid IO queue id");
129 _dispatcher.post<
int>(_controllerQueueId,
131 multiSequenceKeyTaskScheduler<FUNC, ARGS...>,
134 std::move(isHighPriority),
136 std::vector<SequenceKey>(sequenceKeys),
137 std::forward<FUNC>(func),
138 std::forward<ARGS>(args)...);
141 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
142 template <
class FUNC,
class ... ARGS>
146 _dispatcher.post<
int>(_controllerQueueId,
148 universalTaskScheduler<FUNC, ARGS...>,
153 std::forward<FUNC>(func),
154 std::forward<ARGS>(args)...);
157 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
158 template <
class FUNC,
class ... ARGS>
169 throw std::runtime_error(
"Invalid IO queue id");
171 _dispatcher.post<
int>(_controllerQueueId,
173 universalTaskScheduler<FUNC, ARGS...>,
176 std::move(isHighPriority),
178 std::forward<FUNC>(func),
179 std::forward<ARGS>(args)...);
182 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
188 for (
auto it = _contexts.begin(); it != _contexts.end();)
191 if (canTrimContext(ctx, trimIt->second._context))
193 _contexts.erase(trimIt);
196 return ctx->set(_contexts.size());
198 return _dispatcher.post<
size_t>(_controllerQueueId,
true, std::move(trimFunc))->get();
201 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
207 typename ContextMap::iterator ctxIt = _contexts.find(sequenceKey);
208 if (ctxIt == _contexts.end())
214 return _dispatcher.post<
SequenceKeyStatistics>(_controllerQueueId,
true, std::move(statsFunc))->get();
217 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
221 return *_universalContext._stats;
224 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
231 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
237 return ctx->set(_contexts.size());
239 return _dispatcher.post<
size_t>(_controllerQueueId,
true, std::move(statsFunc))->get();
242 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
243 template <
class FUNC,
class ... ARGS>
255 if (dependent._context)
257 dependent._context->wait(ctx);
259 if (universalDependent._context)
261 universalDependent._context->wait(ctx);
264 dependent._stats->decrementPendingTaskCount();
265 sequencer._taskStats->decrementPendingTaskCount();
266 callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
270 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
271 template <
class FUNC,
class ... ARGS>
273 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForDependents(
274 CoroContextPtr<int> ctx,
276 Sequencer& sequencer,
277 std::vector<SequenceKeyData>&& dependents,
278 SequenceKeyData&& universalDependent,
283 for (
const auto& dependent : dependents)
285 if (dependent._context)
287 dependent._context->wait(ctx);
291 if (universalDependent._context)
293 universalDependent._context->wait(ctx);
296 for (
const auto& dependent : dependents)
298 dependent._stats->decrementPendingTaskCount();
301 sequencer._taskStats->decrementPendingTaskCount();
302 callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
306 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
307 template <
class FUNC,
class ... ARGS>
309 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::waitForUniversalDependent(
310 CoroContextPtr<int> ctx,
312 Sequencer& sequencer,
313 std::vector<SequenceKeyData>&& dependents,
314 SequenceKeyData&& universalDependent,
319 for (
const auto& dependent : dependents)
321 if (dependent._context)
323 dependent._context->wait(ctx);
327 if (universalDependent._context)
329 universalDependent._context->wait(ctx);
331 universalDependent._stats->decrementPendingTaskCount();
333 sequencer._taskStats->decrementPendingTaskCount();
334 callPosted(ctx, opaque, sequencer, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
338 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
339 template <
class FUNC,
class ... ARGS>
341 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::singleSequenceKeyTaskScheduler(
342 CoroContextPtr<int> ctx,
346 Sequencer& sequencer,
347 SequenceKey&& sequenceKey,
352 typename ContextMap::iterator contextIt = sequencer._contexts.find(sequenceKey);
353 if (contextIt == sequencer._contexts.end())
355 contextIt = sequencer._contexts.emplace(sequenceKey, SequenceKeyData()).first;
358 contextIt->second._stats->incrementPostedTaskCount();
359 contextIt->second._stats->incrementPendingTaskCount();
361 sequencer._taskStats->incrementPostedTaskCount();
362 sequencer._taskStats->incrementPendingTaskCount();
365 contextIt->second._context = ctx->post<
int>(
367 std::move(isHighPriority),
368 waitForTwoDependents<FUNC, ARGS...>,
371 SequenceKeyData(contextIt->second),
372 SequenceKeyData(sequencer._universalContext),
373 std::forward<FUNC>(func),
374 std::forward<ARGS>(args)...);
378 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
379 template <
class FUNC,
class ... ARGS>
381 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::multiSequenceKeyTaskScheduler(
382 CoroContextPtr<int> ctx,
386 Sequencer& sequencer,
387 std::vector<SequenceKey>&& sequenceKeys,
392 std::vector<SequenceKeyData> dependents;
393 dependents.reserve(sequenceKeys.size());
394 dependents.push_back(sequencer._universalContext);
395 for (
const SequenceKey& sequenceKey : sequenceKeys)
397 auto taskIt = sequencer._contexts.find(sequenceKey);
398 if (taskIt != sequencer._contexts.end())
401 taskIt->second._stats->incrementPostedTaskCount();
402 taskIt->second._stats->incrementPendingTaskCount();
403 dependents.emplace_back(taskIt->second);
407 sequencer._taskStats->incrementPostedTaskCount();
408 sequencer._taskStats->incrementPendingTaskCount();
412 std::move(isHighPriority),
413 waitForDependents<FUNC, ARGS...>,
416 std::move(dependents),
417 SequenceKeyData(sequencer._universalContext),
418 std::forward<FUNC>(func),
419 std::forward<ARGS>(args)...);
422 for (
const SequenceKey& sequenceKey : sequenceKeys)
424 sequencer._contexts[sequenceKey]._context = newCtx;
429 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
430 template <
class FUNC,
class ... ARGS>
432 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::universalTaskScheduler(
433 CoroContextPtr<int> ctx,
437 Sequencer& sequencer,
442 std::vector<SequenceKeyData> dependents;
443 dependents.reserve(sequencer._contexts.size());
444 for (
auto ctxIt = sequencer._contexts.begin(); ctxIt != sequencer._contexts.end(); ++ctxIt)
447 if (isPendingContext(ctx, ctxIt->second._context))
450 dependents.emplace_back(ctxIt->second);
454 sequencer._universalContext._stats->incrementPostedTaskCount();
455 sequencer._universalContext._stats->incrementPendingTaskCount();
457 sequencer._taskStats->incrementPostedTaskCount();
458 sequencer._taskStats->incrementPendingTaskCount();
461 sequencer._universalContext._context = ctx->post<
int>(
463 std::move(isHighPriority),
464 waitForUniversalDependent<FUNC, ARGS...>,
467 std::move(dependents),
468 SequenceKeyData(sequencer._universalContext),
469 std::forward<FUNC>(func),
470 std::forward<ARGS>(args)...);
474 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
475 template <
class FUNC,
class ... ARGS>
477 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::callPosted(
478 CoroContextPtr<int> ctx,
480 const Sequencer& sequencer,
487 std::forward<FUNC>(func)(ctx, std::forward<ARGS>(args)...);
489 catch(std::exception& ex)
491 if (sequencer._exceptionCallback)
493 sequencer._exceptionCallback(std::current_exception(), opaque);
498 if (sequencer._exceptionCallback)
500 sequencer._exceptionCallback(std::current_exception(), opaque);
505 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
507 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::canTrimContext(
const ICoroContextBasePtr& ctx,
510 return !ctxToValidate || !ctxToValidate->valid() ||
511 ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::ready;
514 template <
class SequenceKey,
class Hash,
class KeyEqual,
class Allocator>
516 Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::isPendingContext(
const ICoroContextBasePtr& ctx,
519 return ctxToValidate && ctxToValidate->valid() &&
520 ctxToValidate->waitFor(ctx, std::chrono::milliseconds(0)) == std::future_status::timeout;
Sequencer(Dispatcher &dispatcher, const Configuration &configuration=Configuration())
Constructor.
Definition: quantum_sequencer_impl.h:28
SequenceKeyStatistics getTaskStatistics()
Gets the sequencer statistics for all jobs.
Definition: quantum_sequencer_impl.h:226
Definition: quantum_buffer_impl.h:22
SequenceKeyStatistics getStatistics()
Gets the sequencer statistics for the 'universal key', i.e. for tasks posted via the postAll() method.
Definition: quantum_sequencer_impl.h:219
Definition: quantum_sequence_key_statistics.h:72
size_t trimSequenceKeys()
Trims the sequence keys that are no longer used by the sequencer.
Definition: quantum_sequencer_impl.h:184
void post(const SequenceKey &sequenceKey, FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:49
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Definition: quantum_stl_impl.h:23
Implementation of a configuration class for Sequencer.
Definition: quantum_sequencer_configuration.h:41
ICoroContextBase::Ptr ICoroContextBasePtr
Definition: quantum_icoro_context_base.h:79
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main entry point into the library.
Definition: quantum_dispatcher.h:34
Implementation of a statistics collection for a SequenceKey in Sequencer.
Definition: quantum_sequence_key_statistics.h:32
Definition: quantum_sequencer_configuration_impl.h:28
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the constructor....
Definition: quantum_dispatcher_impl.h:267
size_t getSequenceKeyCount()
Gets the number of tracked sequence keys.
Definition: quantum_sequencer_impl.h:233
void postAll(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:144
Implementation of key-based task sequencing with quantum.
Definition: quantum_sequencer.h:43