QuantumLibrary
quantum_dispatcher_core_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 
22 namespace Bloomberg {
23 namespace quantum {
24 
25 inline
26 DispatcherCore::DispatcherCore(int numCoroutineThreads,
27  int numIoThreads,
28  bool pinCoroutineThreadsToCores) :
29  _coroQueues((numCoroutineThreads == -1) ? std::thread::hardware_concurrency() :
30  (numCoroutineThreads == 0) ? 1 : numCoroutineThreads),
31  _sharedIoQueues((numIoThreads <= 0) ? 1 : numIoThreads),
32  _ioQueues((numIoThreads <= 0) ? 1 : numIoThreads, IoQueue(Configuration(), &_sharedIoQueues)),
33  _loadBalanceSharedIoQueues(false),
34  _terminated ATOMIC_FLAG_INIT,
35  _coroQueueIdRangeForAny(0, (int)_coroQueues.size()-1)
36 {
37  if (pinCoroutineThreadsToCores)
38  {
39  unsigned int cores = std::thread::hardware_concurrency();
40  for (size_t i = 0; i < _coroQueues.size(); ++i)
41  {
42  _coroQueues[i].pinToCore(i%cores);
43  }
44  }
45 }
46 
47 inline
48 DispatcherCore::DispatcherCore(const Configuration& config) :
49  _coroQueues((config.getNumCoroutineThreads() == -1) ? std::thread::hardware_concurrency() :
50  (config.getNumCoroutineThreads() == 0) ? 1 : config.getNumCoroutineThreads(), TaskQueue(config)),
51  _sharedIoQueues((config.getNumIoThreads() <= 0) ? 1 : config.getNumIoThreads(), IoQueue(config, nullptr)),
52  _ioQueues((config.getNumIoThreads() <= 0) ? 1 : config.getNumIoThreads(), IoQueue(config, &_sharedIoQueues)),
53  _loadBalanceSharedIoQueues(false),
54  _terminated ATOMIC_FLAG_INIT,
55  _coroQueueIdRangeForAny(0, (int)_coroQueues.size()-1)
56 {
57  if (config.getPinCoroutineThreadsToCores())
58  {
59  unsigned int cores = std::thread::hardware_concurrency();
60  for (size_t i = 0; i < _coroQueues.size(); ++i)
61  {
62  _coroQueues[i].pinToCore(i%cores);
63  }
64  }
65  const auto& coroQueueIdRangeForAny = config.getCoroQueueIdRangeForAny();
66  // set the range to the default if the configured one is invalid or empty
67  if (coroQueueIdRangeForAny.first <= coroQueueIdRangeForAny.second &&
68  coroQueueIdRangeForAny.first >= 0 &&
69  coroQueueIdRangeForAny.second < (int)_coroQueues.size())
70  {
71  _coroQueueIdRangeForAny = coroQueueIdRangeForAny;
72  }
73 }
74 
75 inline
77 {
78  terminate();
79 }
80 
81 inline
83 {
84  if (!_terminated.test_and_set())
85  {
86  for (auto&& queue : _coroQueues)
87  {
88  queue.terminate();
89  }
90  for (auto&& queue : _ioQueues)
91  {
92  queue.terminate();
93  }
94  for (auto&& queue : _sharedIoQueues)
95  {
96  queue.terminate();
97  }
98  }
99 }
100 
101 inline
103  int queueId) const
104 {
105  if (type == IQueue::QueueType::All)
106  {
107  if (queueId != (int)IQueue::QueueId::All)
108  {
109  throw std::runtime_error("Cannot specify queue id");
110  }
111  return coroSize((int)IQueue::QueueId::All) + ioSize((int)IQueue::QueueId::All);
112  }
113  else if (type == IQueue::QueueType::Coro)
114  {
115  return coroSize(queueId);
116  }
117  return ioSize(queueId);
118 }
119 
120 inline
122  int queueId) const
123 {
124  if (type == IQueue::QueueType::All)
125  {
126  if (queueId != (int)IQueue::QueueId::All)
127  {
128  throw std::runtime_error("Cannot specify queue id");
129  }
130  return coroEmpty((int)IQueue::QueueId::All) && ioEmpty((int)IQueue::QueueId::All);
131  }
132  else if (type == IQueue::QueueType::Coro)
133  {
134  return coroEmpty(queueId);
135  }
136  return ioEmpty(queueId);
137 }
138 
139 inline
140 size_t DispatcherCore::coroSize(int queueId) const
141 {
142  if (queueId == (int)IQueue::QueueId::All)
143  {
144  size_t size = 0;
145  for (auto&& queue : _coroQueues)
146  {
147  size += queue.size();
148  }
149  return size;
150  }
151  else if ((queueId >= (int)_coroQueues.size()) || (queueId < 0))
152  {
153  throw std::runtime_error("Invalid coroutine queue id");
154  }
155  return _coroQueues.at(queueId).size();
156 }
157 
158 inline
159 bool DispatcherCore::coroEmpty(int queueId) const
160 {
161  if (queueId == (int)IQueue::QueueId::All)
162  {
163  for (auto&& queue : _coroQueues)
164  {
165  if (!queue.empty()) return false;
166  }
167  return true;
168  }
169  else if ((queueId >= (int)_coroQueues.size()) || (queueId < 0))
170  {
171  throw std::runtime_error("Invalid coroutine queue id");
172  }
173  return _coroQueues.at(queueId).empty();
174 }
175 
176 inline
177 size_t DispatcherCore::ioSize(int queueId) const
178 {
179  if (queueId == (int)IQueue::QueueId::All)
180  {
181  size_t size = 0;
182  for (auto&& queue : _ioQueues)
183  {
184  size += queue.size();
185  }
186  for (auto&& queue : _sharedIoQueues)
187  {
188  size += queue.size();
189  }
190  return size;
191  }
192  else if (queueId == (int)IQueue::QueueId::Any)
193  {
194  size_t size = 0;
195  for (auto&& queue : _sharedIoQueues)
196  {
197  size += queue.size();
198  }
199  return size;
200  }
201  return _ioQueues.at(queueId).size();
202 }
203 
204 inline
205 bool DispatcherCore::ioEmpty(int queueId) const
206 {
207  if (queueId == (int)IQueue::QueueId::All)
208  {
209  for (auto&& queue : _sharedIoQueues)
210  {
211  if (!queue.empty())
212  {
213  return false;
214  }
215  }
216  for (auto&& queue : _ioQueues)
217  {
218  if (!queue.empty())
219  {
220  return false;
221  }
222  }
223  return true;
224  }
225  else if (queueId == (int)IQueue::QueueId::Any)
226  {
227  for (auto&& queue : _sharedIoQueues)
228  {
229  if (!queue.empty())
230  {
231  return false;
232  }
233  }
234  return true;
235  }
236  return _ioQueues.at(queueId).empty();
237 }
238 
239 inline
241 {
242  if (type == IQueue::QueueType::All)
243  {
244  if (queueId != (int)IQueue::QueueId::All)
245  {
246  throw std::runtime_error("Cannot specify queue id");
247  }
248  return coroStats((int)IQueue::QueueId::All) + ioStats((int)IQueue::QueueId::All);
249  }
250  else if (type == IQueue::QueueType::Coro)
251  {
252  return coroStats(queueId);
253  }
254  return ioStats(queueId);
255 }
256 
257 inline
258 QueueStatistics DispatcherCore::coroStats(int queueId)
259 {
260  if (queueId == (int)IQueue::QueueId::All)
261  {
263  for (auto&& queue : _coroQueues)
264  {
265  stats += queue.stats();
266  }
267  return stats;
268  }
269  else
270  {
271  if ((queueId >= (int)_coroQueues.size()) || (queueId < 0))
272  {
273  throw std::runtime_error("Invalid coroutine queue id");
274  }
275  return static_cast<const QueueStatistics&>(_coroQueues.at(queueId).stats());
276  }
277 }
278 
279 inline
280 QueueStatistics DispatcherCore::ioStats(int queueId)
281 {
282  if (queueId == (int)IQueue::QueueId::All)
283  {
284  QueueStatistics stats;
285  for (auto&& queue : _ioQueues)
286  {
287  stats += queue.stats();
288  }
289  for (auto&& queue : _sharedIoQueues)
290  {
291  stats += queue.stats();
292  }
293  return stats;
294  }
295  else if (queueId == (int)IQueue::QueueId::Any)
296  {
297  QueueStatistics stats;
298  for (auto&& queue : _sharedIoQueues)
299  {
300  stats += queue.stats();
301  }
302  return stats;
303  }
304  else
305  {
306  if ((queueId >= (int)_ioQueues.size()) || (queueId < 0))
307  {
308  throw std::runtime_error("Invalid IO queue id");
309  }
310  return static_cast<const QueueStatistics&>(_ioQueues.at(queueId).stats());
311  }
312 }
313 
314 inline
316 {
317  for (auto&& queue : _coroQueues)
318  {
319  queue.stats().reset();
320  }
321  for (auto&& queue : _sharedIoQueues)
322  {
323  queue.stats().reset();
324  }
325  for (auto&& queue : _ioQueues)
326  {
327  queue.stats().reset();
328  }
329 }
330 
331 inline
333 {
334  if (!task)
335  {
336  return;
337  }
338 
339  if (task->getQueueId() == (int)IQueue::QueueId::Any)
340  {
341  size_t index = 0;
342 
343  //Insert into the shortest queue or the first empty queue found
344  size_t numTasks = std::numeric_limits<size_t>::max();
345  for (size_t i = (size_t)_coroQueueIdRangeForAny.first; i <= (size_t)_coroQueueIdRangeForAny.second; ++i)
346  {
347  size_t queueSize = _coroQueues[i].size();
348  if (queueSize < numTasks)
349  {
350  numTasks = queueSize;
351  index = i;
352  }
353  if (numTasks == 0)
354  {
355  break; //reached an empty queue
356  }
357  }
358 
359  task->setQueueId(index); //overwrite the queueId with the selected one
360  }
361  else
362  {
363  if (task->getQueueId() >= (int)_coroQueues.size())
364  {
365  throw std::runtime_error("Queue id out of bounds");
366  }
367  }
368 
369  _coroQueues.at(task->getQueueId()).enqueue(task);
370 }
371 
372 inline
374 {
375  if (!task)
376  {
377  return;
378  }
379 
380  if (task->getQueueId() == (int)IQueue::QueueId::Any)
381  {
382  if (_loadBalanceSharedIoQueues)
383  {
384  static size_t index = 0;
385  //loop until we can find an queue that won't block
386  while (1) {
387  if (_sharedIoQueues.at(++index % _sharedIoQueues.size()).tryEnqueue(task)) {
388  break;
389  }
390  }
391  }
392  else
393  {
394  //insert the task into the shared queue
395  _sharedIoQueues[0].enqueue(task);
396 
397  //Signal all threads there is work to do
398  for (auto&& queue : _ioQueues)
399  {
400  queue.signalEmptyCondition(false);
401  }
402  }
403  }
404  else
405  {
406  if (task->getQueueId() >= (int)_ioQueues.size())
407  {
408  throw std::runtime_error("Queue id out of bounds");
409  }
410 
411  //Run on specific queue
412  _ioQueues.at(task->getQueueId()).enqueue(task);
413  }
414 }
415 
416 inline
418 {
419  return _coroQueues.size();
420 }
421 
422 inline
424 {
425  return _ioQueues.size();
426 }
427 
428 inline
429 const std::pair<int, int>& DispatcherCore::getCoroQueueIdRangeForAny() const
430 {
431  return _coroQueueIdRangeForAny;
432 }
433 
434 }}
QueueType
Definition: quantum_iqueue.h:37
Definition: quantum_buffer_impl.h:22
~DispatcherCore()
Definition: quantum_dispatcher_core_impl.h:76
QueueStatistics stats(IQueue::QueueType type, int queueId)
Definition: quantum_dispatcher_core_impl.h:240
Definition: quantum_stl_impl.h:23
void terminate() final
Terminates the object.
Definition: quantum_dispatcher_core_impl.h:82
int getNumIoThreads() const
Definition: quantum_dispatcher_core_impl.h:423
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
size_t size(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:102
bool empty(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:121
std::shared_ptr< IoTask > Ptr
Definition: quantum_io_task.h:37
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Definition: quantum_dispatcher_core_impl.h:429
void postAsyncIo(IoTask::Ptr task)
Definition: quantum_dispatcher_core_impl.h:373
void post(Task::Ptr task)
Definition: quantum_dispatcher_core_impl.h:332
void resetStats()
Definition: quantum_dispatcher_core_impl.h:315
int getNumCoroutineThreads() const
Definition: quantum_dispatcher_core_impl.h:417