QuantumLibrary
quantum_task_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 #include <quantum/quantum_allocator.h>
22 #include <quantum/quantum_traits.h>
23 
24 namespace Bloomberg {
25 namespace quantum {
26 
// Provide a default for __QUANTUM_TASK_ALLOC_SIZE: unless the macro is already
// defined, it falls back to __QUANTUM_DEFAULT_POOL_ALLOC_SIZE.
27 #ifndef __QUANTUM_TASK_ALLOC_SIZE
28  #define __QUANTUM_TASK_ALLOC_SIZE __QUANTUM_DEFAULT_POOL_ALLOC_SIZE
29 #endif
// Select the allocator type used for Task objects:
//  - __QUANTUM_USE_DEFAULT_ALLOCATOR not defined:
//      * __QUANTUM_ALLOCATE_POOL_FROM_HEAP defined -> HeapAllocator<Task>
//      * otherwise -> StackAllocator<Task, __QUANTUM_TASK_ALLOC_SIZE>
//  - __QUANTUM_USE_DEFAULT_ALLOCATOR defined: see NOTE below.
30 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR
31  #ifdef __QUANTUM_ALLOCATE_POOL_FROM_HEAP
32  using TaskAllocator = HeapAllocator<Task>;
33  #else
34  using TaskAllocator = StackAllocator<Task, __QUANTUM_TASK_ALLOC_SIZE>;
35  #endif
36 #else
// NOTE(review): the TaskAllocator alias for the __QUANTUM_USE_DEFAULT_ALLOCATOR
// build (listing line 37) is absent from this listing -- restore it from the
// upstream header before building with that option.
38 #endif
39 
// Constructs a task around the context `ctx` with default scheduling settings.
//  ctx  : shared context; stored in `_ctx` and also handed to Util::bindCaller.
//  type : task type, stored in `_type`.
//  func : callable forwarded to Util::bindCaller.
//  args : arguments forwarded to Util::bindCaller along with `func`.
// The queue id is initialised to IQueue::QueueId::Any, the task is not high
// priority, and the return code starts as ITask::RetCode::Running.
40 template <class RET, class FUNC, class ... ARGS>
41 Task::Task(std::shared_ptr<Context<RET>> ctx,
42  ITask::Type type,
43  FUNC&& func,
44  ARGS&&... args) :
45  _ctx(ctx),
// The coroutine is built from the CoroStackAllocator instance (requested with
// AllocatorTraits::defaultCoroPoolAllocSize()) and the callable that
// Util::bindCaller produces from `ctx`, `func` and `args`.
46  _coro(Allocator<CoroStackAllocator>::instance(AllocatorTraits::defaultCoroPoolAllocSize()),
47  Util::bindCaller(ctx, std::forward<FUNC>(func), std::forward<ARGS>(args)...)),
48  _queueId((int)IQueue::QueueId::Any),
49  _isHighPriority(false),
50  _rc((int)ITask::RetCode::Running),
51  _type(type),
// NOTE(review): this relies on ATOMIC_FLAG_INIT expanding to a brace-enclosed
// initializer; the standard only specifies the `flag = ATOMIC_FLAG_INIT` form,
// so confirm this compiles on every target toolchain.
52  _terminated ATOMIC_FLAG_INIT
53 {}
54 
// Constructs a task around the context `ctx` with explicit scheduling settings.
//  ctx            : shared context; stored in `_ctx` and also handed to
//                   Util::bindCaller.
//  queueId        : queue id, stored in `_queueId`.
//  isHighPriority : priority flag, stored in `_isHighPriority`.
//  type           : task type, stored in `_type`.
//  func           : callable forwarded to Util::bindCaller.
//  args           : arguments forwarded to Util::bindCaller along with `func`.
// The return code starts as ITask::RetCode::Running.
55 template <class RET, class FUNC, class ... ARGS>
56 Task::Task(std::shared_ptr<Context<RET>> ctx,
57  int queueId,
58  bool isHighPriority,
59  ITask::Type type,
60  FUNC&& func,
61  ARGS&&... args) :
62  _ctx(ctx),
// The coroutine is built from the CoroStackAllocator instance (requested with
// AllocatorTraits::defaultCoroPoolAllocSize()) and the callable that
// Util::bindCaller produces from `ctx`, `func` and `args`.
63  _coro(Allocator<CoroStackAllocator>::instance(AllocatorTraits::defaultCoroPoolAllocSize()),
64  Util::bindCaller(ctx, std::forward<FUNC>(func), std::forward<ARGS>(args)...)),
65  _queueId(queueId),
66  _isHighPriority(isHighPriority),
67  _rc((int)ITask::RetCode::Running),
68  _type(type),
// NOTE(review): this relies on ATOMIC_FLAG_INIT expanding to a brace-enclosed
// initializer; the standard only specifies the `flag = ATOMIC_FLAG_INIT` form,
// so confirm this compiles on every target toolchain.
69  _terminated ATOMIC_FLAG_INIT
70 {}
71 
72 inline
74 {
75  terminate();
76 }
77 
// Terminates the task. Only the first call does any work: test_and_set()
// returns the previous flag value, so once `_terminated` has been set every
// later call skips the body. On that first call the context, if present, is
// asked to terminate as well.
78 inline
79 void Task::terminate()
80 {
81  if (!_terminated.test_and_set())
82  {
83  if (_ctx) _ctx->terminate();
84  }
85 }
86 
// Resumes the coroutine once, passing it `_rc`, and returns `_rc` afterwards.
// When the coroutine handle tests false nothing is resumed and
// ITask::RetCode::Success is returned.
87 inline
88 int Task::run()
89 {
90  if (!_coro)
91  {
92  return static_cast<int>(ITask::RetCode::Success); //nothing to resume
93  }
94  _coro(_rc);
95  return _rc;
96 }
97 
// Stores `queueId` as this task's queue id (read back by getQueueId()).
98 inline
99 void Task::setQueueId(int queueId)
100 {
101  _queueId = queueId;
102 }
103 
// Returns the queue id currently stored in `_queueId`.
104 inline
105 int Task::getQueueId()
106 {
107  return _queueId;
108 }
109 
// Returns the task type that was supplied at construction (`_type`).
110 inline
111 ITask::Type Task::getType() const { return _type; }
112 
113 inline
115 
// Stores `nextTask` in `_next` as the continuation that follows this task.
116 inline
117 void Task::setNextTask(ITaskContinuation::Ptr nextTask) { _next = nextTask; }
118 
// Returns the previous task by calling lock() on `_prev`.
// NOTE(review): `_prev` looks like a weak reference, so the result is
// presumably empty once the previous task no longer exists -- callers should
// check before dereferencing.
119 inline
120 ITaskContinuation::Ptr Task::getPrevTask() { return _prev.lock(); }
121 
// Records `prevTask` in `_prev` as the task that precedes this one.
// NOTE(review): getPrevTask() reads `_prev` through lock(), so this appears to
// be a weak reference that does not keep `prevTask` alive -- confirm.
122 inline
123 void Task::setPrevTask(ITaskContinuation::Ptr prevTask) { _prev = prevTask; }
124 
// Returns the first task of the continuation chain: this task itself when its
// type is Type::First, otherwise whatever the previous task reports.
// NOTE(review): getPrevTask() returns `_prev.lock()` and is dereferenced here
// without a check -- confirm that every non-First task always has a live
// predecessor when this is called.
125 inline
126 ITaskContinuation::Ptr Task::getFirstTask()
127 {
128  return (_type == Type::First) ? shared_from_this() : getPrevTask()->getFirstTask();
129 }
130 
// Finds the nearest task, starting at this one and walking `_next`, whose type
// is Type::ErrorHandler or Type::Final.
//  - If this task has one of those types, it returns itself.
//  - Otherwise, if there is a next task, the search is delegated to it. When
//    that next task is itself neither an error handler nor a final task, it is
//    terminated and the reference to it is dropped before returning.
//  - Returns nullptr when there is no next task to search.
131 inline
132 ITaskContinuation::Ptr Task::getErrorHandlerOrFinalTask()
133 {
134  if ((_type == Type::ErrorHandler) || (_type == Type::Final))
135  {
136  return shared_from_this();
137  }
138  else if (_next)
139  {
// Search first, while `_next` is still attached to the chain.
140  ITaskContinuation::Ptr task = _next->getErrorHandlerOrFinalTask();
141  if ((_next->getType() != Type::ErrorHandler) && (_next->getType() != Type::Final))
142  {
143  _next->terminate();
144  _next.reset(); //release next task
145  }
146  return task;
147  }
148  return nullptr;
149 }
150 
// Reports whether the context says the coroutine is blocked; a task without a
// context is never blocked.
151 inline
152 bool Task::isBlocked() const
153 {
154  return _ctx && _ctx->isBlocked(); //only query the context when one exists
155 }
156 
// Reports whether the context says the coroutine is sleeping, forwarding
// `updateTimer` to it; a task without a context is never sleeping.
157 inline
158 bool Task::isSleeping(bool updateTimer)
159 {
160  return _ctx && _ctx->isSleeping(updateTimer); //only query the context when one exists
161 }
162 
// Returns the priority flag stored in `_isHighPriority`.
163 inline
164 bool Task::isHighPriority() const
165 {
166  return _isHighPriority;
167 }
168 
169 inline
170 void* Task::operator new(size_t)
171 {
173 }
174 
// Class-specific deallocation: hands the storage at `p` back to the
// TaskAllocator instance obtained from Allocator<TaskAllocator>::instance(),
// requested with AllocatorTraits::taskAllocSize().
175 inline
176 void Task::operator delete(void* p)
177 {
178  Allocator<TaskAllocator>::instance(AllocatorTraits::taskAllocSize()).deallocate(static_cast<Task*>(p));
179 }
180 
181 inline
183 {
184 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR
186 #else
187  delete p;
188 #endif
189 }
190 
191 }}
static AllocType & instance(std::enable_if_t<!A::default_constructor::value, uint16_t > size)
Definition: quantum_allocator.h:56
Definition: quantum_coroutine_pool_allocator.h:84
ITaskContinuation::Ptr getNextTask() final
Definition: quantum_task_impl.h:114
Definition: quantum_buffer_impl.h:22
Definition: quantum_allocator.h:36
int getQueueId() final
Definition: quantum_task_impl.h:105
void setNextTask(ITaskContinuation::Ptr nextTask) final
Definition: quantum_task_impl.h:117
void terminate() final
Terminates the object.
Definition: quantum_task_impl.h:79
ITaskContinuation::Ptr getFirstTask() final
Definition: quantum_task_impl.h:126
static size_type & taskAllocSize()
Get/set the default size for task object pools.
Definition: quantum_allocator_traits.h:144
Utility to bind a user callable function onto a coroutine or an IO task.
Definition: quantum_util.h:45
void setQueueId(int queueId) final
Definition: quantum_task_impl.h:99
Definition: quantum_stl_impl.h:23
Definition: quantum_allocator.h:54
Type
Definition: quantum_itask.h:37
std::shared_ptr< ITaskContinuation > Ptr
Definition: quantum_itask_continuation.h:32
Runnable object representing a coroutine.
Definition: quantum_task.h:40
int run() final
Definition: quantum_task_impl.h:88
ITaskContinuation::Ptr getErrorHandlerOrFinalTask() final
Definition: quantum_task_impl.h:132
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
Interface to a task queue. For internal use only.
Definition: quantum_iqueue.h:33
bool isSleeping(bool updateTimer=false) final
Definition: quantum_task_impl.h:158
RetCode
Definition: quantum_itask.h:42
bool isHighPriority() const final
Definition: quantum_task_impl.h:164
StackAllocator< Task, __QUANTUM_TASK_ALLOC_SIZE > TaskAllocator
Definition: quantum_task_impl.h:34
Allows application-wide settings for the various allocators used by Quantum.
Definition: quantum_allocator_traits.h:46
Task(std::shared_ptr< Context< RET >> ctx, ITask::Type type, FUNC &&func, ARGS &&... args)
Definition: quantum_task_impl.h:41
Type getType() const final
Definition: quantum_task_impl.h:111
static void deleter(Task *p)
Definition: quantum_task_impl.h:182
Interface to a task. For internal use only.
Definition: quantum_itask.h:32
bool isBlocked() const final
Definition: quantum_task_impl.h:152
ITaskContinuation::Ptr getPrevTask() final
Definition: quantum_task_impl.h:120
void setPrevTask(ITaskContinuation::Ptr prevTask) final
Definition: quantum_task_impl.h:123