QuantumLibrary
|
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main entry point into the library. More...
#include <quantum_dispatcher.h>
Public Types | |
using | ContextTag = ThreadContextTag |
Public Member Functions | |
DEPRECATED | Dispatcher (int numCoroutineThreads=-1, int numIoThreads=5, bool pinCoroutineThreadsToCores=false) |
Constructor. More... | |
Dispatcher (const Configuration &config) | |
Constructor. Takes the configuration for the Quantum dispatcher. More... | |
~Dispatcher () | |
Destructor. More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadContextPtr< RET > | post (FUNC &&func, ARGS &&... args) |
Post a coroutine to run asynchronously. More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadContextPtr< RET > | post (int queueId, bool isHighPriority, FUNC &&func, ARGS &&... args) |
Post a coroutine to run asynchronously on a specific queue (thread). More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadContextPtr< RET > | postFirst (FUNC &&func, ARGS &&... args) |
Post the first coroutine in a continuation chain to run asynchronously. More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadContextPtr< RET > | postFirst (int queueId, bool isHighPriority, FUNC &&func, ARGS &&... args) |
Post the first coroutine in a continuation chain to run asynchronously on a specific queue (thread). More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadFuturePtr< RET > | postAsyncIo (FUNC &&func, ARGS &&... args) |
Post a blocking IO (or long running) task to run asynchronously on the IO thread pool. More... | |
template<class RET = int, class FUNC , class ... ARGS> | |
ThreadFuturePtr< RET > | postAsyncIo (int queueId, bool isHighPriority, FUNC &&func, ARGS &&... args) |
Post a blocking IO (or long running) task to run asynchronously on a specific thread in the IO thread pool. More... | |
template<class RET = int, class INPUT_IT , class = Traits::IsInputIterator<INPUT_IT>> | |
ThreadContextPtr< std::vector< RET > > | forEach (INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func) |
Applies the given unary function to all the elements in the range [first,last). This function runs in parallel. More... | |
template<class RET = int, class INPUT_IT > | |
ThreadContextPtr< std::vector< RET > > | forEach (INPUT_IT first, size_t num, Functions::ForEachFunc< RET, INPUT_IT > func) |
Same as forEach() but takes a length as second argument in case INPUT_IT is not a random access iterator. More... | |
template<class RET = int, class INPUT_IT , class = Traits::IsInputIterator<INPUT_IT>> | |
ThreadContextPtr< std::vector< std::vector< RET > > > | forEachBatch (INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< RET, INPUT_IT > func) |
The batched version of forEach(). This function applies the given unary function to all the elements in the range [first,last). This function runs serially with respect to other functions in the same batch. More... | |
template<class RET = int, class INPUT_IT > | |
ThreadContextPtr< std::vector< std::vector< RET > > > | forEachBatch (INPUT_IT first, size_t num, Functions::ForEachFunc< RET, INPUT_IT > func) |
Same as forEachBatch() but takes a length as second argument in case INPUT_IT is not a random access iterator. More... | |
template<class KEY , class MAPPED_TYPE , class REDUCED_TYPE , class INPUT_IT , class = Traits::IsInputIterator<INPUT_IT>> | |
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > | mapReduce (INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer) |
Implementation of map-reduce functionality. More... | |
template<class KEY , class MAPPED_TYPE , class REDUCED_TYPE , class INPUT_IT > | |
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > | mapReduce (INPUT_IT first, size_t num, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer) |
Same as mapReduce() but takes a length as second argument in case INPUT_IT is not a random access iterator. More... | |
template<class KEY , class MAPPED_TYPE , class REDUCED_TYPE , class INPUT_IT , class = Traits::IsInputIterator<INPUT_IT>> | |
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > | mapReduceBatch (INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer) |
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved performance. This should be used in the case where the functions are more CPU intensive with little or no IO. More... | |
template<class KEY , class MAPPED_TYPE , class REDUCED_TYPE , class INPUT_IT > | |
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > | mapReduceBatch (INPUT_IT first, size_t num, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer) |
Same as mapReduceBatch() but takes a length as second argument in case INPUT_IT is not a random access iterator. More... | |
void | terminate () final |
Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will not complete. Call this function for a fast shutdown of the dispatcher. More... | |
size_t | size (IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const |
Returns the total number of queued tasks for the specified type and queue id. More... | |
bool | empty (IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) const |
Checks whether the specified type and queue id is empty (i.e. there are no running tasks). More... | |
void | drain (std::chrono::milliseconds timeout=std::chrono::milliseconds::zero()) |
Drains all queues on this dispatcher object. More... | |
int | getNumCoroutineThreads () const |
Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed, then this number essentially indicates the number of cores. More... | |
int | getNumIoThreads () const |
Returns the number of underlying IO threads as specified in the constructor. More... | |
const std::pair< int, int > & | getCoroQueueIdRangeForAny () const |
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any when using Dispatcher::post. More... | |
QueueStatistics | stats (IQueue::QueueType type=IQueue::QueueType::All, int queueId=(int) IQueue::QueueId::All) |
Returns a statistics object for the specified type and queue id. More... | |
void | resetStats () |
Resets all coroutine and IO queue counters. More... | |
Public Member Functions inherited from Bloomberg::quantum::ITerminate | |
virtual | ~ITerminate ()=default |
Virtual destructor. This function is explicitly left empty. More... | |
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main entry point into the library.
|
inline |
Constructor.
This will build two thread pools, one used for running parallel coroutines and another used for running blocking IO tasks.
[in] | numCoroutineThreads | Number of parallel threads running coroutines. -1 indicates one per core. |
[in] | numIoThreads | Number of parallel threads running blocking IO calls. |
[in] | pinCoroutineThreadsToCores | If set to true, it will pin all coroutine threads onto physical cores, provided numCoroutineThreads <= cores. |
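The "-1 means one per core" rule and the pinning precondition above can be illustrated with a small stand-alone sketch. The helper names `resolveCoroutineThreads` and `canPinToCores` are hypothetical and not part of Quantum, and the fallback to a single thread when the core count is unknown is an assumption made for this example.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper (not a Quantum function): turns the requested coroutine
// thread count into an actual count, treating -1 as "one thread per core".
inline int resolveCoroutineThreads(int requested)
{
    if (requested == -1) {
        const unsigned cores = std::thread::hardware_concurrency();
        // hardware_concurrency() may return 0 when the value cannot be determined;
        // falling back to 1 is an assumption of this sketch.
        return cores == 0 ? 1 : static_cast<int>(cores);
    }
    assert(requested > 0);
    return requested;
}

// Hypothetical check mirroring the documented pinning precondition
// (numCoroutineThreads <= cores).
inline bool canPinToCores(int numCoroutineThreads, unsigned cores)
{
    return cores > 0 && numCoroutineThreads <= static_cast<int>(cores);
}
```

For instance, `canPinToCores(4, 8)` holds while `canPinToCores(16, 8)` does not.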
|
explicit |
Constructor.
[in] | config | The configuration for the Quantum dispatcher. |
Bloomberg::quantum::Dispatcher::~Dispatcher | ( | ) |
Destructor.
Destroys the task dispatcher object. This will wait until all coroutines complete, signal all worker threads (coroutine and IO) to exit and join them.
|
inline |
Drains all queues on this dispatcher object.
[in] | timeout | Maximum time for this function to wait. Set to 0 to wait indefinitely until all queues drain. |
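The timeout convention above (zero means wait indefinitely) can be sketched with a condition variable. `PendingCounter` is a hypothetical stand-in for the dispatcher's queues, and its `drain()` returns a `bool` purely for illustration, whereas `Dispatcher::drain()` returns `void`.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a set of task queues; not a Quantum type.
class PendingCounter
{
public:
    // Registers one outstanding task.
    void add()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        ++_pending;
    }

    // Marks one task as finished and wakes waiters once nothing is left.
    void done()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        assert(_pending > 0);
        if (--_pending == 0) {
            _cond.notify_all();
        }
    }

    // Returns true once nothing is pending, false if the timeout expired first.
    // A zero timeout means "wait indefinitely".
    bool drain(std::chrono::milliseconds timeout = std::chrono::milliseconds::zero())
    {
        std::unique_lock<std::mutex> lock(_mutex);
        auto isEmpty = [this] { return _pending == 0; };
        if (timeout == std::chrono::milliseconds::zero()) {
            _cond.wait(lock, isEmpty);
            return true;
        }
        return _cond.wait_for(lock, timeout, isEmpty);
    }

private:
    std::mutex _mutex;
    std::condition_variable _cond;
    int _pending{0};
};
```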
|
inline |
Checks whether the specified type and queue id is empty (i.e. there are no running tasks).
[in] | type | The type of queue. |
[in] | queueId | The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. |
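One possible reading of the valid ranges above, written as a stand-alone check. The enum `QueueType` and the constant `kAllQueues` are hypothetical stand-ins for `IQueue::QueueType` and `IQueue::QueueId::All`. The sentinel's numeric value is an assumption, and so is the choice to reject a specific id when the type covers both pools.

```cpp
#include <cassert>

// Hypothetical stand-ins for IQueue::QueueType and IQueue::QueueId::All.
// The numeric value of the sentinel is an assumption of this sketch.
enum class QueueType { Coro, IO, All };
constexpr int kAllQueues = -1000;

// Returns true when queueId falls in the documented range for the given type.
inline bool isValidQueueId(QueueType type, int queueId,
                           int numCoroutineThreads, int numIoThreads)
{
    if (queueId == kAllQueues) {
        return true; // the "All" sentinel is accepted for either type
    }
    switch (type) {
        case QueueType::Coro: return queueId >= 0 && queueId < numCoroutineThreads;
        case QueueType::IO:   return queueId >= 0 && queueId < numIoThreads;
        case QueueType::All:  return false; // assumption: a specific id is ambiguous here
    }
    return false;
}
```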
ThreadContextPtr< std::vector< RET > > Bloomberg::quantum::Dispatcher::forEach | ( | INPUT_IT | first, |
INPUT_IT | last, | ||
Functions::ForEachFunc< RET, INPUT_IT > | func | ||
) |
Applies the given unary function to all the elements in the range [first,last). This function runs in parallel.
RET | The return value of the unary function. |
UNARY_FUNC | A unary function of type 'RET(*INPUT_IT)'. |
INPUT_IT | The type of iterator. |
[in] | first | The first element in the range. |
[in] | last | The last element in the range (exclusive). |
[in] | func | The unary function. |
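The following stand-alone sketch shows the general shape of a parallel for-each over [first,last) that returns one result per element in input order. It uses `std::async` rather than Quantum coroutines, and the name `parallelForEach` is hypothetical. It also assumes a multi-pass (forward) iterator, because each task keeps its own copy of the iterator.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Stand-alone illustration; does not call Quantum. One std::async task is
// launched per element and results are collected in input order.
template <class RET, class INPUT_IT, class UNARY_FUNC>
std::vector<RET> parallelForEach(INPUT_IT first, INPUT_IT last, UNARY_FUNC func)
{
    std::vector<std::future<RET>> pending;
    for (INPUT_IT it = first; it != last; ++it) {
        // Each task captures its own copy of the iterator, which requires a
        // multi-pass iterator whose referenced element outlives the task.
        pending.push_back(std::async(std::launch::async,
                                     [it, func]() -> RET { return func(*it); }));
    }

    std::vector<RET> results;
    results.reserve(pending.size());
    for (auto& future : pending) {
        results.push_back(future.get()); // blocks until that element is done
    }
    return results;
}
```

Calling `parallelForEach<int>(v.begin(), v.end(), square)` on a vector holding 1, 2, 3, 4 yields 1, 4, 9, 16.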
ThreadContextPtr< std::vector< RET > > Bloomberg::quantum::Dispatcher::forEach | ( | INPUT_IT | first, |
size_t | num, | ||
Functions::ForEachFunc< RET, INPUT_IT > | func | ||
) |
Same as forEach() but takes a length as second argument in case INPUT_IT is not a random access iterator.
ThreadContextPtr< std::vector< std::vector< RET > > > Bloomberg::quantum::Dispatcher::forEachBatch | ( | INPUT_IT | first, |
INPUT_IT | last, | ||
Functions::ForEachFunc< RET, INPUT_IT > | func | ||
) |
The batched version of forEach(). This function applies the given unary function to all the elements in the range [first,last). This function runs serially with respect to other functions in the same batch.
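A stand-alone sketch of the batching idea: the input is split into contiguous batches, the elements inside one batch are processed serially, and the batches run in parallel, which produces one result vector per batch. The name `batchedForEach` and the explicit `numBatches` parameter are assumptions for illustration; how Quantum chooses its batch boundaries is not stated here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Stand-alone illustration; does not call Quantum. The caller must keep
// 'input' alive until the function returns (it always does here, because
// every future is joined before returning).
template <class RET, class T, class UNARY_FUNC>
std::vector<std::vector<RET>> batchedForEach(const std::vector<T>& input,
                                             std::size_t numBatches,
                                             UNARY_FUNC func)
{
    assert(numBatches > 0);
    // Round up so that every element lands in some batch.
    const std::size_t batchSize = (input.size() + numBatches - 1) / numBatches;

    std::vector<std::future<std::vector<RET>>> pending;
    for (std::size_t begin = 0; begin < input.size(); begin += batchSize) {
        const std::size_t end = std::min(begin + batchSize, input.size());
        pending.push_back(std::async(std::launch::async, [&input, begin, end, func]() {
            std::vector<RET> out;
            out.reserve(end - begin);
            for (std::size_t i = begin; i < end; ++i) {
                out.push_back(func(input[i])); // serial within the batch
            }
            return out;
        }));
    }

    std::vector<std::vector<RET>> results;
    results.reserve(pending.size());
    for (auto& future : pending) {
        results.push_back(future.get());
    }
    return results;
}
```

Batching trades scheduling overhead for less parallelism, so it tends to suit many small, CPU-bound calls.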
ThreadContextPtr< std::vector< std::vector< RET > > > Bloomberg::quantum::Dispatcher::forEachBatch | ( | INPUT_IT | first, |
size_t | num, | ||
Functions::ForEachFunc< RET, INPUT_IT > | func | ||
) |
Same as forEachBatch() but takes a length as second argument in case INPUT_IT is not a random access iterator.
|
inline |
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any when using Dispatcher::post.
|
inline |
Returns the number of underlying coroutine threads as specified in the constructor. If -1 was passed, then this number essentially indicates the number of cores.
|
inline |
Returns the number of underlying IO threads as specified in the constructor.
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > Bloomberg::quantum::Dispatcher::mapReduce | ( | INPUT_IT | first, |
INPUT_IT | last, | ||
Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > | mapper, | ||
Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > | reducer | ||
) |
Implementation of map-reduce functionality.
KEY | The KEY type used for mapping and reducing. |
MAPPED_TYPE | The output type after a map operation. |
REDUCED_TYPE | The output type after a reduce operation. |
MAPPER_FUNC | The mapper function having the signature 'std::vector<std::pair<KEY,MAPPED_TYPE>>(*INPUT_IT)' |
REDUCER_FUNC | The reducer function having the signature 'std::pair<KEY,REDUCED_TYPE>(std::pair<KEY, std::vector<MAPPED_TYPE>>&&)' |
INPUT_IT | The iterator type. |
[in] | first | The start iterator to a list of items to be processed in the range [first,last). |
[in] | last | The end iterator to a list of items (not inclusive). |
[in] | mapper | The mapper function. |
[in] | reducer | The reducer function. |
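To make the mapper and reducer signatures above concrete, here is a stand-alone, deliberately serial sketch of the three phases (map, group by key, reduce). It does not use Quantum and makes no claim about how the library parallelises the phases. `serialMapReduce` and `countByLength` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Serial illustration of map-reduce using the documented callable shapes:
//   mapper:  std::vector<std::pair<KEY, MAPPED_TYPE>>(*INPUT_IT)
//   reducer: std::pair<KEY, REDUCED_TYPE>(std::pair<KEY, std::vector<MAPPED_TYPE>>&&)
template <class KEY, class MAPPED_TYPE, class REDUCED_TYPE,
          class INPUT_IT, class MAPPER_FUNC, class REDUCER_FUNC>
std::map<KEY, REDUCED_TYPE> serialMapReduce(INPUT_IT first, INPUT_IT last,
                                            MAPPER_FUNC mapper, REDUCER_FUNC reducer)
{
    // Map phase, followed by grouping of the mapped values by key.
    std::map<KEY, std::vector<MAPPED_TYPE>> grouped;
    for (; first != last; ++first) {
        for (auto& keyValue : mapper(*first)) {
            grouped[keyValue.first].push_back(std::move(keyValue.second));
        }
    }

    // Reduce phase: one reducer call per key.
    std::map<KEY, REDUCED_TYPE> result;
    for (auto& group : grouped) {
        std::pair<KEY, std::vector<MAPPED_TYPE>> item(group.first, std::move(group.second));
        result.insert(reducer(std::move(item)));
    }
    return result;
}

// Usage example: count how many words share each length.
inline std::map<std::size_t, int> countByLength(const std::vector<std::string>& words)
{
    auto mapper = [](const std::string& word) {
        return std::vector<std::pair<std::size_t, int>>{{word.size(), 1}};
    };
    auto reducer = [](std::pair<std::size_t, std::vector<int>>&& group) {
        int total = 0;
        for (int count : group.second) {
            total += count;
        }
        return std::make_pair(group.first, total);
    };
    return serialMapReduce<std::size_t, int, int>(words.begin(), words.end(), mapper, reducer);
}
```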
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > Bloomberg::quantum::Dispatcher::mapReduce | ( | INPUT_IT | first, |
size_t | num, | ||
Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > | mapper, | ||
Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > | reducer | ||
) |
Same as mapReduce() but takes a length as second argument in case INPUT_IT is not a random access iterator.
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > Bloomberg::quantum::Dispatcher::mapReduceBatch | ( | INPUT_IT | first, |
INPUT_IT | last, | ||
Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > | mapper, | ||
Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > | reducer | ||
) |
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved performance. This should be used in the case where the functions are more CPU intensive with little or no IO.
ThreadContextPtr< std::map< KEY, REDUCED_TYPE > > Bloomberg::quantum::Dispatcher::mapReduceBatch | ( | INPUT_IT | first, |
size_t | num, | ||
Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > | mapper, | ||
Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > | reducer | ||
) |
Same as mapReduceBatch() but takes a length as second argument in case INPUT_IT is not a random access iterator.
ThreadContextPtr< RET > Bloomberg::quantum::Dispatcher::post | ( | FUNC && | func, |
ARGS &&... | args | ||
) |
Post a coroutine to run asynchronously.
This method will post the coroutine on any thread available. Typically it will pick one which has the smallest number of concurrent coroutines executing at the time of the post.
RET | Type of future returned by this coroutine. |
FUNC | Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
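The selection policy described for this overload (prefer the thread with the fewest concurrently executing coroutines) can be pictured as a minimum search over per-queue load counters. This is only a sketch of that idea; `pickLeastLoadedQueue` is a hypothetical name, and the library's actual bookkeeping and tie-breaking may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical helper (not a Quantum function): returns the index of the
// queue with the smallest load. Ties resolve to the lowest index, which is
// an assumption of this sketch.
inline int pickLeastLoadedQueue(const std::vector<std::size_t>& load)
{
    assert(!load.empty());
    const auto smallest = std::min_element(load.begin(), load.end());
    return static_cast<int>(std::distance(load.begin(), smallest));
}
```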
ThreadContextPtr< RET > Bloomberg::quantum::Dispatcher::post | ( | int | queueId, |
bool | isHighPriority, | ||
FUNC && | func, | ||
ARGS &&... | args | ||
) |
Post a coroutine to run asynchronously on a specific queue (thread).
RET | Type of future returned by this coroutine. |
FUNC | Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | queueId | Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any as a value, which is equivalent to running the simpler version of post() above. Valid range is [0, numCoroutineThreads) or IQueue::QueueId::Any. |
[in] | isHighPriority | If set to true, the coroutine will be scheduled to run immediately after the currently executing coroutine on 'queueId' has completed or has yielded. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
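The effect of isHighPriority can be modelled with a double-ended queue in which the front element is whatever runs next once the current coroutine completes or yields. `RunQueue` is a hypothetical class written for this illustration and does not reflect Quantum's internal queue implementation.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Hypothetical model of a single coroutine queue; tasks are represented by
// their names. The currently executing task is assumed to be outside the queue.
class RunQueue
{
public:
    // High-priority tasks jump to the front so they run next;
    // all other tasks are appended at the back.
    void enqueue(std::string task, bool isHighPriority)
    {
        if (isHighPriority) {
            _tasks.push_front(std::move(task));
        } else {
            _tasks.push_back(std::move(task));
        }
    }

    bool empty() const { return _tasks.empty(); }

    // Removes and returns the task that would run next.
    std::string next()
    {
        assert(!_tasks.empty());
        std::string task = std::move(_tasks.front());
        _tasks.pop_front();
        return task;
    }

private:
    std::deque<std::string> _tasks;
};
```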
ThreadFuturePtr< RET > Bloomberg::quantum::Dispatcher::postAsyncIo | ( | FUNC && | func, |
ARGS &&... | args | ||
) |
Post a blocking IO (or long running) task to run asynchronously on the IO thread pool.
RET | Type of future returned by this task. |
FUNC | Callable object type. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
ThreadFuturePtr< RET > Bloomberg::quantum::Dispatcher::postAsyncIo | ( | int | queueId, |
bool | isHighPriority, | ||
FUNC && | func, | ||
ARGS &&... | args | ||
) |
Post a blocking IO (or long running) task to run asynchronously on a specific thread in the IO thread pool.
RET | Type of future returned by this task. |
FUNC | Callable object type. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(ThreadPromise<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | queueId | Id of the queue where this task should run. Note that the user can specify IQueue::QueueId::Any as a value, which is equivalent to running the simpler version of postAsyncIo() above. Valid range is [0, numIoThreads) or IQueue::QueueId::Any. |
[in] | isHighPriority | If set to true, the task will be scheduled to run immediately. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
ThreadContextPtr< RET > Bloomberg::quantum::Dispatcher::postFirst | ( | FUNC && | func, |
ARGS &&... | args | ||
) |
Post the first coroutine in a continuation chain to run asynchronously.
RET | Type of future returned by this coroutine. |
FUNC | Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
ThreadContextPtr< RET > Bloomberg::quantum::Dispatcher::postFirst | ( | int | queueId, |
bool | isHighPriority, | ||
FUNC && | func, | ||
ARGS &&... | args | ||
) |
Post the first coroutine in a continuation chain to run asynchronously on a specific queue (thread).
RET | Type of future returned by this coroutine. |
FUNC | Callable object type which will be wrapped in a coroutine. Can be a standalone function, a method, an std::function, a functor generated via std::bind or a lambda. The signature of the callable object must strictly be 'int f(CoroContext<RET>::Ptr, ...)'. |
ARGS | Argument types passed to FUNC. |
[in] | queueId | Id of the queue where this coroutine should run. Note that the user can specify IQueue::QueueId::Any as a value, which is equivalent to running the simpler version of postFirst() above. Valid range is [0, numCoroutineThreads) or IQueue::QueueId::Any. |
[in] | isHighPriority | If set to true, the coroutine will be scheduled to run immediately after the currently executing coroutine on 'queueId' has completed or has yielded. |
[in] | func | Callable object. |
[in] | args | Variable list of arguments passed to the callable object. |
|
inline |
Resets all coroutine and IO queue counters.
|
inline |
Returns the total number of queued tasks for the specified type and queue id.
[in] | type | The type of queue. |
[in] | queueId | The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. |
|
inline |
Returns a statistics object for the specified type and queue id.
[in] | type | The type of queue. |
[in] | queueId | The queue number to query. Valid range is [0, numCoroutineThreads) for IQueue::QueueType::Coro, [0, numIoThreads) for IQueue::QueueType::IO and IQueue::QueueId::All for either. |
|
inline final virtual |
Signal all threads to immediately terminate and exit. All other pending coroutines and IO tasks will not complete. Call this function for a fast shutdown of the dispatcher.
Implements Bloomberg::quantum::ITerminate.