QuantumLibrary
quantum_task_queue_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 namespace Bloomberg {
23 namespace quantum {
24 
25 inline
28 {
29 }
30 
31 inline
33  _queue(Allocator<QueueListAllocator>::instance(AllocatorTraits::queueListAllocSize())),
34  _queueIt(_queue.end()),
35  _blockedIt(_queue.end()),
36  _isEmpty(true),
37  _isInterrupted(false),
38  _isIdle(true),
39  _terminated ATOMIC_FLAG_INIT,
40  _isAdvanced(false)
41 {
42  _thread = std::make_shared<std::thread>(std::bind(&TaskQueue::run, this));
43 }
44 
45 inline
47  TaskQueue()
48 {
49 
50 }
51 
52 inline
54 {
55  terminate();
56 }
57 
58 inline
59 void TaskQueue::pinToCore(int coreId)
60 {
61 #ifdef _WIN32
62  SetThreadAffinityMask(_thread->native_handle(), 1 << coreId);
63 #else
64  int cpuSetSize = sizeof(cpu_set_t);
65  if (coreId >= 0 && (coreId <= cpuSetSize*8))
66  {
67  cpu_set_t cpuSet;
68  CPU_ZERO(&cpuSet);
69  CPU_SET(coreId, &cpuSet);
70  pthread_setaffinity_np(_thread->native_handle(), cpuSetSize, &cpuSet);
71  }
72 #endif
73 }
74 
75 inline
77 {
78  while (true)
79  {
80  try
81  {
82  if (_isEmpty)
83  {
84  _blockedIt = _queue.end(); //clear iterator
85  std::unique_lock<std::mutex> lock(_notEmptyMutex);
86  //========================= BLOCK WHEN EMPTY =========================
87  //Wait for the queue to have at least one element
88  _notEmptyCond.wait(lock, [this]()->bool { return !_isEmpty || _isInterrupted; });
89  }
90 
91  if (_isInterrupted)
92  {
93  break;
94  }
95 
96  //Iterate to the next runnable task
97  if (advance() == _queue.end())
98  {
99  continue;
100  }
101 
102  //Check if we need to pause this thread
103  if (_blockedIt == _queueIt) {
104  //All coroutines are blocked so we yield
105  YieldingThread()();
106  }
107 
108  //Process current task
109  ITaskContinuation::Ptr task = *_queueIt;
110 
111  //Check if blocked or sleeping
112  if (task->isBlocked() || task->isSleeping(true))
113  {
114  if (_blockedIt == _queue.end()) {
115  _blockedIt = _queueIt;
116  }
117  continue;
118  }
119 
120  //========================= START/RESUME COROUTINE =========================
121  int rc = task->run();
122  //=========================== END/YIELD COROUTINE ==========================
123 
124  if (rc != (int)ITask::RetCode::Running) //Coroutine ended
125  {
126  //clear the blocked position iterator if it's the same as the finished task
127  if (_blockedIt == _queueIt) {
128  _blockedIt = _queue.end();
129  }
130  ITaskContinuation::Ptr nextTask;
131  if (rc == (int)ITask::RetCode::Success)
132  {
133  //Coroutine ended normally with "return 0" statement
134  _stats.incCompletedCount();
135 
136  //check if there's another task scheduled to run after this one
137  nextTask = task->getNextTask();
138  if (nextTask && (nextTask->getType() == ITask::Type::ErrorHandler))
139  {
140  //skip error handler since we don't have any errors
141  nextTask->terminate(); //invalidate the error handler
142  nextTask = nextTask->getNextTask();
143  }
144  }
145  else
146  {
147  //Coroutine ended with explicit user error
148  _stats.incErrorCount();
149 
150 #ifdef __QUANTUM_PRINT_DEBUG
151  std::lock_guard<std::mutex> guard(Util::LogMutex());
152  if (rc == (int)ITask::RetCode::Exception)
153  {
154  std::cerr << "Coroutine exited with user exception." << std::endl;
155  }
156  else
157  {
158  std::cerr << "Coroutine exited with error : " << rc << std::endl;
159  }
160 #endif
161  //Check if we have a final task to run
162  nextTask = task->getErrorHandlerOrFinalTask();
163  }
164  //queue next task and de-queue current one
165  enqueue(nextTask);
166  dequeue(_isIdle);
167  }
168  else if (!task->isBlocked() && !task->isSleeping()) {
169  //This coroutine will run again so we reset the blocked position iterator
170  _blockedIt = _queue.end();
171  }
172  }
173  catch (std::exception& ex)
174  {
175  UNUSED(ex);
176  dequeue(_isIdle); //remove error task
177 #ifdef __QUANTUM_PRINT_DEBUG
178  std::lock_guard<std::mutex> guard(Util::LogMutex());
179  std::cerr << "Caught exception: " << ex.what() << std::endl;
180 #endif
181  }
182  catch (...)
183  {
184  dequeue(_isIdle); //remove error task
185 #ifdef __QUANTUM_PRINT_DEBUG
186  std::lock_guard<std::mutex> guard(Util::LogMutex());
187  std::cerr << "Caught unknown exception." << std::endl;
188 #endif
189  }
190  } //while(true)
191 }
192 
193 inline
195 {
196  if (!task)
197  {
198  return; //nothing to do
199  }
200  //========================= LOCKED SCOPE =========================
201  SpinLock::Guard lock(_spinlock);
202  doEnqueue(task);
203 }
204 
205 inline
207 {
208  if (!task)
209  {
210  return false; //nothing to do
211  }
212  //========================= LOCKED SCOPE =========================
213  SpinLock::Guard lock(_spinlock, SpinLock::TryToLock{});
214  if (lock.ownsLock())
215  {
216  doEnqueue(task);
217  }
218  return lock.ownsLock();
219 }
220 
221 inline
222 void TaskQueue::doEnqueue(ITask::Ptr task)
223 {
224  //NOTE: _queueIt remains unchanged following this operation
225  if (_queue.empty() || !task->isHighPriority())
226  {
227  //insert before the current position. If _queueIt == begin(), then the new
228  //task will be at the head of the queue.
229  _queue.insert(_queueIt, std::static_pointer_cast<Task>(task));
230  }
231  else
232  {
233  //insert after the current position. If next(_queueIt) == end()
234  //then the new task will be the last element in the queue
235  _queue.insert(std::next(_queueIt), std::static_pointer_cast<Task>(task));
236  }
237  if (task->isHighPriority())
238  {
239  _stats.incHighPriorityCount();
240  }
241  _stats.incPostedCount();
242  _stats.incNumElements();
243  signalEmptyCondition(false);
244 }
245 
246 inline
247 ITask::Ptr TaskQueue::dequeue(std::atomic_bool& hint)
248 {
249  //========================= LOCKED SCOPE =========================
250  SpinLock::Guard lock(_spinlock);
251  return doDequeue(hint);
252 }
253 
254 inline
255 ITask::Ptr TaskQueue::tryDequeue(std::atomic_bool& hint)
256 {
257  //========================= LOCKED SCOPE =========================
258  SpinLock::Guard lock(_spinlock, SpinLock::TryToLock{});
259  if (lock.ownsLock())
260  {
261  return doDequeue(hint);
262  }
263  return nullptr;
264 }
265 
266 inline
267 ITask::Ptr TaskQueue::doDequeue(std::atomic_bool& hint)
268 {
269  hint = (_queueIt == _queue.end());
270  if (!hint)
271  {
272  (*_queueIt)->terminate();
273  //Remove error task from the queue
274  _queueIt = _queue.erase(_queueIt);
275  _stats.decNumElements();
276  _isAdvanced = true; //_queueIt now points to the next element in the list or to _queue.end()
277  }
278  return nullptr; //not used!
279 }
280 
281 inline
282 size_t TaskQueue::size() const
283 {
284 #if (__cplusplus >= 201703L)
285  return _queue.size();
286 #else
287  //Avoid linear time implementation
288  return _stats.numElements();
289 #endif
290 }
291 
292 inline
293 bool TaskQueue::empty() const
294 {
295  return _queue.empty();
296 }
297 
298 inline
300 {
301  if (!_terminated.test_and_set())
302  {
303  {
304  std::unique_lock<std::mutex> lock(_notEmptyMutex);
305  _isInterrupted = true;
306  }
307  _notEmptyCond.notify_all();
308  _thread->join();
309 
310  //clear the queue
311  while (!_queue.empty())
312  {
313  _queue.front()->terminate();
314  _queue.pop_front();
315  }
316  }
317 }
318 
319 inline
321 {
322  return _stats;
323 }
324 
325 inline
327 {
328  return _spinlock;
329 }
330 
331 inline
333 {
334  {
335  //========================= LOCKED SCOPE =========================
336  std::lock_guard<std::mutex> lock(_notEmptyMutex);
337  _isEmpty = value;
338  }
339  if (!value)
340  {
341  _notEmptyCond.notify_all();
342  }
343 }
344 
345 inline
346 TaskQueue::TaskListIter TaskQueue::advance()
347 {
348  //========================= LOCKED SCOPE =========================
349  SpinLock::Guard lock(_spinlock);
350  //Iterate to the next element
351  if ((_queueIt == _queue.end()) || (!_isAdvanced && (++_queueIt == _queue.end())))
352  {
353  _queueIt = _queue.begin();
354  }
355  _isAdvanced = false; //reset flag
356  if (_queueIt == _queue.end())
357  {
358  signalEmptyCondition(true);
359  }
360  return _queueIt;
361 }
362 
363 inline
364 bool TaskQueue::isIdle() const
365 {
366  return _isIdle;
367 }
368 
369 }}
370 
Definition: quantum_spinlock.h:71
Definition: quantum_buffer_impl.h:22
size_t size() const final
Definition: quantum_task_queue_impl.h:282
void terminate() final
Terminates the object.
Definition: quantum_task_queue_impl.h:299
void incHighPriorityCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:128
~TaskQueue()
Definition: quantum_task_queue_impl.h:53
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:206
Definition: quantum_allocator.h:54
void run() final
Definition: quantum_task_queue_impl.h:76
std::shared_ptr< ITaskContinuation > Ptr
Definition: quantum_itask_continuation.h:32
void incPostedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:116
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
bool isIdle() const final
Definition: quantum_task_queue_impl.h:364
IQueueStatistics & stats() final
Definition: quantum_task_queue_impl.h:320
Definition: quantum_configuration.h:31
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:247
Thread queue for running coroutines.
Definition: quantum_task_queue.h:45
TaskQueue()
Definition: quantum_task_queue_impl.h:26
Allows application-wide settings for the various allocators used by Quantum.
Definition: quantum_allocator_traits.h:46
void incCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:92
YieldingThreadDuration< std::chrono::microseconds > YieldingThread
Definition: quantum_yielding_thread.h:57
std::try_to_lock_t TryToLock
Definition: quantum_spinlock.h:34
bool empty() const final
Definition: quantum_task_queue_impl.h:293
void pinToCore(int coreId) final
Definition: quantum_task_queue_impl.h:59
void incErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:68
SpinLock & getLock() final
Definition: quantum_task_queue_impl.h:326
void signalEmptyCondition(bool value) final
Definition: quantum_task_queue_impl.h:332
void enqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:194
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:255
size_t numElements() const final
Gets the current size of the queue.
Definition: quantum_queue_statistics_impl.h:44
TaskList::iterator TaskListIter
Definition: quantum_task_queue.h:49
void incNumElements() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:50
void decNumElements() final
Decrement this counter.
Definition: quantum_queue_statistics_impl.h:56