QuantumLibrary
quantum_task_queue.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_TASK_QUEUE_H
17 #define BLOOMBERG_QUANTUM_TASK_QUEUE_H
18 
19 #include <list>
20 #include <atomic>
21 #include <functional>
22 #include <algorithm>
23 #include <mutex>
24 #include <condition_variable>
25 #include <thread>
26 #include <pthread.h>
27 #include <iostream>
28 #include <quantum/interface/quantum_itask_continuation.h>
29 #include <quantum/interface/quantum_iterminate.h>
30 #include <quantum/interface/quantum_iqueue.h>
31 #include <quantum/quantum_spinlock.h>
32 #include <quantum/quantum_yielding_thread.h>
33 #include <quantum/quantum_queue_statistics.h>
34 #include <quantum/quantum_configuration.h>
35 
36 namespace Bloomberg {
37 namespace quantum {
38 
39 //==============================================================================================
40 // class TaskQueue
41 //==============================================================================================
45 class TaskQueue : public IQueue
46 {
47 public:
48  using TaskList = std::list<Task::Ptr, QueueListAllocator>;
49  using TaskListIter = TaskList::iterator;
50 
51  TaskQueue();
52 
53  explicit TaskQueue(const Configuration& config);
54 
55  TaskQueue(const TaskQueue& other);
56 
57  TaskQueue(TaskQueue&& other) = default;
58 
59  ~TaskQueue();
60 
61  void pinToCore(int coreId) final;
62 
63  void run() final;
64 
65  void enqueue(ITask::Ptr task) final;
66 
67  bool tryEnqueue(ITask::Ptr task) final;
68 
69  ITask::Ptr dequeue(std::atomic_bool& hint) final;
70 
71  ITask::Ptr tryDequeue(std::atomic_bool& hint) final;
72 
73  size_t size() const final;
74 
75  bool empty() const final;
76 
77  void terminate() final;
78 
79  IQueueStatistics& stats() final;
80 
81  SpinLock& getLock() final;
82 
83  void signalEmptyCondition(bool value) final;
84 
85  bool isIdle() const final;
86 
87 private:
88  TaskListIter advance();
89  void doEnqueue(ITask::Ptr task);
90  ITask::Ptr doDequeue(std::atomic_bool& hint);
91 
92  std::shared_ptr<std::thread> _thread;
93  TaskList _queue;
94  TaskListIter _queueIt;
95  TaskListIter _blockedIt;
96  mutable SpinLock _spinlock;
97  std::mutex _notEmptyMutex; //for accessing the condition variable
98  std::condition_variable _notEmptyCond;
99  std::atomic_bool _isEmpty;
100  std::atomic_bool _isInterrupted;
101  std::atomic_bool _isIdle;
102  std::atomic_flag _terminated;
103  bool _isAdvanced;
104  QueueStatistics _stats;
105 };
106 
107 }}
108 
109 #include <quantum/impl/quantum_task_queue_impl.h>
110 
111 #endif //BLOOMBERG_QUANTUM_TASK_QUEUE_H
Definition: quantum_buffer_impl.h:22
size_t size() const final
Definition: quantum_task_queue_impl.h:282
void terminate() final
Terminates the object.
Definition: quantum_task_queue_impl.h:299
std::list< Task::Ptr, QueueListAllocator > TaskList
Definition: quantum_task_queue.h:48
~TaskQueue()
Definition: quantum_task_queue_impl.h:53
Definition: quantum_stl_impl.h:23
bool tryEnqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:206
void run() final
Definition: quantum_task_queue_impl.h:76
Interface to a task queue. For internal use only.
Definition: quantum_iqueue.h:33
Provides various counters related to queues and task execution.
Definition: quantum_queue_statistics.h:30
bool isIdle() const final
Definition: quantum_task_queue_impl.h:364
IQueueStatistics & stats() final
Definition: quantum_task_queue_impl.h:320
Definition: quantum_configuration.h:31
ITask::Ptr dequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:247
Thread queue for running coroutines.
Definition: quantum_task_queue.h:45
TaskQueue()
Definition: quantum_task_queue_impl.h:26
std::shared_ptr< IQueue > Ptr
Definition: quantum_iqueue.h:36
bool empty() const final
Definition: quantum_task_queue_impl.h:293
void pinToCore(int coreId) final
Definition: quantum_task_queue_impl.h:59
SpinLock & getLock() final
Definition: quantum_task_queue_impl.h:326
void signalEmptyCondition(bool value) final
Definition: quantum_task_queue_impl.h:332
void enqueue(ITask::Ptr task) final
Definition: quantum_task_queue_impl.h:194
Interface to access and manipulate a QueueStatistics object.
Definition: quantum_iqueue_statistics.h:29
Interface to a task. For internal use only.
Definition: quantum_itask.h:32
ITask::Ptr tryDequeue(std::atomic_bool &hint) final
Definition: quantum_task_queue_impl.h:255
TaskList::iterator TaskListIter
Definition: quantum_task_queue.h:49