21 #include <quantum/util/quantum_future_joiner.h> 26 template <
typename RET,
typename CAPTURE>
// bindCoro (fragment): runs a captured callable on behalf of a coroutine
// context `ctx`, using the coroutine yield handle `yield`.
// NOTE(review): the signature, the enclosing try block, the `catch(...)` line
// and the return statements of the handlers are not visible in this excerpt.
//
// Visible behaviour:
//  - the yield handle is registered on the context before the callable runs;
//  - the callable's return value is written through `yield.get()`;
//  - any exception is captured with std::current_exception() and stored on
//    the context via setException() instead of being handled here.
33 ctx->setYieldHandle(yield);
34 yield.get() = std::forward<CAPTURE>(capture)();
// Handler for exceptions derived from std::exception.
37 catch(std::exception& ex)
// Debug builds only: take the Util::LogMutex() lock before writing to
// std::cerr.
40 #ifdef __QUANTUM_PRINT_DEBUG 41 std::lock_guard<std::mutex> guard(Util::LogMutex());
42 std::cerr <<
"Caught exception : " << ex.what() << std::endl;
// Hand the in-flight exception to the context.
44 ctx->setException(std::current_exception());
// Same handling for the "unknown exception" path (presumably a `catch(...)`
// handler; its opening line is not shown — confirm).
48 #ifdef __QUANTUM_PRINT_DEBUG 49 std::lock_guard<std::mutex> guard(Util::LogMutex());
50 std::cerr <<
"Caught unknown exception." << std::endl;
52 ctx->setException(std::current_exception());
// bindIo (fragment): runs a captured callable on behalf of a Promise<RET>.
// NOTE(review): the signature, the enclosing try block, the `catch(...)` line
// and the return statements of the handlers are not visible in this excerpt.
//
// Visible behaviour:
//  - on success the callable's own return value is returned directly;
//  - any exception is captured with std::current_exception() and stored on
//    `promise` via setException().
58 template <
typename RET,
typename CAPTURE>
64 return std::forward<CAPTURE>(capture)();
// Handler for exceptions derived from std::exception.
66 catch(std::exception& ex)
// Debug builds only: take the Util::LogMutex() lock before writing to
// std::cerr.
69 #ifdef __QUANTUM_PRINT_DEBUG 70 std::lock_guard<std::mutex> guard(Util::LogMutex());
71 std::cerr <<
"Caught exception : " << ex.what() << std::endl;
// Hand the in-flight exception to the promise.
73 promise->setException(std::current_exception());
// Same handling for the "unknown exception" path (presumably a `catch(...)`
// handler; its opening line is not shown — confirm).
77 #ifdef __QUANTUM_PRINT_DEBUG 78 std::lock_guard<std::mutex> guard(Util::LogMutex());
79 std::cerr <<
"Caught unknown exception." << std::endl;
81 promise->setException(std::current_exception());
// bindCaller (fragment): packages `func`, a std::shared_ptr<Context<RET>>
// built from `context`, and the forwarded `args` into a single capture object
// through makeCapture().
// NOTE(review): the signature and the statement that consumes `capture` are
// not visible in this excerpt, so how the capture is bound and returned cannot
// be confirmed from here.
86 template<
class RET,
class FUNC,
class ...ARGS>
90 auto capture =
makeCapture(std::forward<FUNC>(func), std::shared_ptr<
Context<RET>>(context), std::forward<ARGS>(args)...);
// bindIoCaller (fragment): packages `func`, a std::shared_ptr<Promise<RET>>
// built from `promise`, and the forwarded `args` into a single capture object
// through makeCapture().
// NOTE(review): the signature and the statement that consumes `capture` are
// not visible in this excerpt, so how the capture is bound and returned cannot
// be confirmed from here.
94 template<
class RET,
class FUNC,
class ...ARGS>
98 auto capture =
makeCapture(std::forward<FUNC>(func), std::shared_ptr<
Promise<RET>>(promise), std::forward<ARGS>(args)...);
// forEachCoro (fragment): applies `func` to `num` consecutive elements
// starting at `inputIt`, posting one coroutine per element.
// NOTE(review): the signature and the statements that follow the loop (where
// the posted results are presumably joined and set on the outer context) are
// not visible in this excerpt.
102 template <
class RET,
class INPUT_IT>
// One result handle per element; capacity is reserved up front so the vector
// does not reallocate while handles are appended.
108 std::vector<CoroContextPtr<RET>> asyncResults;
109 asyncResults.reserve(num);
110 for (
size_t i = 0; i < num; ++i, ++inputIt)
// The iterator is captured by value, so each coroutine sees the element that
// was current when it was posted. The lambda's `ctx` parameter shadows the
// outer `ctx` used to post it.
// NOTE(review): `func` is captured by reference; this is only safe if the
// caller keeps it alive until every posted coroutine has run — confirm against
// the code after the loop.
113 asyncResults.emplace_back(ctx->template post<RET>([inputIt, &func](
CoroContextPtr<RET> ctx)->int
// Publish the per-element result on the child context.
115 return ctx->set(func(*inputIt));
// forEachBatchCoro (fragment): applies `func` to `num` elements starting at
// `inputIt`, split into at most `numCoroutineThreads` contiguous batches with
// one coroutine posted per batch. Each batch yields a std::vector<RET>; the
// batch results are joined and set on the outer context.
// NOTE(review): the start of the signature and several body lines (including
// the declaration of `it` and any early exit for empty batches) are not
// visible in this excerpt.
121 template <
class RET,
class INPUT_IT>
126 size_t numCoroutineThreads)
// Split the work as evenly as possible: every batch gets `numPerBatch`
// elements and the first `remainder` batches get one extra.
// NOTE(review): numCoroutineThreads == 0 would divide by zero here; no guard
// is visible in this excerpt — confirm the caller guarantees a non-zero value.
128 size_t numPerBatch = num/numCoroutineThreads;
129 size_t remainder = num%numCoroutineThreads;
// One result handle per batch.
130 std::vector<CoroContextPtr<std::vector<RET>>> asyncResults;
131 asyncResults.reserve(numCoroutineThreads);
134 for (
size_t i = 0; i < numCoroutineThreads; ++i)
137 size_t batchSize = (i < remainder) ? numPerBatch + 1 : numPerBatch;
// The batch start iterator and size are captured by value; `func` is captured
// by reference. The lambda's `ctx` parameter shadows the outer `ctx`.
142 asyncResults.emplace_back(ctx->template post<std::vector<RET>>([inputIt, batchSize, &func](
CoroContextPtr<std::vector<RET>> ctx)->int
144 std::vector<RET> result;
145 result.reserve(batchSize);
// `it` is declared on a line not shown here; presumably a mutable copy of the
// captured `inputIt` — confirm.
147 for (size_t j = 0; j < batchSize; ++j, ++it)
149 result.emplace_back(func(*it));
// Publish this batch's results on the child context.
151 return ctx->set(std::move(result));
// Move the outer iterator to the start of the next batch.
153 std::advance(inputIt, batchSize);
// Join all batch futures into one, fetch the combined value and set it on the
// outer context.
155 return ctx->set(
FutureJoiner<std::vector<RET>>()(*ctx, std::move(asyncResults))->get(ctx));
// Body fragment of a map/reduce coroutine built on forEach (the function name
// and signature are not visible in this excerpt).
// Stages: map every input to (key, value) pairs, group the values by key,
// reduce each group, then collect the reduced pairs into a map that is set on
// `ctx`.
169 using MappedResult = std::pair<KEY, MAPPED_TYPE>;
170 using MapperOutput = std::vector<MappedResult>;
171 using IndexerInput = std::vector<MapperOutput>;
172 using IndexerOutput = std::map<KEY, std::vector<MAPPED_TYPE>>;
173 using ReducedResult = std::pair<KEY, REDUCED_TYPE>;
174 using ReducedResults = std::vector<ReducedResult>;
175 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
// Map stage: run `mapper` over the `num` inputs; one MapperOutput per input.
178 IndexerInput indexerInput = ctx->template forEach<MapperOutput>(inputIt, num, mapper)->get(ctx);
// Index stage: group mapped values under their key. Keys and values are moved
// out of the mapper output, which is not used again afterwards.
181 IndexerOutput indexerOutput;
182 for (
auto&& mapperOutput : indexerInput)
184 for (
auto&& mapperResult : mapperOutput) {
185 indexerOutput[std::move(mapperResult.first)].emplace_back(std::move(mapperResult.second));
// Reduce stage: run `reducer` once per (key, values) entry of the index.
190 ReducedResults reducedResults = ctx->template forEach<ReducedResult>
191 (indexerOutput.begin(), indexerOutput.size(), reducer)->get(ctx);
// Collect the reduced pairs into the final map. std::map::emplace leaves an
// existing entry untouched, so if the reducer returned the same key twice the
// first value would win.
193 ReducerOutput reducerOutput;
194 for (
auto&& reducedResult : reducedResults)
196 reducerOutput.emplace(std::move(reducedResult.first), std::move(reducedResult.second));
// Publish the final map on the context.
199 return ctx->set(std::move(reducerOutput));
// Body fragment of a map/reduce coroutine built on forEachBatch (the function
// name and signature are not visible in this excerpt).
// Same stages as the forEach-based variant, but forEachBatch returns one
// vector of results per batch, so the map and reduce outputs carry one extra
// level of nesting.
213 using MappedResult = std::pair<KEY, MAPPED_TYPE>;
214 using MapperOutput = std::vector<MappedResult>;
215 using IndexerInput = std::vector<std::vector<MapperOutput>>;
216 using IndexerOutput = std::map<KEY, std::vector<MAPPED_TYPE>>;
217 using ReducedResult = std::pair<KEY, REDUCED_TYPE>;
218 using ReducedResults = std::vector<std::vector<ReducedResult>>;
219 using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
// Map stage: run `mapper` over the `num` inputs in batches.
222 IndexerInput indexerInput = ctx->template forEachBatch<MapperOutput>(inputIt, num, mapper)->get(ctx);
// Index stage: walk batch -> per-input mapper output -> (key, value) pair and
// group the values under their key. Keys and values are moved out of the
// mapper output, which is not used again afterwards.
225 IndexerOutput indexerOutput;
226 for (
auto&& partialMapOutput : indexerInput)
228 for (
auto&& mapperOutput : partialMapOutput)
230 for (
auto&& mapperResult : mapperOutput) {
231 indexerOutput[std::move(mapperResult.first)].emplace_back(std::move(mapperResult.second));
// Reduce stage: run `reducer` over the (key, values) entries in batches.
237 ReducedResults reducedResults = ctx->template forEachBatch<ReducedResult>
238 (indexerOutput.begin(), indexerOutput.size(), reducer)->get(ctx);
// Flatten the per-batch reduced pairs into the final map. std::map::emplace
// leaves an existing entry untouched, so if the reducer returned the same key
// twice the first value would win.
240 ReducerOutput reducerOutput;
241 for (
auto&& partialReducedResult : reducedResults)
243 for (
auto&& reducedResult : partialReducedResult)
245 reducerOutput.emplace(std::move(reducedResult.first), std::move(reducedResult.second));
// Publish the final map on the context.
249 return ctx->set(std::move(reducerOutput));
252 #ifdef __QUANTUM_PRINT_DEBUG 253 std::mutex& Util::LogMutex()
int bindCoro(Traits::Yield &yield, std::shared_ptr< CoroContext< RET >> ctx, CAPTURE &&capture)
Definition: quantum_util_impl.h:27
Definition: quantum_buffer_impl.h:22
int bindIo(std::shared_ptr< Promise< RET >> promise, CAPTURE &&capture)
Definition: quantum_util_impl.h:59
Exposes methods to manipulate the coroutine context.
Definition: quantum_icoro_context.h:38
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Class representing a promised value.
Definition: quantum_icoro_promise.h:77
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Utility class that joins N futures into a single one.
Definition: quantum_future_joiner.h:39
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
static Function< int(Traits::Yield &)> bindCaller(std::shared_ptr< Context< RET >> ctx, FUNC &&func0, ARGS &&...args0)
Definition: quantum_util_impl.h:88
static int forEachBatchCoro(CoroContextPtr< std::vector< std::vector< RET >>> ctx, INPUT_IT inputIt, size_t num, const Functions::ForEachFunc< RET, INPUT_IT > &func, size_t numCoroutineThreads)
Definition: quantum_util_impl.h:122
typename BoostCoro::pull_type Yield
Definition: quantum_traits.h:58
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
Similar implementation to std::function except that it allows capture of non-copyable types.
Definition: quantum_capture.h:62
static int forEachCoro(CoroContextPtr< std::vector< RET >> ctx, INPUT_IT inputIt, size_t num, const Functions::ForEachFunc< RET, INPUT_IT > &func)
Definition: quantum_util_impl.h:103
static Function< int()> bindIoCaller(std::shared_ptr< Promise< RET >> promise, FUNC &&func0, ARGS &&...args0)
Definition: quantum_util_impl.h:96
Capture< FUNC, ARGS... > makeCapture(FUNC &&func, ARGS &&... args)
Definition: quantum_capture_impl.h:42