QuantumLibrary
quantum_dispatcher_core.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_DISPATCHER_CORE_H
17 #define BLOOMBERG_QUANTUM_DISPATCHER_CORE_H
18 
19 #include <vector>
20 #include <condition_variable>
21 #include <mutex>
22 #include <atomic>
23 #include <thread>
24 #include <functional>
25 #include <algorithm>
26 #ifdef _WIN32
27 #include <winbase.h>
28 #else
29 #include <pthread.h>
30 #endif
31 #include <quantum/quantum_configuration.h>
32 #include <quantum/quantum_task_queue.h>
33 #include <quantum/quantum_io_queue.h>
34 
35 namespace Bloomberg {
36 namespace quantum {
37 
38 //==============================================================================================
39 // class DispatcherCore
40 //==============================================================================================
45 class DispatcherCore : public ITerminate
46 {
47 public:
48  friend class Dispatcher;
49 
51 
52  void terminate() final;
53 
54  size_t size(IQueue::QueueType type, int queueId) const;
55 
56  bool empty(IQueue::QueueType type, int queueId) const;
57 
58  QueueStatistics stats(IQueue::QueueType type, int queueId);
59 
60  void resetStats();
61 
62  void post(Task::Ptr task);
63 
64  void postAsyncIo(IoTask::Ptr task);
65 
66  int getNumCoroutineThreads() const;
67 
68  int getNumIoThreads() const;
69 
70  const std::pair<int, int>& getCoroQueueIdRangeForAny() const;
71 
72 private:
73  // TODO : Remove - deprecated
74  DispatcherCore(int numCoroutineThreads,
75  int numIoThreads,
76  bool pinCoroutineThreadsToCores);
77 
78  DispatcherCore(const Configuration& config);
79 
80  size_t coroSize(int queueId) const;
81 
82  size_t ioSize(int queueId) const;
83 
84  bool coroEmpty(int queueId) const;
85 
86  bool ioEmpty(int queueId) const;
87 
88  QueueStatistics coroStats(int queueId);
89 
90  QueueStatistics ioStats(int queueId);
91 
92  //Members
93  std::vector<TaskQueue> _coroQueues; //coroutine queues
94  std::vector<IoQueue> _sharedIoQueues; //shared IO task queues (hold tasks posted to 'Any' IO queue)
95  std::vector<IoQueue> _ioQueues; //dedicated IO task queues
96  bool _loadBalanceSharedIoQueues; //tasks posted to 'Any' IO queue are load balanced
97  std::atomic_flag _terminated;
98  std::pair<int, int> _coroQueueIdRangeForAny; // range of coroutine queueIds covered by 'Any'
99 };
100 
101 }}
102 
103 #include <quantum/impl/quantum_dispatcher_core_impl.h>
104 
105 #endif //BLOOMBERG_QUANTUM_DISPATCHER_CORE_H
Long running or blocking task running in the IO thread pool.
Definition: quantum_io_task.h:34
Definition: quantum_buffer_impl.h:22
~DispatcherCore()
Definition: quantum_dispatcher_core_impl.h:76
Thread queue for executing IO tasks.
Definition: quantum_io_queue.h:40
QueueStatistics stats(IQueue::QueueType type, int queueId)
Definition: quantum_dispatcher_core_impl.h:240
Class implementing the dispatching logic unto worker threads. Used for both coroutines and IO tasks.
Definition: quantum_dispatcher_core.h:45
Definition: quantum_stl_impl.h:23
void terminate() final
Terminates the object.
Definition: quantum_dispatcher_core_impl.h:82
int getNumIoThreads() const
Definition: quantum_dispatcher_core_impl.h:423
Runnable object representing a coroutine.
Definition: quantum_task.h:40
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main e...
Definition: quantum_dispatcher.h:34
Interface to a task queue. For internal use only.
Definition: quantum_iqueue.h:33
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
size_t size(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:102
bool empty(IQueue::QueueType type, int queueId) const
Definition: quantum_dispatcher_core_impl.h:121
Definition: quantum_configuration.h:31
Thread queue for running coroutines.
Definition: quantum_task_queue.h:45
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Definition: quantum_dispatcher_core_impl.h:429
void postAsyncIo(IoTask::Ptr task)
Definition: quantum_dispatcher_core_impl.h:373
void post(Task::Ptr task)
Definition: quantum_dispatcher_core_impl.h:332
void resetStats()
Definition: quantum_dispatcher_core_impl.h:315
int getNumCoroutineThreads() const
Definition: quantum_dispatcher_core_impl.h:417