QuantumLibrary
quantum_io_queue_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 #include <cmath>
22 
23 namespace Bloomberg {
24 namespace quantum {
25 
26 inline
28  IoQueue(Configuration(), nullptr)
29 {
30 }
31 
32 inline
34  std::vector<IoQueue>* sharedIoQueues) :
35  _sharedIoQueues(sharedIoQueues),
36  _loadBalanceSharedIoQueues(config.getLoadBalanceSharedIoQueues()),
37  _loadBalancePollIntervalMs(config.getLoadBalancePollIntervalMs()),
38  _loadBalancePollIntervalBackoffPolicy(config.getLoadBalancePollIntervalBackoffPolicy()),
39  _loadBalancePollIntervalNumBackoffs(config.getLoadBalancePollIntervalNumBackoffs()),
40  _loadBalanceBackoffNum(0),
41  _queue(Allocator<QueueListAllocator>::instance(AllocatorTraits::queueListAllocSize())),
42  _isEmpty(true),
43  _isInterrupted(false),
44  _isIdle(true),
45  _terminated ATOMIC_FLAG_INIT
46 {
47  if (_sharedIoQueues) {
48  //The shared queue doesn't have its own thread
49  _thread = std::make_shared<std::thread>(std::bind(&IoQueue::run, this));
50  }
51 }
52 
inline
//Copy constructor: clones the CONFIGURATION of 'other' (load-balance settings,
//shared-queue pointer) but deliberately starts with an EMPTY task queue and
//fresh statistics/flags — queued tasks are not copied. A dedicated queue
//(non-null _sharedIoQueues) also spawns its own worker thread.
IoQueue::IoQueue(const IoQueue& other) :
    _sharedIoQueues(other._sharedIoQueues),
    _loadBalanceSharedIoQueues(other._loadBalanceSharedIoQueues),
    _loadBalancePollIntervalMs(other._loadBalancePollIntervalMs),
    _loadBalancePollIntervalBackoffPolicy(other._loadBalancePollIntervalBackoffPolicy),
    _loadBalancePollIntervalNumBackoffs(other._loadBalancePollIntervalNumBackoffs),
    _loadBalanceBackoffNum(0),
    _queue(Allocator<QueueListAllocator>::instance(AllocatorTraits::queueListAllocSize())),
    _isEmpty(true),
    _isInterrupted(false),
    _isIdle(true),
    _terminated ATOMIC_FLAG_INIT
{
    if (_sharedIoQueues) {
        //The shared queue doesn't have its own thread
        _thread = std::make_shared<std::thread>(std::bind(&IoQueue::run, this));
    }
}
72 
73 inline
75 {
76  terminate();
77 }
78 
79 inline
81 {
82  //Not used
83 }
84 
85 inline
87 {
88  while (true)
89  {
90  try
91  {
92  ITask::Ptr task;
93  if (_loadBalanceSharedIoQueues)
94  {
95  do
96  {
97  task = grabWorkItemFromAll();
98  if (task)
99  {
100  _loadBalanceBackoffNum = 0; //reset
101  break;
102  }
103  YieldingThread()(getBackoffInterval());
104  } while (!_isInterrupted);
105  }
106  else if (_isEmpty)
107  {
108  std::unique_lock<std::mutex> lock(_notEmptyMutex);
109  //========================= BLOCK WHEN EMPTY =========================
110  //Wait for the queue to have at least one element
111  _notEmptyCond.wait(lock, [this]() -> bool { return !_isEmpty || _isInterrupted; });
112  }
113 
114  if (_isInterrupted)
115  {
116  break;
117  }
118 
119  if (!_loadBalanceSharedIoQueues)
120  {
121  //Iterate to the next runnable task
122  task = grabWorkItem();
123  if (!task)
124  {
125  continue;
126  }
127  }
128 
129  //========================= START TASK =========================
130  int rc = task->run();
131  //========================== END TASK ==========================
132 
133  if (rc == (int)ITask::RetCode::Success)
134  {
135  if (task->getQueueId() == (int)IQueue::QueueId::Any)
136  {
138  }
139  else
140  {
141  _stats.incCompletedCount();
142  }
143  }
144  else
145  {
146  //IO task ended with error
147  if (task->getQueueId() == (int)IQueue::QueueId::Any)
148  {
149  _stats.incSharedQueueErrorCount();
150  }
151  else
152  {
153  _stats.incErrorCount();
154  }
155 
156 #ifdef __QUANTUM_PRINT_DEBUG
157  std::lock_guard<std::mutex> guard(Util::LogMutex());
158  if (rc == (int)ITask::RetCode::Exception)
159  {
160  std::cerr << "IO task exited with user exception." << std::endl;
161  }
162  else
163  {
164  std::cerr << "IO task exited with error : " << rc << std::endl;
165  }
166 #endif
167  }
168  }
169  catch (std::exception& ex)
170  {
171  UNUSED(ex);
172 #ifdef __QUANTUM_PRINT_DEBUG
173  std::lock_guard<std::mutex> guard(Util::LogMutex());
174  std::cerr << "Caught exception: " << ex.what() << std::endl;
175 #endif
176  }
177  catch (...)
178  {
179 #ifdef __QUANTUM_PRINT_DEBUG
180  std::lock_guard<std::mutex> guard(Util::LogMutex());
181  std::cerr << "Caught unknown exception." << std::endl;
182 #endif
183  }
184  } //while
185 }
186 
187 inline
189 {
190  if (!task)
191  {
192  return; //nothing to do
193  }
194  //========================= LOCKED SCOPE =========================
195  SpinLock::Guard lock(_spinlock);
196  doEnqueue(task);
197 }
198 
199 inline
201 {
202  if (!task)
203  {
204  return false; //nothing to do
205  }
206  //========================= LOCKED SCOPE =========================
207  SpinLock::Guard lock(_spinlock, SpinLock::TryToLock{});
208  if (lock.ownsLock())
209  {
210  doEnqueue(task);
211  }
212  return lock.ownsLock();
213 }
214 
215 inline
216 void IoQueue::doEnqueue(ITask::Ptr task)
217 {
218  if (task->isHighPriority())
219  {
220  _stats.incHighPriorityCount();
221  _queue.emplace_front(std::static_pointer_cast<IoTask>(task));
222  }
223  else
224  {
225  _queue.emplace_back(std::static_pointer_cast<IoTask>(task));
226  }
227  _stats.incPostedCount();
228  _stats.incNumElements();
229  if (!_loadBalanceSharedIoQueues)
230  {
231  signalEmptyCondition(false);
232  }
233 }
234 
235 inline
236 ITask::Ptr IoQueue::dequeue(std::atomic_bool& hint)
237 {
238  if (_loadBalanceSharedIoQueues)
239  {
240  //========================= LOCKED SCOPE =========================
241  SpinLock::Guard lock(_spinlock);
242  return doDequeue(hint);
243  }
244  return doDequeue(hint);
245 }
246 
247 inline
248 ITask::Ptr IoQueue::tryDequeue(std::atomic_bool& hint)
249 {
250  //========================= LOCKED SCOPE =========================
251  SpinLock::Guard lock(_spinlock, SpinLock::TryToLock{});
252  if (lock.ownsLock())
253  {
254  return doDequeue(hint);
255  }
256  return nullptr;
257 }
258 
259 inline
260 ITask::Ptr IoQueue::doDequeue(std::atomic_bool& hint)
261 {
262  hint = _queue.empty();
263  if (!hint)
264  {
265  ITask::Ptr task = _queue.front();
266  _queue.pop_front();
267  _stats.decNumElements();
268  return task;
269  }
270  return nullptr;
271 }
272 
inline
//Attempt to steal a task from the shared IO queues, round-robining across
//them so the load spreads evenly. If every try-dequeue failed but at least
//one queue reported a non-zero size (a lock was contended or a task arrived
//mid-scan), retry recursively until a task is found or all queues are empty.
//NOTE(review): 'index' is a function-local static shared by ALL worker
//threads without synchronization — races only perturb the round-robin start
//point, but confirm this is intentional.
ITask::Ptr IoQueue::tryDequeueFromShared()
{
    static size_t index = 0;
    ITask::Ptr task;
    size_t size = 0;

    for (size_t i = 0; i < (*_sharedIoQueues).size(); ++i)
    {
        //Start each scan one past the previous position (round-robin)
        IoQueue& queue = (*_sharedIoQueues)[++index % (*_sharedIoQueues).size()];
        size += queue.size();
        task = queue.tryDequeue(_isIdle);
        if (task)
        {
            return task;
        }
    }

    if (size) {
        //try again — tasks were visible but none could be popped this pass
        return tryDequeueFromShared();
    }
    return nullptr;
}
297 
298 inline
299 std::chrono::milliseconds IoQueue::getBackoffInterval()
300 {
301  if (_loadBalanceBackoffNum < _loadBalancePollIntervalNumBackoffs) {
302  ++_loadBalanceBackoffNum;
303  }
304  if (_loadBalancePollIntervalBackoffPolicy == Configuration::BackoffPolicy::Linear)
305  {
306  return _loadBalancePollIntervalMs + (_loadBalancePollIntervalMs * _loadBalanceBackoffNum);
307  }
308  else
309  {
310  return _loadBalancePollIntervalMs * static_cast<size_t>(std::exp2(_loadBalanceBackoffNum));
311  }
312 }
313 
inline
//Number of pending tasks. For a dedicated queue (non-null _sharedIoQueues),
//a task currently executing (queue not idle) is counted as +1.
size_t IoQueue::size() const
{
#if (__cplusplus >= 201703L)
    //C++17 builds read the container size directly
    if (_sharedIoQueues) {
        return _isIdle ? _queue.size() : _queue.size() + 1;
    }
    return _queue.size();
#else
    //Avoid linear time implementation
    if (_sharedIoQueues) {
        return _isIdle ? _stats.numElements() : _stats.numElements() + 1;
    }
    return _stats.numElements();
#endif
}
330 
331 inline
332 bool IoQueue::empty() const
333 {
334  if (_sharedIoQueues) {
335  return _queue.empty() && _isIdle;
336  }
337  return _queue.empty();
338 }
339 
340 inline
342 {
343  if (!_terminated.test_and_set() && _sharedIoQueues)
344  {
345  {
346  std::unique_lock<std::mutex> lock(_notEmptyMutex);
347  _isInterrupted = true;
348  }
349  if (!_loadBalanceSharedIoQueues) {
350  _notEmptyCond.notify_all();
351  }
352  _thread->join();
353  _queue.clear();
354  }
355 }
356 
357 inline
359 {
360  return _stats;
361 }
362 
363 inline
365 {
366  return _spinlock;
367 }
368 
369 inline
371 {
372  {
373  //========================= LOCKED SCOPE =========================
374  std::lock_guard<std::mutex> lock(_notEmptyMutex);
375  _isEmpty = value;
376  }
377  if (!value)
378  {
379  _notEmptyCond.notify_all();
380  }
381 }
382 
inline
//Fetch the next runnable task for a dedicated (non-load-balancing) queue.
//Alternates between checking the shared queue first and the local queue
//first, so neither source is starved. If both are empty, flag this queue
//as empty so run() blocks instead of spinning.
//NOTE(review): 'grabFromShared' is a function-local static shared by ALL
//dedicated-queue threads without synchronization — races only perturb the
//alternation order, but confirm this is intentional.
ITask::Ptr IoQueue::grabWorkItem()
{
    static bool grabFromShared = false;
    ITask::Ptr task = nullptr;
    grabFromShared = !grabFromShared;

    if (grabFromShared) {
        //========================= LOCKED SCOPE (SHARED QUEUE) =========================
        SpinLock::Guard lock((*_sharedIoQueues)[0].getLock());
        task = (*_sharedIoQueues)[0].dequeue(_isIdle);
        if (!task)
        {
            //========================= LOCKED SCOPE =========================
            SpinLock::Guard lock(_spinlock);
            task = dequeue(_isIdle);
            if (!task)
            {
                //Both queues empty: block until new work is posted
                signalEmptyCondition(true);
            }
        }
    }
    else {
        //========================= LOCKED SCOPE =========================
        SpinLock::Guard lock(_spinlock);
        task = dequeue(_isIdle);
        if (!task)
        {
            //========================= LOCKED SCOPE (SHARED QUEUE) =========================
            SpinLock::Guard lock((*_sharedIoQueues)[0].getLock());
            task = (*_sharedIoQueues)[0].dequeue(_isIdle);
            if (!task)
            {
                //Both queues empty: block until new work is posted
                signalEmptyCondition(true);
            }
        }
    }
    return task;
}
422 
423 inline
424 ITask::Ptr IoQueue::grabWorkItemFromAll()
425 {
426  static bool grabFromShared = false;
427  ITask::Ptr task = nullptr;
428  grabFromShared = !grabFromShared;
429 
430  if (grabFromShared)
431  {
432  task = tryDequeueFromShared();
433  if (!task)
434  {
435  task = dequeue(_isIdle);
436  }
437  }
438  else
439  {
440  task = dequeue(_isIdle);
441  if (!task)
442  {
443  task = tryDequeueFromShared();
444  }
445  }
446  return task;
447 }
448 
449 inline
450 bool IoQueue::isIdle() const
451 {
452  return _isIdle;
453 }
454 
455 }}
Definition: quantum_spinlock.h:71
Definition: quantum_buffer_impl.h:22
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:200
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:236
Thread queue for executing IO tasks.
Definition: quantum_io_queue.h:40
void incHighPriorityCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:128
void run() final
Definition: quantum_io_queue_impl.h:86
Definition: quantum_allocator.h:54
bool isIdle() const final
Definition: quantum_io_queue_impl.h:450
void incSharedQueueErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:80
void incPostedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:116
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
Definition: quantum_configuration.h:31
void signalEmptyCondition(bool value) final
Definition: quantum_io_queue_impl.h:370
void incSharedQueueCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:104
SpinLock & getLock() final
Definition: quantum_io_queue_impl.h:364
Allows application-wide settings for the various allocators used by Quantum.
Definition: quantum_allocator_traits.h:46
void enqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:188
void incCompletedCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:92
YieldingThreadDuration< std::chrono::microseconds > YieldingThread
Definition: quantum_yielding_thread.h:57
std::try_to_lock_t TryToLock
Definition: quantum_spinlock.h:34
void incErrorCount() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:68
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
void pinToCore(int coreId) final
Definition: quantum_io_queue_impl.h:80
IoQueue()
Definition: quantum_io_queue_impl.h:27
size_t numElements() const final
Gets the current size of the queue.
Definition: quantum_queue_statistics_impl.h:44
IQueueStatistics & stats() final
Definition: quantum_io_queue_impl.h:358
void incNumElements() final
Increment this counter.
Definition: quantum_queue_statistics_impl.h:50
void terminate() final
Terminates the object.
Definition: quantum_io_queue_impl.h:341
void decNumElements() final
Decrement this counter.
Definition: quantum_queue_statistics_impl.h:56
size_t size() const final
Definition: quantum_io_queue_impl.h:315
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:248
RAII-style mechanism for SpinLock ownership. Acquires a SpinLock on construction and releases it insi...
bool empty() const final
Definition: quantum_io_queue_impl.h:332