// Constructor taking explicit thread counts and a core-pinning flag.
// This excerpt is a lossy listing: brace-only lines are absent and the declaration of
// `numIoThreads` (used in the initializers below) is not visible here.
26 DispatcherCore::DispatcherCore(
int numCoroutineThreads,
28 bool pinCoroutineThreadsToCores) :
// Coroutine queue count: -1 selects std::thread::hardware_concurrency(), 0 selects 1,
// any other value is used as given.
// NOTE(review): hardware_concurrency() is allowed to return 0; with -1 that would create
// zero coroutine queues and make the "any" range below (0, -1) - confirm and guard.
29 _coroQueues((numCoroutineThreads == -1) ?
std::thread::hardware_concurrency() :
30 (numCoroutineThreads == 0) ? 1 : numCoroutineThreads),
// Shared and per-thread IO queue counts: any value <= 0 is treated as 1.
31 _sharedIoQueues((numIoThreads <= 0) ? 1 : numIoThreads),
// Every IO queue is copy-initialized from a prototype built with a default Configuration
// and a pointer to the shared IO queue container.
32 _ioQueues((numIoThreads <= 0) ? 1 : numIoThreads, IoQueue(Configuration(), &_sharedIoQueues)),
33 _loadBalanceSharedIoQueues(false),
34 _terminated ATOMIC_FLAG_INIT,
// Default id range used for "any"-queue selection: [0, number of coroutine queues - 1].
// Reads _coroQueues.size(); presumably _coroQueues is declared before this member - TODO confirm.
35 _coroQueueIdRangeForAny(0, (int)_coroQueues.size()-1)
// Optional pinning: queue i is pinned to core (i % cores).
37 if (pinCoroutineThreadsToCores)
// NOTE(review): if hardware_concurrency() returns 0, `i%cores` below divides by zero.
39 unsigned int cores = std::thread::hardware_concurrency();
40 for (
size_t i = 0; i < _coroQueues.size(); ++i)
42 _coroQueues[i].pinToCore(i%cores);
// Constructor driven by a Configuration object.
// This excerpt is a lossy listing: brace-only lines are absent.
48 DispatcherCore::DispatcherCore(
const Configuration& config) :
// Coroutine queue count: -1 selects std::thread::hardware_concurrency(), 0 selects 1,
// otherwise config.getNumCoroutineThreads(). Each queue is copied from TaskQueue(config).
// NOTE(review): hardware_concurrency() is allowed to return 0, which would yield zero
// coroutine queues when -1 is configured - confirm and guard.
49 _coroQueues((config.getNumCoroutineThreads() == -1) ?
std::thread::hardware_concurrency() :
50 (config.getNumCoroutineThreads() == 0) ? 1 : config.getNumCoroutineThreads(), TaskQueue(config)),
// Shared IO queues: count <= 0 becomes 1; prototype receives a null shared-queue pointer.
51 _sharedIoQueues((config.getNumIoThreads() <= 0) ? 1 : config.getNumIoThreads(), IoQueue(config, nullptr)),
// Per-thread IO queues: same count rule; prototype receives a pointer to _sharedIoQueues.
52 _ioQueues((config.getNumIoThreads() <= 0) ? 1 : config.getNumIoThreads(), IoQueue(config, &_sharedIoQueues)),
53 _loadBalanceSharedIoQueues(false),
54 _terminated ATOMIC_FLAG_INIT,
// Default "any" range covers every coroutine queue; it may be narrowed further down.
55 _coroQueueIdRangeForAny(0, (int)_coroQueues.size()-1)
// Optional pinning: queue i is pinned to core (i % cores).
57 if (config.getPinCoroutineThreadsToCores())
// NOTE(review): if hardware_concurrency() returns 0, `i%cores` below divides by zero.
59 unsigned int cores = std::thread::hardware_concurrency();
60 for (
size_t i = 0; i < _coroQueues.size(); ++i)
62 _coroQueues[i].pinToCore(i%cores);
// The configured range replaces the default only when it is well-formed:
// first <= second, first >= 0 and second < number of coroutine queues.
// Otherwise the default set in the initializer list is kept.
65 const auto& coroQueueIdRangeForAny = config.getCoroQueueIdRangeForAny();
67 if (coroQueueIdRangeForAny.first <= coroQueueIdRangeForAny.second &&
68 coroQueueIdRangeForAny.first >= 0 &&
69 coroQueueIdRangeForAny.second < (
int)_coroQueues.size())
71 _coroQueueIdRangeForAny = coroQueueIdRangeForAny;
// Fragment of terminate() (the listing's index places its definition at original line 82;
// the signature and the loop bodies are not visible in this excerpt).
// test_and_set() returns the previous flag value, so the guarded section runs only for the
// first caller that sets _terminated.
84 if (!_terminated.test_and_set())
// Visits every coroutine queue, then every per-thread IO queue, then every shared IO queue;
// what is done to each queue is not visible here.
86 for (
auto&& queue : _coroQueues)
90 for (
auto&& queue : _ioQueues)
94 for (
auto&& queue : _sharedIoQueues)
// Fragment of size(IQueue::QueueType type, int queueId) const (index: original line 102).
// The conditions selecting each path are not visible in this excerpt.
// Visible paths: reject with std::runtime_error, delegate to coroSize(), delegate to ioSize().
109 throw std::runtime_error(
"Cannot specify queue id");
115 return coroSize(queueId);
117 return ioSize(queueId);
// Fragment of empty(IQueue::QueueType type, int queueId) const (index: original line 121).
// The conditions selecting each path are not visible in this excerpt.
// Visible paths: reject with std::runtime_error, delegate to coroEmpty(), delegate to ioEmpty().
128 throw std::runtime_error(
"Cannot specify queue id");
134 return coroEmpty(queueId);
136 return ioEmpty(queueId);
// Returns a size for the coroutine queues.
// Lossy listing: the leading `if` condition, the declaration of `size` and the return of the
// accumulated total are not visible in this excerpt.
140 size_t DispatcherCore::coroSize(
int queueId)
// First path: accumulate size() over every coroutine queue.
const 145 for (
auto&& queue : _coroQueues)
147 size += queue.size();
// Second path: a queueId outside [0, number of coroutine queues) throws std::runtime_error.
151 else if ((queueId >= (
int)_coroQueues.size()) || (queueId < 0))
153 throw std::runtime_error(
"Invalid coroutine queue id");
// Otherwise: size of the single queue identified by queueId.
155 return _coroQueues.at(queueId).size();
// Reports emptiness for the coroutine queues.
// Lossy listing: the leading `if` condition and the value returned after the loop are not
// visible in this excerpt.
159 bool DispatcherCore::coroEmpty(
int queueId)
// First path: scan every coroutine queue and return false at the first non-empty one.
const 163 for (
auto&& queue : _coroQueues)
165 if (!queue.empty())
return false;
// Second path: a queueId outside [0, number of coroutine queues) throws std::runtime_error.
169 else if ((queueId >= (
int)_coroQueues.size()) || (queueId < 0))
171 throw std::runtime_error(
"Invalid coroutine queue id");
// Otherwise: emptiness of the single queue identified by queueId.
173 return _coroQueues.at(queueId).empty();
// Returns a size for the IO queues.
// Lossy listing: the branch conditions, the declaration of `size` and the returns of the
// accumulated totals are not visible in this excerpt.
177 size_t DispatcherCore::ioSize(
int queueId)
// First visible path: accumulate over all per-thread IO queues and all shared IO queues.
const 182 for (
auto&& queue : _ioQueues)
184 size += queue.size();
186 for (
auto&& queue : _sharedIoQueues)
188 size += queue.size();
// Second visible path: accumulate over the shared IO queues only.
195 for (
auto&& queue : _sharedIoQueues)
197 size += queue.size();
// Final path: size of one per-thread IO queue.
// NOTE(review): no explicit range check is visible before .at() here (compare coroSize);
// lines may be missing from this listing - confirm against the real file.
201 return _ioQueues.at(queueId).size();
// Reports emptiness for the IO queues.
// Lossy listing: branch conditions and all loop bodies are not visible in this excerpt, so
// only the iteration targets can be stated.
205 bool DispatcherCore::ioEmpty(
int queueId)
// First visible path: iterates the shared IO queues, then the per-thread IO queues.
const 209 for (
auto&& queue : _sharedIoQueues)
216 for (
auto&& queue : _ioQueues)
// Second visible path: iterates the shared IO queues only.
227 for (
auto&& queue : _sharedIoQueues)
// Final path: emptiness of one per-thread IO queue, looked up with .at().
236 return _ioQueues.at(queueId).empty();
// Fragment of stats(IQueue::QueueType type, int queueId) (index: original line 240).
// The conditions selecting each path are not visible in this excerpt.
// Visible paths: reject with std::runtime_error, delegate to coroStats(), delegate to ioStats().
246 throw std::runtime_error(
"Cannot specify queue id");
252 return coroStats(queueId);
254 return ioStats(queueId);
// Fragment whose signature is not visible; presumably the body of coroStats(int queueId),
// judging by the error message and the queues used - TODO confirm against the real file.
// First visible path: add the statistics of every coroutine queue into `stats`
// (declaration and return of `stats` are not visible here).
263 for (
auto&& queue : _coroQueues)
265 stats += queue.stats();
// A queueId outside [0, number of coroutine queues) throws std::runtime_error.
271 if ((queueId >= (
int)_coroQueues.size()) || (queueId < 0))
273 throw std::runtime_error(
"Invalid coroutine queue id");
// Otherwise: statistics of the single queue, viewed as a const QueueStatistics reference.
275 return static_cast<const QueueStatistics&>(_coroQueues.at(queueId).stats());
// Returns IO queue statistics by value.
// Lossy listing: the branch conditions and the returns of the accumulated `stats` objects are
// not visible in this excerpt.
280 QueueStatistics DispatcherCore::ioStats(
int queueId)
// First visible path: fresh accumulator summed over all per-thread and all shared IO queues.
284 QueueStatistics
stats;
285 for (
auto&& queue : _ioQueues)
287 stats += queue.stats();
289 for (
auto&& queue : _sharedIoQueues)
291 stats += queue.stats();
// Second visible path: fresh accumulator summed over the shared IO queues only.
297 QueueStatistics
stats;
298 for (
auto&& queue : _sharedIoQueues)
300 stats += queue.stats();
// A queueId outside [0, number of per-thread IO queues) throws std::runtime_error.
306 if ((queueId >= (
int)_ioQueues.size()) || (queueId < 0))
308 throw std::runtime_error(
"Invalid IO queue id");
// Otherwise: statistics of the single per-thread IO queue identified by queueId.
310 return static_cast<const QueueStatistics&>(_ioQueues.at(queueId).stats());
// Fragment of resetStats() (index: original line 315; signature not visible in this excerpt).
// Calls stats().reset() on every coroutine queue, every shared IO queue and every
// per-thread IO queue, in that order.
317 for (
auto&& queue : _coroQueues)
319 queue.stats().reset();
321 for (
auto&& queue : _sharedIoQueues)
323 queue.stats().reset();
325 for (
auto&& queue : _ioQueues)
327 queue.stats().reset();
// Fragment of post(Task::Ptr task) (index: original line 332). The signature, the branch
// conditions and the declaration/assignment of `index` are not visible in this excerpt.
// Visible selection logic: walk the inclusive id range _coroQueueIdRangeForAny and track the
// smallest queue size seen so far in numTasks (starting from the maximum size_t value).
344 size_t numTasks = std::numeric_limits<size_t>::max();
345 for (
size_t i = (
size_t)_coroQueueIdRangeForAny.first; i <= (
size_t)_coroQueueIdRangeForAny.second; ++i)
347 size_t queueSize = _coroQueues[i].size();
348 if (queueSize < numTasks)
350 numTasks = queueSize;
// The task's queue id is overwritten with `index` (presumably the queue chosen by the scan
// above - its assignment is not visible here).
359 task->setQueueId(index);
// NOTE(review): only the upper bound is checked on this visible path; a negative id is not
// rejected here and would reach the .at() call below - confirm how negative ids are handled.
363 if (task->getQueueId() >= (int)_coroQueues.size())
365 throw std::runtime_error(
"Queue id out of bounds");
// Enqueue on the coroutine queue identified by the task's (possibly rewritten) queue id.
369 _coroQueues.at(task->getQueueId()).enqueue(task);
// Fragment of postAsyncIo(IoTask::Ptr task) (index: original line 373). The signature, the
// outer branch conditions and several bodies are not visible in this excerpt.
// When load balancing is enabled, a shared IO queue is picked by a running counter.
382 if (_loadBalanceSharedIoQueues)
// NOTE(review): `index` is a function-local static modified with no synchronization visible
// in this excerpt; if postAsyncIo can run concurrently on several threads this is a data
// race - confirm, and consider std::atomic<size_t>.
384 static size_t index = 0;
// Pre-increment, so the first queue tried is index 1 modulo the number of shared queues.
// tryEnqueue() reports success through its return value; what follows on success or failure
// is not visible here.
387 if (_sharedIoQueues.at(++index % _sharedIoQueues.size()).tryEnqueue(task)) {
// Visible alternative path: enqueue on shared queue 0, then call
// signalEmptyCondition(false) on every per-thread IO queue.
395 _sharedIoQueues[0].enqueue(task);
398 for (
auto&& queue : _ioQueues)
400 queue.signalEmptyCondition(
false);
// Visible path for a specific queue id: reject ids at or above the number of per-thread IO
// queues, then enqueue on that queue.
// NOTE(review): only the upper bound is checked on this visible path - confirm how negative
// ids are handled.
406 if (task->getQueueId() >= (int)_ioQueues.size())
408 throw std::runtime_error(
"Queue id out of bounds");
412 _ioQueues.at(task->getQueueId()).enqueue(task);
// Body of getNumCoroutineThreads() const (index: original line 417; declared to return int):
// the number of coroutine queues.
419 return _coroQueues.size();
// Body of getNumIoThreads() const (index: original line 423; declared to return int):
// the number of per-thread IO queues (shared IO queues are not counted).
425 return _ioQueues.size();
// Body of getCoroQueueIdRangeForAny() const (index: original line 429): returns the stored
// (first, second) coroutine queue id range by const reference.
431 return _coroQueueIdRangeForAny;
QueueType
Definition: quantum_iqueue.h:37
Definition: quantum_buffer_impl.h:22
~DispatcherCore()
Definition: quantum_dispatcher_core_impl.h:76
QueueStatistics stats(IQueue::QueueType type, int queueId)
Definition: quantum_dispatcher_core_impl.h:240
Definition: quantum_stl_impl.h:23
void terminate() final
Terminates the object.
Definition: quantum_dispatcher_core_impl.h:82
int getNumIoThreads() const
Definition: quantum_dispatcher_core_impl.h:423
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
size_t size(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:102
bool empty(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:121
std::shared_ptr< IoTask > Ptr
Definition: quantum_io_task.h:37
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Definition: quantum_dispatcher_core_impl.h:429
void postAsyncIo(IoTask::Ptr task)
Definition: quantum_dispatcher_core_impl.h:373
void post(Task::Ptr task)
Definition: quantum_dispatcher_core_impl.h:332
void resetStats()
Definition: quantum_dispatcher_core_impl.h:315
int getNumCoroutineThreads() const
Definition: quantum_dispatcher_core_impl.h:417