QuantumLibrary
quantum_io_queue.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_IO_QUEUE_H
17 #define BLOOMBERG_QUANTUM_IO_QUEUE_H
18 
19 #include <list>
20 #include <thread>
21 #include <condition_variable>
22 #include <iostream>
23 #include <atomic>
24 #include <quantum/interface/quantum_itask.h>
25 #include <quantum/interface/quantum_iterminate.h>
26 #include <quantum/interface/quantum_iqueue.h>
27 #include <quantum/quantum_io_task.h>
28 #include <quantum/quantum_queue_statistics.h>
29 #include <quantum/quantum_configuration.h>
30 
31 namespace Bloomberg {
32 namespace quantum {
33 
34 //==============================================================================================
35 // class IoQueue
36 //==============================================================================================
40 class IoQueue : public IQueue
41 {
42 public:
43  using TaskList = std::list<IoTask::Ptr, QueueListAllocator>;
44  using TaskListIter = TaskList::iterator;
45 
46  IoQueue();
47 
48  IoQueue(const Configuration& config,
49  std::vector<IoQueue>* sharedIoQueues);
50 
51  IoQueue(const IoQueue& other);
52 
53  IoQueue(IoQueue&& other) = default;
54 
55  ~IoQueue();
56 
57  void terminate() final;
58 
59  void pinToCore(int coreId) final;
60 
61  void run() final;
62 
63  void enqueue(ITask::Ptr task) final;
64 
65  bool tryEnqueue(ITask::Ptr task) final;
66 
67  ITask::Ptr dequeue(std::atomic_bool& hint) final;
68 
69  ITask::Ptr tryDequeue(std::atomic_bool& hint) final;
70 
71  size_t size() const final;
72 
73  bool empty() const final;
74 
75  IQueueStatistics& stats() final;
76 
77  SpinLock& getLock() final;
78 
79  void signalEmptyCondition(bool value) final;
80 
81  bool isIdle() const final;
82 
83 private:
84  ITask::Ptr grabWorkItem();
85  ITask::Ptr grabWorkItemFromAll();
86  void doEnqueue(ITask::Ptr task);
87  ITask::Ptr doDequeue(std::atomic_bool& hint);
88  ITask::Ptr tryDequeueFromShared();
89  std::chrono::milliseconds getBackoffInterval();
90 
91  //async IO queue
92  std::vector<IoQueue>* _sharedIoQueues;
93  bool _loadBalanceSharedIoQueues;
94  std::chrono::milliseconds _loadBalancePollIntervalMs;
95  Configuration::BackoffPolicy _loadBalancePollIntervalBackoffPolicy;
96  size_t _loadBalancePollIntervalNumBackoffs;
97  size_t _loadBalanceBackoffNum;
98  std::shared_ptr<std::thread> _thread;
99  TaskList _queue;
100  mutable SpinLock _spinlock;
101  std::mutex _notEmptyMutex; //for accessing the condition variable
102  std::condition_variable _notEmptyCond;
103  std::atomic_bool _isEmpty;
104  std::atomic_bool _isInterrupted;
105  std::atomic_bool _isIdle;
106  std::atomic_flag _terminated;
107  QueueStatistics _stats;
108 };
109 
110 }}
111 
112 #include <quantum/impl/quantum_io_queue_impl.h>
113 
114 #endif //BLOOMBERG_QUANTUM_IO_QUEUE_H
Definition: quantum_buffer_impl.h:22
std::list< IoTask::Ptr, QueueListAllocator > TaskList
Definition: quantum_io_queue.h:43
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:200
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:236
Thread queue for executing IO tasks.
Definition: quantum_io_queue.h:40
Definition: quantum_stl_impl.h:23
void run() final
Definition: quantum_io_queue_impl.h:86
bool isIdle() const final
Definition: quantum_io_queue_impl.h:450
Interface to a task queue. For internal use only.
Definition: quantum_iqueue.h:33
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
Definition: quantum_configuration.h:31
TaskList::iterator TaskListIter
Definition: quantum_io_queue.h:44
void signalEmptyCondition(bool value) final
Definition: quantum_io_queue_impl.h:370
SpinLock & getLock() final
Definition: quantum_io_queue_impl.h:364
void enqueue(ITask::Ptr task) final
Definition: quantum_io_queue_impl.h:188
std::shared_ptr< IQueue > Ptr
Definition: quantum_iqueue.h:36
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
void pinToCore(int coreId) final
Definition: quantum_io_queue_impl.h:80
Interface to a task. For internal use only.
Definition: quantum_itask.h:32
IoQueue()
Definition: quantum_io_queue_impl.h:27
IQueueStatistics & stats() final
Definition: quantum_io_queue_impl.h:358
void terminate() final
Terminates the object.
Definition: quantum_io_queue_impl.h:341
size_t size() const final
Definition: quantum_io_queue_impl.h:315
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_io_queue_impl.h:248
bool empty() const final
Definition: quantum_io_queue_impl.h:332