QuantumLibrary
quantum_context_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 #include <quantum/quantum_allocator.h>
22 
23 namespace Bloomberg {
24 namespace quantum {
25 
26 //==============================================================================================
27 // class IThreadContext (fwd to implementation)
28 //==============================================================================================
29 template <class RET>
30 template <class V>
32 {
33  return static_cast<Impl*>(this)->get();
34 }
35 
36 template <class RET>
37 template <class V>
39 {
40  return static_cast<const Impl*>(this)->getRef();
41 }
42 
43 template <class RET>
44 template <class OTHER_RET>
46 {
47  return static_cast<Impl*>(this)->template getAt<OTHER_RET>(num);
48 }
49 
50 template <class RET>
51 template <class OTHER_RET>
53 {
54  return static_cast<const Impl*>(this)->template getRefAt<OTHER_RET>(num);
55 }
56 
57 template <class RET>
58 template <class V, class>
60 {
61  return static_cast<Impl*>(this)->set(std::forward<V>(value));
62 }
63 
64 template <class RET>
65 template <class V, class>
67 {
68  static_cast<Impl*>(this)->push(std::forward<V>(value));
69 }
70 
71 template <class RET>
72 template <class V>
74 {
75  return static_cast<Impl*>(this)->pull(isBufferClosed);
76 }
77 
78 template <class RET>
79 template <class V, class>
81 {
82  return static_cast<Impl*>(this)->closeBuffer();
83 }
84 
85 template <class RET>
87 {
88  return static_cast<const Impl*>(this)->getNumCoroutineThreads();
89 }
90 
91 template <class RET>
93 {
94  return static_cast<const Impl*>(this)->getNumIoThreads();
95 }
96 
97 template <class RET>
98 const std::pair<int, int>& IThreadContext<RET>::getCoroQueueIdRangeForAny() const
99 {
100  return static_cast<const Impl*>(this)->getCoroQueueIdRangeForAny();
101 }
102 
103 template <class RET>
104 template <class OTHER_RET, class FUNC, class ... ARGS>
106 IThreadContext<RET>::then(FUNC&& func, ARGS&&... args)
107 {
108  return static_cast<Impl*>(this)->template then<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
109 }
110 
111 template <class RET>
112 template <class OTHER_RET, class FUNC, class ... ARGS>
114 IThreadContext<RET>::onError(FUNC&& func, ARGS&&... args)
115 {
116  return static_cast<Impl*>(this)->template onError<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
117 }
118 
119 template <class RET>
120 template <class OTHER_RET, class FUNC, class ... ARGS>
122 IThreadContext<RET>::finally(FUNC&& func, ARGS&&... args)
123 {
124  return static_cast<Impl*>(this)->template finally<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
125 }
126 
127 template <class RET>
130 {
131  return static_cast<Impl*>(this)->end();
132 }
133 
134 //==============================================================================================
135 // class ICoroContext (fwd to implementation)
136 //==============================================================================================
137 template <class RET>
138 template <class V>
140 {
141  return static_cast<Impl*>(this)->get(sync);
142 }
143 
144 template <class RET>
145 template <class V>
147 {
148  return static_cast<const Impl*>(this)->getRef(sync);
149 }
150 
151 template <class RET>
152 template <class OTHER_RET>
154 {
155  std::shared_ptr<Impl> ctx = static_cast<Impl*>(this)->shared_from_this();
156  return ctx->template getPrev<OTHER_RET>(ctx);
157 }
158 
159 template <class RET>
160 template <class OTHER_RET>
162 {
163  std::shared_ptr<Impl> ctx = static_cast<Impl*>(this)->shared_from_this();
164  return ctx->template getPrevRef<OTHER_RET>(ctx);
165 }
166 
167 template <class RET>
168 template <class OTHER_RET>
170 {
171  return static_cast<Impl*>(this)->template getAt<OTHER_RET>(num, sync);
172 }
173 
174 template <class RET>
175 template <class OTHER_RET>
177 {
178  return static_cast<const Impl*>(this)->template getRefAt<OTHER_RET>(num, sync);
179 }
180 
181 template <class RET>
182 template <class V, class>
184 {
185  std::shared_ptr<Impl> ctx = static_cast<Impl*>(this)->shared_from_this();
186  return ctx->set(ctx, std::forward<V>(value));
187 }
188 
189 template <class RET>
190 template <class V, class>
191 void ICoroContext<RET>::push(V&& value)
192 {
193  std::shared_ptr<Impl> ctx = static_cast<Impl*>(this)->shared_from_this();
194  ctx->push(ctx, std::forward<V>(value));
195 }
196 
197 template <class RET>
198 template <class V>
200 {
201  return static_cast<Impl*>(this)->pull(sync, isBufferClosed);
202 }
203 
204 template <class RET>
205 template <class V, class>
207 {
208  return static_cast<Impl*>(this)->closeBuffer();
209 }
210 
211 template <class RET>
213 {
214  return static_cast<const Impl*>(this)->getNumCoroutineThreads();
215 }
216 
217 template <class RET>
219 {
220  return static_cast<const Impl*>(this)->getNumIoThreads();
221 }
222 
223 template <class RET>
224 const std::pair<int, int>& ICoroContext<RET>::getCoroQueueIdRangeForAny() const
225 {
226  return static_cast<const Impl*>(this)->getCoroQueueIdRangeForAny();
227 }
228 
229 template <class RET>
230 template <class OTHER_RET, class FUNC, class ... ARGS>
232 ICoroContext<RET>::post(FUNC&& func, ARGS&&... args)
233 {
234  return static_cast<Impl*>(this)->template post<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
235 }
236 
237 template <class RET>
238 template <class OTHER_RET, class FUNC, class ... ARGS>
240 ICoroContext<RET>::post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
241 {
242  return static_cast<Impl*>(this)->template post<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
243 }
244 
245 template <class RET>
246 template <class OTHER_RET, class FUNC, class ... ARGS>
248 ICoroContext<RET>::postFirst(FUNC&& func, ARGS&&... args)
249 {
250  return static_cast<Impl*>(this)->template postFirst<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
251 }
252 
253 template <class RET>
254 template <class OTHER_RET, class FUNC, class ... ARGS>
256 ICoroContext<RET>::postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
257 {
258  return static_cast<Impl*>(this)->template postFirst<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
259 }
260 
261 template <class RET>
262 template <class OTHER_RET, class FUNC, class ... ARGS>
264 ICoroContext<RET>::then(FUNC&& func, ARGS&&... args)
265 {
266  return static_cast<Impl*>(this)->template then<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
267 }
268 
269 template <class RET>
270 template <class OTHER_RET, class FUNC, class ... ARGS>
272 ICoroContext<RET>::onError(FUNC&& func, ARGS&&... args)
273 {
274  return static_cast<Impl*>(this)->template onError<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
275 }
276 
277 template <class RET>
278 template <class OTHER_RET, class FUNC, class ... ARGS>
280 ICoroContext<RET>::finally(FUNC&& func, ARGS&&... args)
281 {
282  return static_cast<Impl*>(this)->template finally<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
283 }
284 
285 template <class RET>
286 typename ICoroContext<RET>::Ptr
288 {
289  return static_cast<Impl*>(this)->end();
290 }
291 
292 template <class RET>
293 template <class OTHER_RET, class FUNC, class ... ARGS>
295 ICoroContext<RET>::postAsyncIo(FUNC&& func, ARGS&&... args)
296 {
297  return static_cast<Impl*>(this)->template postAsyncIo<OTHER_RET>(std::forward<FUNC>(func), std::forward<ARGS>(args)...);
298 }
299 
300 template <class RET>
301 template <class OTHER_RET, class FUNC, class ... ARGS>
303 ICoroContext<RET>::postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
304 {
305  return static_cast<Impl*>(this)->template postAsyncIo<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
306 }
307 
308 template <class RET>
309 template <class OTHER_RET, class INPUT_IT, class>
312  INPUT_IT last,
314 {
315  return static_cast<Impl*>(this)->template forEach<OTHER_RET>(first, last, std::move(func));
316 }
317 
318 template <class RET>
319 template <class OTHER_RET, class INPUT_IT>
322  size_t num,
324 {
325  return static_cast<Impl*>(this)->template forEach<OTHER_RET>(first, num, std::move(func));
326 }
327 
328 template <class RET>
329 template <class OTHER_RET, class INPUT_IT, class>
332  INPUT_IT last,
334 {
335  return static_cast<Impl*>(this)->template forEachBatch<OTHER_RET>(first, last, std::move(func));
336 }
337 
338 template <class RET>
339 template <class OTHER_RET, class INPUT_IT>
342  size_t num,
344 {
345  return static_cast<Impl*>(this)->template forEachBatch<OTHER_RET>(first, num, std::move(func));
346 }
347 
348 template <class RET>
349 template <class KEY,
350  class MAPPED_TYPE,
351  class REDUCED_TYPE,
352  class INPUT_IT,
353  class>
356  INPUT_IT last,
359 {
360  return static_cast<Impl*>(this)->template mapReduce<KEY, MAPPED_TYPE, REDUCED_TYPE>
361  (first, last, std::move(mapper), std::move(reducer));
362 }
363 
364 template <class RET>
365 template <class KEY,
366  class MAPPED_TYPE,
367  class REDUCED_TYPE,
368  class INPUT_IT>
371  size_t num,
374 {
375  return static_cast<Impl*>(this)->template mapReduce<KEY, MAPPED_TYPE, REDUCED_TYPE>
376  (first, num, std::move(mapper), std::move(reducer));
377 }
378 
379 template <class RET>
380 template <class KEY,
381  class MAPPED_TYPE,
382  class REDUCED_TYPE,
383  class INPUT_IT,
384  class>
387  INPUT_IT last,
390 {
391  return static_cast<Impl*>(this)->template mapReduceBatch<KEY, MAPPED_TYPE, REDUCED_TYPE>
392  (first, last, std::move(mapper), std::move(reducer));
393 }
394 
395 template <class RET>
396 template <class KEY,
397  class MAPPED_TYPE,
398  class REDUCED_TYPE,
399  class INPUT_IT>
402  size_t num,
405 {
406  return static_cast<Impl*>(this)->template mapReduceBatch<KEY, MAPPED_TYPE, REDUCED_TYPE>
407  (first, num, std::move(mapper), std::move(reducer));
408 }
409 
410 //==============================================================================================
411 // class Context
412 //==============================================================================================
413 #ifndef __QUANTUM_CONTEXT_ALLOC_SIZE
414  #define __QUANTUM_CONTEXT_ALLOC_SIZE __QUANTUM_DEFAULT_POOL_ALLOC_SIZE
415 #endif
416 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR
417  #ifdef __QUANTUM_ALLOCATE_POOL_FROM_HEAP
419  #else
420  using ContextAllocator = StackAllocator<Context<int>, __QUANTUM_CONTEXT_ALLOC_SIZE>;
421  #endif
422 #else
424 #endif
425 
426 template <class RET>
428  _promises(1, PromisePtr<RET>(new Promise<RET>(), Promise<RET>::deleter)),
429  _dispatcher(&dispatcher),
430  _terminated ATOMIC_FLAG_INIT,
431  _signal(-1),
432  _yield(nullptr),
433  _sleepDuration(0)
434 {}
435 
436 template <class RET>
437 template <class OTHER_RET>
438 Context<RET>::Context(Context<OTHER_RET>& other) :
439  _promises(other._promises),
440  _dispatcher(other._dispatcher),
441  _terminated ATOMIC_FLAG_INIT,
442  _signal(-1),
443  _yield(nullptr),
444  _sleepDuration(0)
445 {
446  _promises.emplace_back(PromisePtr<RET>(new Promise<RET>(), Promise<RET>::deleter)); //append a new promise
447 }
448 
449 template <class RET>
451 {
452  terminate();
453 }
454 
455 template <class RET>
457 {
458  if (!_terminated.test_and_set())
459  {
460  _promises.back()->terminate();
461 
462  //unlink task ptr
463  _task.reset();
464  }
465 }
466 
467 template <class RET>
468 bool Context<RET>::validAt(int num) const
469 {
470  return _promises[index(num)]->valid();
471 }
472 
473 template <class RET>
475 {
476  return validAt(-1);
477 }
478 
479 template <class RET>
480 int Context<RET>::setException(std::exception_ptr ex)
481 {
482  return _promises.back()->setException(ex);
483 }
484 
485 template <class RET>
487 {
488  return _signal == 0;
489 }
490 
491 template <class RET>
492 bool Context<RET>::isSleeping(bool updateTimer)
493 {
494  if (_sleepDuration.count() > 0) {
495  if (!updateTimer) {
496  return true;
497  }
498  auto now = std::chrono::high_resolution_clock::now();
499  auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(now-_sleepTimestamp);
500  if (elapsed >= _sleepDuration) {
501  //expired so we reset all values
502  _sleepDuration = std::chrono::microseconds(0);
503  _sleepTimestamp = std::chrono::high_resolution_clock::time_point{};
504  }
505  else {
506  //reduce duration and save new timestamp
507  _sleepDuration -= elapsed;
508  _sleepTimestamp = now;
509  return true;
510  }
511  }
512  return false;
513 }
514 
515 template <class RET>
516 int Context<RET>::index(int num) const
517 {
518  if ((num < -1) || (num >= (int)_promises.size()))
519  {
520  ThrowFutureException(FutureState::NoState);
521  }
522  return (num == -1) ? _promises.size() - 1 : num;
523 }
524 
525 template <class RET>
526 void Context<RET>::validateTaskType(ITask::Type type) const
527 {
528  if (!_task)
529  {
530  throw std::runtime_error("Invalid task pointer");
531  }
532 
533  bool isValid = true;
534  switch (type)
535  {
536  case ITask::Type::Continuation:
537  case ITask::Type::ErrorHandler:
538  if ((_task->getType() != ITask::Type::First) &&
539  (_task->getType() != ITask::Type::Continuation))
540  {
541  isValid = false;
542  }
543  break;
544  case ITask::Type::Final:
545  if ((_task->getType() != ITask::Type::First) &&
546  (_task->getType() != ITask::Type::Continuation) &&
547  (_task->getType() != ITask::Type::ErrorHandler))
548  {
549  isValid = false;
550  }
551  break;
552  case ITask::Type::Termination:
553  if ((_task->getType() != ITask::Type::First) &&
554  (_task->getType() != ITask::Type::Continuation) &&
555  (_task->getType() != ITask::Type::ErrorHandler) &&
556  (_task->getType() != ITask::Type::Final))
557  {
558  isValid = false;
559  }
560  break;
561  default:
562  break;
563  }
564 
565  if (!isValid)
566  {
567  throw std::runtime_error("Restricted continuation method");
568  }
569 }
570 
571 template <class RET>
572 void Context<RET>::validateContext(ICoroSync::Ptr sync) const
573 {
574  if (static_cast<const ICoroSync*>(this) == sync.get())
575  {
576  throw std::runtime_error("Must use different synchronization object");
577  }
578 }
579 
580 template <class RET>
582 {
583  _task = task;
584 }
585 
586 template <class RET>
588 {
589  return _task;
590 }
591 
592 template <class RET>
594 {
595  _yield = &yield;
596 }
597 
598 template <class RET>
600 {
601  if (!_yield) throw std::runtime_error("Yield handle is null");
602  return *_yield;
603 }
604 
605 template <class RET>
607 {
608  getYieldHandle()();
609 }
610 
611 template <class RET>
612 std::atomic_int& Context<RET>::signal()
613 {
614  return _signal;
615 }
616 
617 template <class RET>
618 void Context<RET>::sleep(const std::chrono::milliseconds& timeMs)
619 {
620  sleep(std::chrono::duration_cast<std::chrono::microseconds>(timeMs));
621 }
622 
623 template <class RET>
624 void Context<RET>::sleep(const std::chrono::microseconds& timeUs)
625 {
626  _sleepDuration = timeUs;
627  _sleepTimestamp = std::chrono::high_resolution_clock::now();
628  if (isSleeping()) {
629  yield();
630  }
631 }
632 
633 template <class RET>
634 template <class OTHER_RET, class FUNC, class ... ARGS>
636 Context<RET>::thenImpl(ITask::Type type, FUNC&& func, ARGS&&... args)
637 {
638  auto ctx = ContextPtr<OTHER_RET>(new Context<OTHER_RET>(*this),
640  auto task = Task::Ptr(new Task(ctx,
641  _task->getQueueId(), //keep current queueId
642  _task->isHighPriority(), //keep current priority
643  type,
644  std::forward<FUNC>(func),
645  std::forward<ARGS>(args)...),
646  Task::deleter);
647  ctx->setTask(task);
648 
649  //Chain tasks
650  std::static_pointer_cast<ITaskContinuation>(_task)->setNextTask(task);
651  task->setPrevTask(std::static_pointer_cast<ITaskContinuation>(_task));
652  return ctx;
653 }
654 
655 template <class RET>
656 template <class OTHER_RET, class FUNC, class ... ARGS>
658 Context<RET>::then(FUNC&& func, ARGS&&... args)
659 {
660  //Previous task must either be First or Continuation types
661  validateTaskType(ITask::Type::Continuation);
662  return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::Continuation, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
663 }
664 
665 template <class RET>
666 template <class OTHER_RET, class FUNC, class ... ARGS>
668 Context<RET>::onError(FUNC&& func, ARGS&&... args)
669 {
670  validateTaskType(ITask::Type::ErrorHandler);
671  return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::ErrorHandler, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
672 }
673 
674 template <class RET>
675 template <class OTHER_RET, class FUNC, class ... ARGS>
677 Context<RET>::finally(FUNC&& func, ARGS&&... args)
678 {
679  validateTaskType(ITask::Type::Final);
680  return thenImpl<OTHER_RET, FUNC, ARGS...>(ITask::Type::Final, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
681 }
682 
683 template <class RET>
684 typename Context<RET>::Ptr
686 {
687  validateTaskType(ITask::Type::Termination);
688  //Async post to next available queue since the previous task in the chain already terminated
689  auto task = std::static_pointer_cast<Task>(std::static_pointer_cast<ITaskContinuation>(getTask())->getFirstTask());
690  _dispatcher->post(task);
691  return dynamic_cast<Context<RET>*>(this)->shared_from_this();
692 }
693 
694 template <class RET>
695 template <class OTHER_RET, class FUNC, class ... ARGS>
697 Context<RET>::postAsyncIo(FUNC&& func, ARGS&&... args)
698 {
699  return postAsyncIoImpl<OTHER_RET>((int)IQueue::QueueId::Any, false, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
700 }
701 
702 template <class RET>
703 template <class OTHER_RET, class FUNC, class ... ARGS>
705 Context<RET>::postAsyncIo(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
706 {
707  return postAsyncIoImpl<OTHER_RET>(queueId, isHighPriority, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
708 }
709 
710 template <class RET>
711 template <class OTHER_RET, class FUNC, class ... ARGS>
713 Context<RET>::postAsyncIoImpl(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
714 {
715  if (queueId < (int)IQueue::QueueId::Any)
716  {
717  throw std::runtime_error("Invalid coroutine queue id");
718  }
719  auto promise = PromisePtr<OTHER_RET>(new Promise<OTHER_RET>(), Promise<OTHER_RET>::deleter);
720  auto task = IoTask::Ptr(new IoTask(promise,
721  queueId,
722  isHighPriority,
723  std::forward<FUNC>(func),
724  std::forward<ARGS>(args)...),
725  IoTask::deleter);
726  _dispatcher->postAsyncIo(task);
727  return promise->getICoroFuture();
728 }
729 
730 template <class RET>
731 template <class OTHER_RET, class INPUT_IT, class>
732 ContextPtr<std::vector<OTHER_RET>>
733 Context<RET>::forEach(INPUT_IT first,
734  INPUT_IT last,
736 {
737  return forEach<OTHER_RET>(first, std::distance(first, last), std::move(func));
738 }
739 
740 template <class RET>
741 template <class OTHER_RET, class INPUT_IT>
743 Context<RET>::forEach(INPUT_IT first,
744  size_t num,
746 {
747  return post<std::vector<OTHER_RET>>(Util::forEachCoro<OTHER_RET, INPUT_IT>,
748  INPUT_IT{first},
749  size_t{num},
751 }
752 
753 template <class RET>
754 template <class OTHER_RET, class INPUT_IT, class>
757  INPUT_IT last,
759 {
760  return forEachBatch<OTHER_RET>(first, std::distance(first, last), std::move(func));
761 }
762 
763 template <class RET>
764 template <class OTHER_RET, class INPUT_IT>
767  size_t num,
769 {
770  return post<std::vector<std::vector<OTHER_RET>>>(Util::forEachBatchCoro<OTHER_RET, INPUT_IT>,
771  INPUT_IT{first},
772  size_t{num},
774  getNumCoroutineThreads());
775 }
776 
777 template <class RET>
778 template <class KEY,
779  class MAPPED_TYPE,
780  class REDUCED_TYPE,
781  class INPUT_IT,
782  class>
784 Context<RET>::mapReduce(INPUT_IT first,
785  INPUT_IT last,
788 {
789  return mapReduce(first, std::distance(first, last), std::move(mapper), std::move(reducer));
790 }
791 
792 template <class RET>
793 template <class KEY,
794  class MAPPED_TYPE,
795  class REDUCED_TYPE,
796  class INPUT_IT>
798 Context<RET>::mapReduce(INPUT_IT first,
799  size_t num,
802 {
803  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
804  return post<ReducerOutput>(Util::mapReduceCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
805  INPUT_IT{first},
806  size_t{num},
809 }
810 
811 template <class RET>
812 template <class KEY,
813  class MAPPED_TYPE,
814  class REDUCED_TYPE,
815  class INPUT_IT,
816  class>
819  INPUT_IT last,
822 {
823  return mapReduceBatch(first, std::distance(first, last), std::move(mapper), std::move(reducer));
824 }
825 
826 template <class RET>
827 template <class KEY,
828  class MAPPED_TYPE,
829  class REDUCED_TYPE,
830  class INPUT_IT>
833  size_t num,
836 {
837  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
838  return post<ReducerOutput>(Util::mapReduceBatchCoro<KEY, MAPPED_TYPE, REDUCED_TYPE, INPUT_IT>,
839  INPUT_IT{first},
840  size_t{num},
843 }
844 
845 template <class RET>
846 template <class V, class>
847 int Context<RET>::set(V&& value)
848 {
849  return std::static_pointer_cast<Promise<RET>>(_promises.back())->set(std::forward<V>(value));
850 }
851 
852 template <class RET>
853 template <class V, class>
854 void Context<RET>::push(V&& value)
855 {
856  std::static_pointer_cast<Promise<RET>>(_promises.back())->push(std::forward<V>(value));
857 }
858 
859 template <class RET>
860 template <class V, class>
861 void Context<RET>::push(ICoroSync::Ptr sync, V&& value)
862 {
863  std::static_pointer_cast<Promise<RET>>(_promises.back())->push(sync, std::forward<V>(value));
864 }
865 
866 template <class RET>
867 template <class V>
869 {
870  return std::static_pointer_cast<Promise<RET>>(_promises.back())->getIThreadFuture()->pull(isBufferClosed);
871 }
872 
873 template <class RET>
874 template <class V>
876 {
877  return std::static_pointer_cast<Promise<RET>>(_promises.back())->getICoroFuture()->pull(sync, isBufferClosed);
878 }
879 
880 template <class RET>
881 template <class V, class>
883 {
884  return std::static_pointer_cast<Promise<RET>>(_promises.back())->closeBuffer();
885 }
886 
887 template <class RET>
888 template <class OTHER_RET>
890 {
891  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->get();
892 }
893 
894 template <class RET>
895 template <class OTHER_RET>
897 {
898  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getIThreadFuture()->getRef();
899 }
900 
901 template <class RET>
902 template <class V>
904 {
905  return getAt<RET>(-1);
906 }
907 
908 template <class RET>
909 template <class V>
911 {
912  return getRefAt<RET>(-1);
913 }
914 
915 template <class RET>
916 void Context<RET>::waitAt(int num) const
917 {
918  _promises[index(num)]->getIThreadFutureBase()->wait();
919 }
920 
921 template <class RET>
922 std::future_status Context<RET>::waitForAt(int num, std::chrono::milliseconds timeMs) const
923 {
924  return _promises[index(num)]->getIThreadFutureBase()->waitFor(timeMs);
925 }
926 
927 template <class RET>
928 void Context<RET>::wait() const
929 {
930  waitAt(-1);
931 }
932 
933 template <class RET>
934 std::future_status Context<RET>::waitFor(std::chrono::milliseconds timeMs) const
935 {
936  return waitForAt(-1, timeMs);
937 }
938 
939 template <class RET>
941 {
942  for (auto&& promise : _promises)
943  {
944  try
945  {
946  promise->getIThreadFutureBase()->wait();
947  }
948  catch(...) //catch all broken promises or any other exception
949  {}
950  }
951 }
952 
953 template <class RET>
954 template <class V, class>
955 int Context<RET>::set(ICoroSync::Ptr sync, V&& value)
956 {
957  return std::static_pointer_cast<Promise<RET>>(_promises.back())->set(sync, std::forward<V>(value));
958 }
959 
960 template <class RET>
961 template <class OTHER_RET>
963  ICoroSync::Ptr sync)
964 {
965  validateContext(sync);
966  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->get(sync);
967 }
968 
969 template <class RET>
970 template <class OTHER_RET>
972  ICoroSync::Ptr sync) const
973 {
974  validateContext(sync);
975  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(num)])->getICoroFuture()->getRef(sync);
976 }
977 
978 template <class RET>
979 template <class V>
981 {
982  return getAt<RET>(-1, sync);
983 }
984 
985 template <class RET>
986 template <class V>
988 {
989  return getRefAt<RET>(-1, sync);
990 }
991 
992 template <class RET>
993 template <class OTHER_RET>
995 {
996  if (_promises.size() < 2)
997  {
998  ThrowFutureException(FutureState::NoState);
999  }
1000  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(_promises.size()-2)])->getICoroFuture()->get(sync);
1001 }
1002 
1003 template <class RET>
1004 template <class OTHER_RET>
1006 {
1007  if (_promises.size() < 2)
1008  {
1009  ThrowFutureException(FutureState::NoState);
1010  }
1011  return std::static_pointer_cast<Promise<OTHER_RET>>(_promises[index(_promises.size()-2)])->getICoroFuture()->getRef(sync);
1012 }
1013 
1014 template <class RET>
1016 {
1017  return _dispatcher->getNumCoroutineThreads();
1018 }
1019 
1020 template <class RET>
1022 {
1023  return _dispatcher->getNumIoThreads();
1024 }
1025 
1026 template <class RET>
1027 const std::pair<int, int>& Context<RET>::getCoroQueueIdRangeForAny() const
1028 {
1029  return _dispatcher->getCoroQueueIdRangeForAny();
1030 }
1031 
1032 template <class RET>
1034  ICoroSync::Ptr sync) const
1035 {
1036  validateContext(sync);
1037  _promises[index(num)]->getICoroFutureBase()->wait(sync);
1038 }
1039 
1040 template <class RET>
1041 std::future_status Context<RET>::waitForAt(int num,
1042  ICoroSync::Ptr sync,
1043  std::chrono::milliseconds timeMs) const
1044 {
1045  validateContext(sync);
1046  return _promises[index(num)]->getICoroFutureBase()->waitFor(sync, timeMs);
1047 }
1048 
1049 template <class RET>
1051 {
1052  waitAt(-1, sync);
1053 }
1054 
1055 template <class RET>
1056 std::future_status Context<RET>::waitFor(ICoroSync::Ptr sync,
1057  std::chrono::milliseconds timeMs) const
1058 {
1059  return waitForAt(-1, sync, timeMs);
1060 }
1061 
1062 template <class RET>
1064 {
1065  for (auto&& promise : _promises)
1066  {
1067  try
1068  {
1069  promise->getICoroFutureBase()->wait(sync);
1070  }
1071  catch(...) //catch all broken promises or any other exception
1072  {
1073  }
1074  }
1075 }
1076 
1077 template <class RET>
1078 template <class OTHER_RET, class FUNC, class ... ARGS>
1080 Context<RET>::post(FUNC&& func, ARGS&&... args)
1081 {
1082  return postImpl<OTHER_RET>((int)IQueue::QueueId::Any, false, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1083 }
1084 
1085 template <class RET>
1086 template <class OTHER_RET, class FUNC, class ... ARGS>
1088 Context<RET>::post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
1089 {
1090  return postImpl<OTHER_RET>(queueId, isHighPriority, ITask::Type::Standalone, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1091 }
1092 
1093 template <class RET>
1094 template <class OTHER_RET, class FUNC, class ... ARGS>
1096 Context<RET>::postFirst(FUNC&& func, ARGS&&... args)
1097 {
1098  return postImpl<OTHER_RET>((int)IQueue::QueueId::Any, false, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1099 }
1100 
1101 template <class RET>
1102 template <class OTHER_RET, class FUNC, class ... ARGS>
1104 Context<RET>::postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
1105 {
1106  return postImpl<OTHER_RET>(queueId, isHighPriority, ITask::Type::First, std::forward<FUNC>(func), std::forward<ARGS>(args)...);
1107 }
1108 
1109 template <class RET>
1110 template <class OTHER_RET, class FUNC, class ... ARGS>
1112 Context<RET>::postImpl(int queueId, bool isHighPriority, ITask::Type type, FUNC&& func, ARGS&&... args)
1113 {
1114  if (queueId < (int)IQueue::QueueId::Same)
1115  {
1116  throw std::runtime_error("Invalid coroutine queue id");
1117  }
1118  auto ctx = ContextPtr<OTHER_RET>(new Context<OTHER_RET>(*_dispatcher),
1120  auto task = Task::Ptr(new Task(ctx,
1121  (queueId == (int)IQueue::QueueId::Same) ? _task->getQueueId() : queueId,
1122  isHighPriority,
1123  type,
1124  std::forward<FUNC>(func),
1125  std::forward<ARGS>(args)...),
1126  Task::deleter);
1127  ctx->setTask(task);
1128  if (type == ITask::Type::Standalone)
1129  {
1130  _dispatcher->post(task);
1131  }
1132  return ctx;
1133 }
1134 
1135 template <class RET>
1136 void* Context<RET>::operator new(size_t)
1137 {
1138  return Allocator<ContextAllocator>::instance(AllocatorTraits::contextAllocSize()).allocate();
1139 }
1140 
1141 template <class RET>
1142 void Context<RET>::operator delete(void* p)
1143 {
1144  Allocator<ContextAllocator>::instance(AllocatorTraits::contextAllocSize()).deallocate(static_cast<Context<int>*>(p));
1145 }
1146 
1147 template <class RET>
1149 {
1150 #ifndef __QUANTUM_USE_DEFAULT_ALLOCATOR
1151  Allocator<ContextAllocator>::instance(AllocatorTraits::contextAllocSize()).dispose(reinterpret_cast<Context<int>*>(p));
1152 #else
1153  delete p;
1154 #endif
1155 }
1156 
1157 }}
ICoroContext< OTHER_RET >::Ptr finally(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This coroutine is always guaranteed to run.
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:218
Definition: quantum_buffer_impl.h:22
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:1027
typename ICoroFuture< T >::Ptr CoroFuturePtr
Definition: quantum_icoro_future.h:72
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:92
bool valid() const final
Determines if the future object associated with this context has a valid shared state with the corres...
Definition: quantum_context_impl.h:474
Definition: quantum_allocator.h:36
typename Promise< T >::Ptr PromisePtr
Definition: quantum_promise.h:89
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:287
const NonBufferRetType< V > & getRef(ICoroSync::Ptr sync) const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:146
ICoroContext< OTHER_RET >::Ptr postFirst(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Class implementing the dispatching logic onto worker threads. Used for both coroutines and IO tasks.
Definition: quantum_dispatcher_core.h:45
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduceBatch(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
This version of mapReduce() runs both the mapper and the reducer functions in batches for improved pe...
Definition: quantum_allocator.h:54
const NonBufferRetType< OTHER_RET > & getRefAt(int num) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:52
const NonBufferRetType< V > & getRef() const
Get a reference to the future value associated with this context.
Definition: quantum_context_impl.h:38
std::shared_ptr< IThreadContext< RET > > Ptr
Definition: quantum_ithread_context.h:40
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:212
std::shared_ptr< ICoroSync > Ptr
Definition: quantum_icoro_sync.h:36
Type
Definition: quantum_itask.h:37
static void deleter(Promise< T > *p)
Definition: quantum_promise_impl.h:207
Class representing a promised value.
Definition: quantum_icoro_promise.h:77
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:224
CoroFuturePtr< OTHER_RET > postAsyncIo(FUNC &&func, ARGS &&... args)
Posts an IO method to run asynchronously on the IO thread pool.
Definition: quantum_context_impl.h:295
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Runnable object representing a coroutine.
Definition: quantum_task.h:40
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:80
std::shared_ptr< Context< RET > > Ptr
Definition: quantum_context.h:47
std::enable_if_t<!Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > NonBufferRetType
Definition: quantum_traits.h:95
NonBufferRetType< OTHER_RET > getAt(int num, ICoroSync::Ptr sync)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:169
const std::pair< int, int > & getCoroQueueIdRangeForAny() const
Gets the range [minQueueId, maxQueueId] of coroutine queueIds covered by IQueue::QueueId::Any by the ...
Definition: quantum_context_impl.h:98
std::shared_ptr< ITask > Ptr
Definition: quantum_itask.h:34
typename Context< RET >::Ptr ContextPtr
Definition: quantum_context.h:307
IThreadContext< OTHER_RET >::Ptr finally(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously. This function is always guaranteed to run.
Provides a stack-based object pool to the underlying ContiguousPoolManager. The default buffer size i...
Definition: quantum_stack_allocator.h:34
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
NonBufferRetType< V > get()
Get the future value associated with this context.
Definition: quantum_context_impl.h:31
int getNumIoThreads() const
Returns the number of underlying IO threads as specified in the dispatcher constructor.
Definition: quantum_context_impl.h:1021
IThreadContext< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously.
void terminate() final
Terminates the object.
Definition: quantum_context_impl.h:456
ICoroContext< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously. This is the error handler for a continuation chain and acts ...
int closeBuffer()
Close a promise buffer.
Definition: quantum_context_impl.h:206
ICoroContext< std::map< KEY, REDUCED_TYPE > >::Ptr mapReduce(INPUT_IT first, INPUT_IT last, Functions::MapFunc< KEY, MAPPED_TYPE, INPUT_IT > mapper, Functions::ReduceFunc< KEY, MAPPED_TYPE, REDUCED_TYPE > reducer)
Implementation of map-reduce functionality.
std::shared_ptr< Task > Ptr
Definition: quantum_task.h:44
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:66
ICoroContext< std::vector< OTHER_RET > >::Ptr forEach(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs in...
NonBufferRetType< V > get(ICoroSync::Ptr sync)
Get the future value associated with this context.
Definition: quantum_context_impl.h:139
const NonBufferRetType< OTHER_RET > & getRefAt(int num, ICoroSync::Ptr sync) const
Get a reference to the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:176
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:1015
typename BoostCoro::pull_type Yield
Definition: quantum_traits.h:58
void ThrowFutureException(FutureState state)
Definition: quantum_future_state.h:130
NonBufferRetType< OTHER_RET > getAt(int num)
Get the future value from the 'num-th' continuation context.
Definition: quantum_context_impl.h:45
const NonBufferRetType< OTHER_RET > & getPrevRef()
Get a reference to the future value associated with the previous coroutine context in the continuation chain.
Definition: quantum_context_impl.h:161
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
void wait() const final
Waits for the future associated with this context to be ready.
Definition: quantum_context_impl.h:928
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:183
int set(V &&value)
Set the promised value associated with this context.
Definition: quantum_context_impl.h:59
std::shared_ptr< ICoroContext< RET > > Ptr
Definition: quantum_icoro_context.h:41
int setException(std::exception_ptr ex) final
Set an exception in the promise associated with the current IThreadContext or ICoroContext.
Definition: quantum_context_impl.h:480
Provides a heap-based object pool to the underlying ContiguousPoolManager. The default buffer size is...
Definition: quantum_heap_allocator.h:33
IThreadContext< OTHER_RET >::Ptr onError(FUNC &&func, ARGS &&... args)
Posts a function to run asynchronously. This is the error handler for a continuation chain and acts a...
void push(V &&value)
Push a single value into the promise buffer.
Definition: quantum_context_impl.h:191
std::enable_if_t< Traits::IsBuffer< T >::value, typename Traits::IsBuffer< T >::Type > BufferRetType
Definition: quantum_traits.h:93
NonBufferRetType< OTHER_RET > getPrev()
Get the future value associated with the previous coroutine context in the continuation chain.
Definition: quantum_context_impl.h:153
BufferRetType< V > pull(ICoroSync::Ptr sync, bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:199
std::future_status waitFor(std::chrono::milliseconds timeMs) const final
Waits for the future associated with this context to be ready for a maximum of 'timeMs' milliseconds.
Definition: quantum_context_impl.h:934
ICoroContext< OTHER_RET >::Ptr post(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
ICoroContext< std::vector< std::vector< OTHER_RET > > >::Ptr forEachBatch(INPUT_IT first, INPUT_IT last, Functions::ForEachFunc< OTHER_RET, INPUT_IT > func)
Applies the given unary function to all the elements in the range [first,last). This function runs se...
Ptr end()
This is the last method in a continuation chain.
Definition: quantum_context_impl.h:129
int getNumCoroutineThreads() const
Returns the number of underlying coroutine threads as specified in the dispatcher constructor....
Definition: quantum_context_impl.h:86
typename IThreadContext< RET >::Ptr ThreadContextPtr
Definition: quantum_ithread_context.h:242
ICoroContext< OTHER_RET >::Ptr then(FUNC &&func, ARGS &&... args)
Posts a coroutine to run asynchronously.
friend class Context
Definition: quantum_context.h:44
BufferRetType< V > pull(bool &isBufferClosed)
Pull a single value from the future buffer.
Definition: quantum_context_impl.h:73
CoroFuturePtr< T > getICoroFuture() const
Get the associated coroutine future.
Definition: quantum_promise_impl.h:164