34 _queueIt(_queue.end()),
35 _blockedIt(_queue.end()),
37 _isInterrupted(false),
39 _terminated ATOMIC_FLAG_INIT,
42 _thread = std::make_shared<std::thread>(std::bind(&
TaskQueue::run,
this));
62 SetThreadAffinityMask(_thread->native_handle(), 1 << coreId);
64 int cpuSetSize =
sizeof(cpu_set_t);
65 if (coreId >= 0 && (coreId <= cpuSetSize*8))
69 CPU_SET(coreId, &cpuSet);
70 pthread_setaffinity_np(_thread->native_handle(), cpuSetSize, &cpuSet);
84 _blockedIt = _queue.end();
85 std::unique_lock<std::mutex> lock(_notEmptyMutex);
88 _notEmptyCond.wait(lock, [
this]()->
bool {
return !_isEmpty || _isInterrupted; });
97 if (advance() == _queue.end())
103 if (_blockedIt == _queueIt) {
112 if (task->isBlocked() || task->isSleeping(
true))
114 if (_blockedIt == _queue.end()) {
115 _blockedIt = _queueIt;
121 int rc = task->run();
127 if (_blockedIt == _queueIt) {
128 _blockedIt = _queue.end();
137 nextTask = task->getNextTask();
141 nextTask->terminate();
142 nextTask = nextTask->getNextTask();
150 #ifdef __QUANTUM_PRINT_DEBUG 151 std::lock_guard<std::mutex> guard(Util::LogMutex());
154 std::cerr <<
"Coroutine exited with user exception." << std::endl;
158 std::cerr <<
"Coroutine exited with error : " << rc << std::endl;
162 nextTask = task->getErrorHandlerOrFinalTask();
168 else if (!task->isBlocked() && !task->isSleeping()) {
170 _blockedIt = _queue.end();
173 catch (std::exception& ex)
177 #ifdef __QUANTUM_PRINT_DEBUG 178 std::lock_guard<std::mutex> guard(Util::LogMutex());
179 std::cerr <<
"Caught exception: " << ex.what() << std::endl;
185 #ifdef __QUANTUM_PRINT_DEBUG 186 std::lock_guard<std::mutex> guard(Util::LogMutex());
187 std::cerr <<
"Caught unknown exception." << std::endl;
218 return lock.ownsLock();
225 if (_queue.empty() || !task->isHighPriority())
229 _queue.insert(_queueIt, std::static_pointer_cast<Task>(task));
235 _queue.insert(std::next(_queueIt), std::static_pointer_cast<Task>(task));
237 if (task->isHighPriority())
251 return doDequeue(hint);
261 return doDequeue(hint);
267 ITask::Ptr TaskQueue::doDequeue(std::atomic_bool& hint)
269 hint = (_queueIt == _queue.end());
272 (*_queueIt)->terminate();
274 _queueIt = _queue.erase(_queueIt);
284 #if (__cplusplus >= 201703L) 285 return _queue.size();
295 return _queue.empty();
301 if (!_terminated.test_and_set())
304 std::unique_lock<std::mutex> lock(_notEmptyMutex);
305 _isInterrupted =
true;
307 _notEmptyCond.notify_all();
311 while (!_queue.empty())
313 _queue.front()->terminate();
336 std::lock_guard<std::mutex> lock(_notEmptyMutex);
341 _notEmptyCond.notify_all();
351 if ((_queueIt == _queue.end()) || (!_isAdvanced && (++_queueIt == _queue.end())))
353 _queueIt = _queue.begin();
356 if (_queueIt == _queue.end())
Definition: quantum_spinlock.h:71
Definition: quantum_buffer_impl.h:22
size_t size() const final
Definition: quantum_task_queue_impl.h:282
void terminate() final
Terminates the object.
Definition: quantum_task_queue_impl.h:299
void incHighPriorityCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:128
~TaskQueue()
Definition: quantum_task_queue_impl.h:53
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:206
Definition: quantum_allocator.h:54
void run() final
Definition: quantum_task_queue_impl.h:76
std::shared_ptr< ITaskContinuation > Ptr
Definition: quantum_itask_continuation.h:32
void incPostedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:116
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
bool isIdle() const final
Definition: quantum_task_queue_impl.h:364
IQueueStatistics & stats() final
Definition: quantum_task_queue_impl.h:320
Definition: quantum_configuration.h:31
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:247
Thread queue for running coroutines.
Definition: quantum_task_queue.h:45
TaskQueue()
Definition: quantum_task_queue_impl.h:26
Allows application-wide settings for the various allocators used by Quantum.
Definition: quantum_allocator_traits.h:46
void incCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:92
YieldingThreadDuration< std::chrono::microseconds > YieldingThread
Definition: quantum_yielding_thread.h:57
std::try_to_lock_t TryToLock
Definition: quantum_spinlock.h:34
bool empty() const final
Definition: quantum_task_queue_impl.h:293
void pinToCore(int coreId) final
Definition: quantum_task_queue_impl.h:59
void incErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:68
SpinLock & getLock() final
Definition: quantum_task_queue_impl.h:326
void signalEmptyCondition(bool value) final
Definition: quantum_task_queue_impl.h:332
void enqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:194
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:255
size_t numElements() const final
Gets the current size of the queue.
Definition: quantum_queue_statistics_impl.h:44
TaskList::iterator TaskListIter
Definition: quantum_task_queue.h:49
void incNumElements() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:50
void decNumElements() final
Decrement this counter.
Definition: quantum_queue_statistics_impl.h:56