21 #include <quantum/quantum_allocator.h> 33 return static_cast<Impl*>(
this)->get();
40 return static_cast<const Impl*>(
this)->getRef();
44 template <
class OTHER_RET>
47 return static_cast<Impl*>(
this)->template getAt<OTHER_RET>(num);
51 template <
class OTHER_RET>
54 return static_cast<const Impl*>(
this)->template getRefAt<OTHER_RET>(num);
58 template <
class V,
class>
61 return static_cast<Impl*>(
this)->set(std::forward<V>(value));
65 template <
class V,
class>
68 static_cast<Impl*>(
this)->push(std::forward<V>(value));
75 return static_cast<Impl*>(
this)->pull(isBufferClosed);
79 template <
class V,
class>
82 return static_cast<Impl*>(
this)->closeBuffer();
88 return static_cast<const Impl*>(
this)->getNumCoroutineThreads();
94 return static_cast<const Impl*>(
this)->getNumIoThreads();
100 return static_cast<const Impl*>(
this)->getCoroQueueIdRangeForAny();
104 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
108 return static_cast<Impl*>(
this)->template then<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
112 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
116 return static_cast<Impl*>(
this)->template onError<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
120 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
124 return static_cast<Impl*>(
this)->template finally<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
131 return static_cast<Impl*>(
this)->end();
141 return static_cast<Impl*>(
this)->get(sync);
148 return static_cast<const Impl*>(
this)->getRef(sync);
152 template <
class OTHER_RET>
155 std::shared_ptr<Impl> ctx = static_cast<Impl*>(
this)->shared_from_this();
156 return ctx->template getPrev<OTHER_RET>(ctx);
160 template <
class OTHER_RET>
163 std::shared_ptr<Impl> ctx = static_cast<Impl*>(
this)->shared_from_this();
164 return ctx->template getPrevRef<OTHER_RET>(ctx);
168 template <
class OTHER_RET>
171 return static_cast<Impl*>(
this)->template getAt<OTHER_RET>(num, sync);
175 template <
class OTHER_RET>
178 return static_cast<const Impl*>(
this)->template getRefAt<OTHER_RET>(num, sync);
182 template <
class V,
class>
185 std::shared_ptr<Impl> ctx = static_cast<Impl*>(
this)->shared_from_this();
186 return ctx->set(ctx, std::forward<V>(value));
190 template <
class V,
class>
193 std::shared_ptr<Impl> ctx = static_cast<Impl*>(
this)->shared_from_this();
194 ctx->push(ctx, std::forward<V>(value));
201 return static_cast<Impl*>(
this)->pull(sync, isBufferClosed);
205 template <
class V,
class>
208 return static_cast<Impl*>(
this)->closeBuffer();
214 return static_cast<const Impl*>(
this)->getNumCoroutineThreads();
220 return static_cast<const Impl*>(
this)->getNumIoThreads();
226 return static_cast<const Impl*>(
this)->getCoroQueueIdRangeForAny();
230 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
234 return static_cast<Impl*>(
this)->template post<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
238 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
242 return static_cast<Impl*>(
this)->template post<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
246 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
250 return static_cast<Impl*>(
this)->template postFirst<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
254 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
258 return static_cast<Impl*>(
this)->template postFirst<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
262 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
266 return static_cast<Impl*>(
this)->template then<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
270 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
274 return static_cast<Impl*>(
this)->template onError<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
278 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
282 return static_cast<Impl*>(
this)->template finally<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
289 return static_cast<Impl*>(
this)->end();
293 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
297 return static_cast<Impl*>(
this)->template postAsyncIo<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
301 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
305 return static_cast<Impl*>(
this)->template postAsyncIo<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
309 template <
class OTHER_RET,
class INPUT_IT,
class>
315 return static_cast<Impl*>(
this)->template forEach<OTHER_RET>(first, last, std::move(func));
319 template <
class OTHER_RET,
class INPUT_IT>
325 return static_cast<Impl*>(
this)->template forEach<OTHER_RET>(first, num, std::move(func));
329 template <
class OTHER_RET,
class INPUT_IT,
class>
335 return static_cast<Impl*>(
this)->template forEachBatch<OTHER_RET>(first, last, std::move(func));
339 template <
class OTHER_RET,
class INPUT_IT>
345 return static_cast<Impl*>(
this)->template forEachBatch<OTHER_RET>(first, num, std::move(func));
360 return static_cast<Impl*>(
this)->template mapReduce<KEY, MAPPED_TYPE, REDUCED_TYPE>
361 (first, last, std::move(mapper), std::move(reducer));
375 return static_cast<Impl*>(
this)->template mapReduce<KEY, MAPPED_TYPE, REDUCED_TYPE>
376 (first, num, std::move(mapper), std::move(reducer));
391 return static_cast<Impl*>(
this)->template mapReduceBatch<KEY, MAPPED_TYPE, REDUCED_TYPE>
392 (first, last, std::move(mapper), std::move(reducer));
406 return static_cast<Impl*>(
this)->template mapReduceBatch<KEY, MAPPED_TYPE, REDUCED_TYPE>
407 (first, num, std::move(mapper), std::move(reducer));
413 #ifndef __QUANTUM_CONTEXT_ALLOC_SIZE 414 #define __QUANTUM_CONTEXT_ALLOC_SIZE __QUANTUM_DEFAULT_POOL_ALLOC_SIZE 416 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR 417 #ifdef __QUANTUM_ALLOCATE_POOL_FROM_HEAP 429 _dispatcher(&dispatcher),
430 _terminated ATOMIC_FLAG_INIT,
437 template <
class OTHER_RET>
438 Context<RET>::Context(Context<OTHER_RET>& other) :
439 _promises(other._promises),
440 _dispatcher(other._dispatcher),
441 _terminated ATOMIC_FLAG_INIT,
458 if (!_terminated.test_and_set())
470 return _promises[index(num)]->
valid();
494 if (_sleepDuration.count() > 0) {
498 auto now = std::chrono::high_resolution_clock::now();
499 auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(now-_sleepTimestamp);
500 if (elapsed >= _sleepDuration) {
502 _sleepDuration = std::chrono::microseconds(0);
503 _sleepTimestamp = std::chrono::high_resolution_clock::time_point{};
507 _sleepDuration -= elapsed;
508 _sleepTimestamp = now;
518 if ((num < -1) || (num >= (
int)_promises.size()))
522 return (num == -1) ? _promises.size() - 1 : num;
526 void Context<RET>::validateTaskType(ITask::Type type)
const 530 throw std::runtime_error(
"Invalid task pointer");
536 case ITask::Type::Continuation:
537 case ITask::Type::ErrorHandler:
538 if ((_task->getType() != ITask::Type::First) &&
539 (_task->getType() != ITask::Type::Continuation))
544 case ITask::Type::Final:
545 if ((_task->getType() != ITask::Type::First) &&
546 (_task->getType() != ITask::Type::Continuation) &&
547 (_task->getType() != ITask::Type::ErrorHandler))
552 case ITask::Type::Termination:
553 if ((_task->getType() != ITask::Type::First) &&
554 (_task->getType() != ITask::Type::Continuation) &&
555 (_task->getType() != ITask::Type::ErrorHandler) &&
556 (_task->getType() != ITask::Type::Final))
567 throw std::runtime_error(
"Restricted continuation method");
572 void Context<RET>::validateContext(ICoroSync::Ptr sync)
const 574 if (static_cast<const ICoroSync*>(
this) == sync.get())
576 throw std::runtime_error(
"Must use different synchronization object");
601 if (!_yield)
throw std::runtime_error(
"Yield handle is null");
620 sleep(std::chrono::duration_cast<std::chrono::microseconds>(timeMs));
626 _sleepDuration = timeUs;
627 _sleepTimestamp = std::chrono::high_resolution_clock::now();
634 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
642 _task->isHighPriority(),
644 std::forward<FUNC>(func),
645 std::forward<ARGS>(args)...),
650 std::static_pointer_cast<ITaskContinuation>(_task)->setNextTask(task);
651 task->setPrevTask(std::static_pointer_cast<ITaskContinuation>(_task));
656 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
661 validateTaskType(ITask::Type::Continuation);
662 return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::Continuation, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
666 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
670 validateTaskType(ITask::Type::ErrorHandler);
671 return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::ErrorHandler, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
675 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
679 validateTaskType(ITask::Type::Final);
680 return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::Final, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
687 validateTaskType(ITask::Type::Termination);
689 auto task = std::static_pointer_cast<Task>(std::static_pointer_cast<ITaskContinuation>(getTask())->getFirstTask());
690 _dispatcher->post(task);
691 return dynamic_cast<Context<RET>*
>(
this)->shared_from_this();
695 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
699 return postAsyncIoImpl<OTHER_RET>((
int)IQueue::QueueId::Any,
false, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
703 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
707 return postAsyncIoImpl<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
711 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
715 if (queueId < (
int)IQueue::QueueId::Any)
717 throw std::runtime_error(
"Invalid coroutine queue id");
719 auto promise = PromisePtr<OTHER_RET>(
new Promise<OTHER_RET>(), Promise<OTHER_RET>::deleter);
720 auto task = IoTask::Ptr(
new IoTask(promise,
723 std::forward<FUNC>(func),
724 std::forward<ARGS>(args)...),
726 _dispatcher->postAsyncIo(task);
727 return promise->getICoroFuture();
731 template <
class OTHER_RET,
class INPUT_IT,
class>
732 ContextPtr<std::vector<OTHER_RET>>
737 return forEach<OTHER_RET>(first, std::distance(first, last), std::move(func));
741 template <
class OTHER_RET,
class INPUT_IT>
747 return post<std::vector<OTHER_RET>>(Util::forEachCoro<OTHER_RET, INPUT_IT>,
754 template <
class OTHER_RET,
class INPUT_IT,
class>
760 return forEachBatch<OTHER_RET>(first, std::distance(first, last), std::move(func));
764 template <
class OTHER_RET,
class INPUT_IT>
770 return post<std::vector<std::vector<OTHER_RET>>>(Util::forEachBatchCoro<OTHER_RET, INPUT_IT>,
774 getNumCoroutineThreads());
789 return mapReduce(first, std::distance(first, last), std::move(mapper), std::move(reducer));
803 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
804 return post<ReducerOutput>(Util::mapReduceCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
823 return mapReduceBatch(first, std::distance(first, last), std::move(mapper), std::move(reducer));
837 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
838 return post<ReducerOutput>(Util::mapReduceBatchCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
846 template <
class V,
class>
849 return std::static_pointer_cast<
Promise<RET>>(_promises.back())->set(std::forward<V>(value));
853 template <
class V,
class>
856 std::static_pointer_cast<
Promise<RET>>(_promises.back())->push(std::forward<V>(value));
860 template <
class V,
class>
863 std::static_pointer_cast<
Promise<RET>>(_promises.back())->push(sync, std::forward<V>(value));
870 return std::static_pointer_cast<
Promise<RET>>(_promises.back())->getIThreadFuture()->pull(isBufferClosed);
877 return std::static_pointer_cast<
Promise<RET>>(_promises.back())->getICoroFuture()->pull(sync, isBufferClosed);
881 template <
class V,
class>
884 return std::static_pointer_cast<
Promise<RET>>(_promises.back())->closeBuffer();
888 template <
class OTHER_RET>
891 return std::static_pointer_cast<
Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->get();
895 template <
class OTHER_RET>
898 return std::static_pointer_cast<
Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->getRef();
905 return getAt<RET>(-1);
912 return getRefAt<RET>(-1);
918 _promises[index(num)]->getIThreadFutureBase()->
wait();
924 return _promises[index(num)]->getIThreadFutureBase()->
waitFor(timeMs);
936 return waitForAt(-1, timeMs);
942 for (
auto&& promise : _promises)
946 promise->getIThreadFutureBase()->
wait();
954 template <
class V,
class>
957 return std::static_pointer_cast<
Promise<RET>>(_promises.back())->set(sync, std::forward<V>(value));
961 template <
class OTHER_RET>
965 validateContext(sync);
966 return std::static_pointer_cast<
Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->get(sync);
970 template <
class OTHER_RET>
974 validateContext(sync);
975 return std::static_pointer_cast<
Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->getRef(sync);
982 return getAt<RET>(-1, sync);
989 return getRefAt<RET>(-1, sync);
993 template <
class OTHER_RET>
996 if (_promises.size() < 2)
1003 template <
class RET>
1004 template <
class OTHER_RET>
1007 if (_promises.size() < 2)
1014 template <
class RET>
1020 template <
class RET>
1026 template <
class RET>
1032 template <
class RET>
1036 validateContext(sync);
1037 _promises[index(num)]->getICoroFutureBase()->wait(sync);
1040 template <
class RET>
1043 std::chrono::milliseconds timeMs)
const 1045 validateContext(sync);
1046 return _promises[index(num)]->getICoroFutureBase()->waitFor(sync, timeMs);
1049 template <
class RET>
1055 template <
class RET>
1057 std::chrono::milliseconds timeMs)
const 1059 return waitForAt(-1, sync, timeMs);
1062 template <
class RET>
1065 for (
auto&& promise : _promises)
1069 promise->getICoroFutureBase()->
wait(sync);
1077 template <
class RET>
1078 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
1082 return postImpl<OTHER_RET>((
int)IQueue::QueueId::Any,
false, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1085 template <
class RET>
1086 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
1090 return postImpl<OTHER_RET>(queueId, isHighPriority, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1093 template <
class RET>
1094 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
1098 return postImpl<OTHER_RET>((
int)IQueue::QueueId::Any,
false, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1101 template <
class RET>
1102 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
1106 return postImpl<OTHER_RET>(queueId, isHighPriority, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1109 template <
class RET>
1110 template <
class OTHER_RET,
class FUNC,
class ... ARGS>
1114 if (queueId < (
int)IQueue::QueueId::Same)
1116 throw std::runtime_error(
"Invalid coroutine queue id");
1121 (queueId == (
int)IQueue::QueueId::Same) ? _task->getQueueId() : queueId,
1124 std::forward<FUNC>(func),
1125 std::forward<ARGS>(args)...),
1128 if (type == ITask::Type::Standalone)
1130 _dispatcher->post(task);
1135 template <
class RET>
1141 template <
class RET>
1147 template <
class RET>
1150 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR ICoroContext< OTHER_RET >::Ptr finally(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This coroutine is always guaranteed to run.
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:218
Definition: quantum_buffer_impl.h:22
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:1027
typename ICoroFuture< T >::Ptr CoroFuturePtr
Definition: quantum_icoro_future.h:72
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:92
bool valid() const final
Determines if the future object associated with this context has a valid shared state with the corres...
Definition: quantum_context_impl.h:474
Definition: quantum_allocator.h:36
typename Promise< T >::Ptr PromisePtr
Definition: quantum_promise.h:89
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:287
const NonBufferRetType< V > & getRef(ICoroSync::Ptr sync) const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:146
ICoroContext< OTHER_RET >::Ptr postFirst(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Class implementing the dispatching logic onto worker threads. Used for both coroutines and IO tasks.
Definition: quantum_dispatcher_core.h:45
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_allocator.h:54
const NonBufferRetType< OTHER_RET > & getRefAt(int num) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:52
const NonBufferRetType< V > & getRef() const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:38
std::shared_ptr< IThreadContext< RET > > Ptr
Definition: quantum_ithread_context.h:40
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:212
std::shared_ptr< ICoroSync > Ptr
Definition: quantum_icoro_sync.h:36
Type
Definition: quantum_itask.h:37
static void deleter(Promise< T > *p)
Definition: quantum_promise_impl.h:207
Class representing a promised value.
Definition: quantum_icoro_promise.h:77
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:224
CoroFuturePtr< OTHER_RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Posts an IO method to run asynchronously on the IO thread pool.
Definition: quantum_context_impl.h:295
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Runnable object representing a coroutine.
Definition: quantum_task.h:40
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:80
std::shared_ptr< Context< RET > > Ptr
Definition: quantum_context.h:47
std::enable_if_t<!Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > NonBufferRetType
Definition: quantum_traits.h:95
NonBufferRetType< OTHER_RET > getAt(int num, ICoroSync::Ptr sync)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:169
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:98
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
typename Context< RET >::Ptr ContextPtr
Definition: quantum_context.h:307
IThreadContext< OTHER_RET >::Ptr finally(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously. This function is always guaranteed to run.
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
NonBufferRetType< V > get()
Get the future value associated with this context.
Definition: quantum_context_impl.h:31
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:1021
IThreadContext< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously.
void terminate() final
Terminates the object.
Definition: quantum_context_impl.h:456
ICoroContext< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This is the error handler for a continuation chain and acts ...
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:206
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:66
ICoroContext< std::vector< OTHER_RET > >::Ptr forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
NonBufferRetType< V > get(ICoroSync::Ptr sync)
Get the future value associated with this context.
Definition: quantum_context_impl.h:139
const NonBufferRetType< OTHER_RET > & getRefAt(int num, ICoroSync::Ptr sync) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:176
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:1015
typename BoostCoro::pull_type Yield
Definition: quantum_traits.h:58
void ThrowFutureException(FutureState state)
Definition: quantum_future_state.h:130
NonBufferRetType< OTHER_RET > getAt(int num)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:45
const NonBufferRetType< OTHER_RET > & getPrevRef()
Get a reference to the future value associated with the previous coroutine context in the continuation ch...
Definition: quantum_context_impl.h:161
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
void wait() const final
Waits for the future associated with this context to be ready.
Definition: quantum_context_impl.h:928
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:183
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:59
std::shared_ptr< ICoroContext< RET > > Ptr
Definition: quantum_icoro_context.h:41
int setException(std::exception_ptr ex) final
Set an exception in the promise associated with the current IThreadContext or ICoroContext.
Definition: quantum_context_impl.h:480
Provides a heap-based object pool to the underlying ContiguousPoolManager. The default buffer size is...
Definition: quantum_heap_allocator.h:33
IThreadContext< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously. This is the error handler for a continuation chain and acts a...
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:191
std::enable_if_t< Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > BufferRetType
Definition: quantum_traits.h:93
NonBufferRetType< OTHER_RET > getPrev()
Get the future value associated with the previous coroutine context in the continuation chain.
Definition: quantum_context_impl.h:153
BufferRetType< V > pull(ICoroSync::Ptr sync, bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:199
std::future_status waitFor(std::chrono::milliseconds timeMs) const final
Waits for the future associated with this context to be ready for a maximum of 'timeMs' milliseconds.
Definition: quantum_context_impl.h:934
ICoroContext< OTHER_RET >::Ptr post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
ICoroContext< std::vector< std::vector< OTHER_RET > > >::Ptr forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs se...
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:129
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:86
typename IThreadContext< RET >::Ptr ThreadContextPtr
Definition: quantum_ithread_context.h:242
ICoroContext< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
friend class Context
Definition: quantum_context.h:44
BufferRetType< V > pull(bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:73
CoroFuturePtr< T > getICoroFuture() const
Get the associated coroutine future.
Definition: quantum_promise_impl.h:164