34 std::vector<IoQueue>* sharedIoQueues) :
35 _sharedIoQueues(sharedIoQueues),
36 _loadBalanceSharedIoQueues(config.getLoadBalanceSharedIoQueues()),
37 _loadBalancePollIntervalMs(config.getLoadBalancePollIntervalMs()),
38 _loadBalancePollIntervalBackoffPolicy(config.getLoadBalancePollIntervalBackoffPolicy()),
39 _loadBalancePollIntervalNumBackoffs(config.getLoadBalancePollIntervalNumBackoffs()),
40 _loadBalanceBackoffNum(0),
43 _isInterrupted(false),
45 _terminated ATOMIC_FLAG_INIT
47 if (_sharedIoQueues) {
49 _thread = std::make_shared<std::thread>(std::bind(&
IoQueue::run,
this));
55 _sharedIoQueues(other._sharedIoQueues),
56 _loadBalanceSharedIoQueues(other._loadBalanceSharedIoQueues),
57 _loadBalancePollIntervalMs(other._loadBalancePollIntervalMs),
58 _loadBalancePollIntervalBackoffPolicy(other._loadBalancePollIntervalBackoffPolicy),
59 _loadBalancePollIntervalNumBackoffs(other._loadBalancePollIntervalNumBackoffs),
60 _loadBalanceBackoffNum(0),
63 _isInterrupted(false),
65 _terminated ATOMIC_FLAG_INIT
67 if (_sharedIoQueues) {
69 _thread = std::make_shared<std::thread>(std::bind(&
IoQueue::run,
this));
93 if (_loadBalanceSharedIoQueues)
97 task = grabWorkItemFromAll();
100 _loadBalanceBackoffNum = 0;
104 }
while (!_isInterrupted);
108 std::unique_lock<std::mutex> lock(_notEmptyMutex);
111 _notEmptyCond.wait(lock, [
this]() ->
bool {
return !_isEmpty || _isInterrupted; });
119 if (!_loadBalanceSharedIoQueues)
122 task = grabWorkItem();
130 int rc = task->run();
156 #ifdef __QUANTUM_PRINT_DEBUG 157 std::lock_guard<std::mutex> guard(Util::LogMutex());
160 std::cerr <<
"IO task exited with user exception." << std::endl;
164 std::cerr <<
"IO task exited with error : " << rc << std::endl;
169 catch (std::exception& ex)
172 #ifdef __QUANTUM_PRINT_DEBUG 173 std::lock_guard<std::mutex> guard(Util::LogMutex());
174 std::cerr <<
"Caught exception: " << ex.what() << std::endl;
179 #ifdef __QUANTUM_PRINT_DEBUG 180 std::lock_guard<std::mutex> guard(Util::LogMutex());
181 std::cerr <<
"Caught unknown exception." << std::endl;
212 return lock.ownsLock();
218 if (task->isHighPriority())
221 _queue.emplace_front(std::static_pointer_cast<IoTask>(task));
225 _queue.emplace_back(std::static_pointer_cast<IoTask>(task));
229 if (!_loadBalanceSharedIoQueues)
238 if (_loadBalanceSharedIoQueues)
242 return doDequeue(hint);
244 return doDequeue(hint);
254 return doDequeue(hint);
260 ITask::Ptr IoQueue::doDequeue(std::atomic_bool& hint)
262 hint = _queue.empty();
276 static size_t index = 0;
280 for (
size_t i = 0; i < (*_sharedIoQueues).size(); ++i)
282 IoQueue& queue = (*_sharedIoQueues)[++index % (*_sharedIoQueues).size()];
283 size += queue.size();
284 task = queue.tryDequeue(_isIdle);
293 return tryDequeueFromShared();
299 std::chrono::milliseconds IoQueue::getBackoffInterval()
301 if (_loadBalanceBackoffNum < _loadBalancePollIntervalNumBackoffs) {
302 ++_loadBalanceBackoffNum;
306 return _loadBalancePollIntervalMs + (_loadBalancePollIntervalMs * _loadBalanceBackoffNum);
310 return _loadBalancePollIntervalMs * static_cast<size_t>(std::exp2(_loadBalanceBackoffNum));
317 #if (__cplusplus >= 201703L) 318 if (_sharedIoQueues) {
319 return _isIdle ? _queue.size() : _queue.size() + 1;
321 return _queue.size();
324 if (_sharedIoQueues) {
334 if (_sharedIoQueues) {
335 return _queue.empty() && _isIdle;
337 return _queue.empty();
343 if (!_terminated.test_and_set() && _sharedIoQueues)
346 std::unique_lock<std::mutex> lock(_notEmptyMutex);
347 _isInterrupted =
true;
349 if (!_loadBalanceSharedIoQueues) {
350 _notEmptyCond.notify_all();
374 std::lock_guard<std::mutex> lock(_notEmptyMutex);
379 _notEmptyCond.notify_all();
386 static bool grabFromShared =
false;
388 grabFromShared = !grabFromShared;
390 if (grabFromShared) {
393 task = (*_sharedIoQueues)[0].dequeue(_isIdle);
413 task = (*_sharedIoQueues)[0].dequeue(_isIdle);
426 static bool grabFromShared =
false;
428 grabFromShared = !grabFromShared;
432 task = tryDequeueFromShared();
443 task = tryDequeueFromShared();
Definition: quantum_spinlock.h:71
Definition: quantum_buffer_impl.h:22
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:200
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:236
Thread queue for executing IO tasks.
Definition: quantum_io_queue.h:40
void incHighPriorityCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:128
void run() final
Definition: quantum_io_queue_impl.h:86
Definition: quantum_allocator.h:54
bool isIdle() const final
Definition: quantum_io_queue_impl.h:450
void incSharedQueueErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:80
void incPostedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:116
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
Definition: quantum_configuration.h:31
void signalEmptyCondition(bool value) final
Definition: quantum_io_queue_impl.h:370
void incSharedQueueCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:104
SpinLock & getLock() final
Definition: quantum_io_queue_impl.h:364
Allows application-wide settings for the various allocators used by Quantum.
Definition: quantum_allocator_traits.h:46
void enqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:188
void incCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:92
YieldingThreadDuration< std::chrono::microseconds > YieldingThread
Definition: quantum_yielding_thread.h:57
std::try_to_lock_t TryToLock
Definition: quantum_spinlock.h:34
void incErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:68
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
void pinToCore(int coreId) final
Definition: quantum_io_queue_impl.h:80
IoQueue()
Definition: quantum_io_queue_impl.h:27
size_t numElements() const final
Gets the current size of the queue.
Definition: quantum_queue_statistics_impl.h:44
IQueueStatistics & stats() final
Definition: quantum_io_queue_impl.h:358
void incNumElements() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:50
void terminate() final
Terminates the object.
Definition: quantum_io_queue_impl.h:341
void decNumElements() final
Decrement this counter.
Definition: quantum_queue_statistics_impl.h:56
size_t size() const final
Definition: quantum_io_queue_impl.h:315
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:248
RAII-style mechanism for SpinLock ownership. Acquires a SpinLock on construction and releases it inside the destructor.
bool empty() const final
Definition: quantum_io_queue_impl.h:332