22 #include <quantum/util/quantum_util.h> 30 bool pinCoroutineThreadsToCores) :
31 _dispatcher(numCoroutineThreads, numIoThreads, pinCoroutineThreadsToCores),
33 _terminated ATOMIC_FLAG_INIT
40 _terminated ATOMIC_FLAG_INIT
50 template <
class RET,
class FUNC,
class ... ARGS>
58 template <
class RET,
class FUNC,
class ... ARGS>
65 return postImpl<RET>(queueId, isHighPriority,
ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
68 template <
class RET,
class FUNC,
class ... ARGS>
76 template <
class RET,
class FUNC,
class ... ARGS>
83 return postImpl<RET>(queueId, isHighPriority,
ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
86 template <
class RET,
class FUNC,
class ... ARGS>
91 return postAsyncIoImpl<RET>((
int)
IQueue::QueueId::Any,
false, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
94 template <
class RET,
class FUNC,
class ... ARGS>
101 return postAsyncIoImpl<RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
104 template <
class RET,
class INPUT_IT,
class>
110 return forEach<RET>(first, std::distance(first, last), std::move(func));
113 template <
class RET,
class INPUT_IT>
119 return post<std::vector<RET>>(Util::forEachCoro<RET, INPUT_IT>,
125 template <
class RET,
class INPUT_IT,
class>
131 return forEachBatch<RET>(first, std::distance(first, last), std::move(func));
134 template <
class RET,
class INPUT_IT>
140 return post<std::vector<std::vector<RET>>>(Util::forEachBatchCoro<RET, INPUT_IT>,
158 return mapReduce(first, std::distance(first, last), std::move(mapper), std::move(reducer));
171 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
172 return post<ReducerOutput>(Util::mapReduceCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
190 return mapReduceBatch(first, std::distance(first, last), std::move(mapper), std::move(reducer));
203 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
204 return post<ReducerOutput>(Util::mapReduceBatchCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
214 if (!_terminated.test_and_set())
224 return _dispatcher.
size(type, queueId);
231 return _dispatcher.
empty(type, queueId);
239 auto start = std::chrono::high_resolution_clock::now();
248 if (timeout != std::chrono::milliseconds::zero())
250 auto present = std::chrono::high_resolution_clock::now();
251 if (std::chrono::duration_cast<std::chrono::milliseconds>(present-start) > timeout)
259 #ifdef __QUANTUM_PRINT_DEBUG 260 std::lock_guard<std::mutex> guard(Util::LogMutex());
261 std::cout <<
"All queues have drained." << std::endl;
288 return _dispatcher.
stats(type, queueId);
297 template <
class RET,
class FUNC,
class ... ARGS>
299 Dispatcher::postImpl(
int queueId,
307 throw std::runtime_error(
"Posting is disabled");
311 throw std::runtime_error(
"Invalid coroutine queue id");
313 auto ctx = ContextPtr<RET>(
new Context<RET>(_dispatcher),
319 std::forward<FUNC>(func),
320 std::forward<ARGS>(args)...),
325 _dispatcher.
post(task);
327 return std::static_pointer_cast<IThreadContext<RET>>(ctx);
330 template <
class RET,
class FUNC,
class ... ARGS>
332 Dispatcher::postAsyncIoImpl(
int queueId,
339 throw std::runtime_error(
"Posting is disabled");
343 throw std::runtime_error(
"Invalid IO queue id");
349 std::forward<FUNC>(func),
350 std::forward<ARGS>(args)...),
353 return promise->getIThreadFuture();
void drain(std::chrono::milliseconds timeout=std::chrono::milliseconds::zero())
Drains all queues on this dispatcher object.
Definition: quantum_dispatcher_impl.h:235
QueueType
Definition: quantum_iqueue.h:37
ThreadContextPtr< std::vector< std::vector< RET > > > forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
The batched version of forEach(). This function applies the given unary function to all the elements ...
Definition: quantum_dispatcher_impl.h:127
Definition: quantum_buffer_impl.h:22
QueueStatistics stats(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All)
Returns a statistics object for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:285
ThreadContextPtr< RET > post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_dispatcher_impl.h:52
QueueStatistics stats(IQueue::QueueType type, int queueId)
Definition: quantum_dispatcher_core_impl.h:240
void terminate() final
Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will ...
Definition: quantum_dispatcher_impl.h:212
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
Definition: quantum_dispatcher_impl.h:153
void terminate() final
Terminates the object.
Definition: quantum_dispatcher_core_impl.h:82
ThreadFuturePtr< RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Post a blocking IO (or long running) task to run asynchronously on the IO thread pool.
Definition: quantum_dispatcher_impl.h:88
Type
Definition: quantum_itask.h:37
static void deleter(Promise< T > *p)
Definition: quantum_promise_impl.h:207
int getNumIoThreads() const
Definition: quantum_dispatcher_core_impl.h:423
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_dispatcher_impl.h:185
static void deleter(IoTask *p)
Definition: quantum_io_task_impl.h:137
ThreadContextPtr< RET > postFirst(FUNC &&func, ARGS &&... args)
Post the first coroutine in a continuation chain to run asynchronously.
Definition: quantum_dispatcher_impl.h:70
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
size_t size(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:102
bool empty(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:121
Definition: quantum_configuration.h:31
bool empty(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Check if the specified type and queue id is empty (i.e. there are no running tasks)
Definition: quantum_dispatcher_impl.h:228
std::shared_ptr< IoTask > Ptr
Definition: quantum_io_task.h:37
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the constructor....
Definition: quantum_dispatcher_impl.h:267
void resetStats()
Resets all coroutine and IO queue counters.
Definition: quantum_dispatcher_impl.h:292
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Definition: quantum_dispatcher_core_impl.h:429
ThreadContextPtr< std::vector< RET > > forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
Definition: quantum_dispatcher_impl.h:106
static void deleter(Context< RET > *p)
Definition: quantum_context_impl.h:1148
void postAsyncIo(IoTask::Ptr task)
Definition: quantum_dispatcher_core_impl.h:373
typename IThreadFuture< T >::Ptr ThreadFuturePtr
Definition: quantum_ithread_future.h:69
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the constructor.
Definition: quantum_dispatcher_impl.h:273
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any when us...
Definition: quantum_dispatcher_impl.h:279
void post(Task::Ptr task)
Definition: quantum_dispatcher_core_impl.h:332
static void deleter(Task *p)
Definition: quantum_task_impl.h:182
DEPRECATED Dispatcher(int numCoroutineThreads=-1, int numIoThreads=5, bool pinCoroutineThreadsToCores=false)
Constructor.
Definition: quantum_dispatcher_impl.h:28
typename IThreadContext< RET >::Ptr ThreadContextPtr
Definition: quantum_ithread_context.h:242
This class provides the same functionality as a coroutine yield when called from a thread context.
Definition: quantum_yielding_thread.h:33
void resetStats()
Definition: quantum_dispatcher_core_impl.h:315
int getNumCoroutineThreads() const
Definition: quantum_dispatcher_core_impl.h:417
size_t size(IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const
Returns the total number of queued tasks for the specified type and queue id.
Definition: quantum_dispatcher_impl.h:221