QuantumLibrary
quantum_util_impl.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 //NOTE: DO NOT INCLUDE DIRECTLY
17 
18 //##############################################################################################
19 //#################################### IMPLEMENTATIONS #########################################
20 //##############################################################################################
21 #include <quantum/util/quantum_future_joiner.h>
22 
23 namespace Bloomberg {
24 namespace quantum {
25 
26 template <typename RET, typename CAPTURE>
28  std::shared_ptr<CoroContext<RET>> ctx,
29  CAPTURE&& capture)
30 {
31  try
32  {
33  ctx->setYieldHandle(yield); //set coroutine yield
34  yield.get() = std::forward<CAPTURE>(capture)();
35  return 0;
36  }
37  catch(std::exception& ex)
38  {
39  UNUSED(ex);
40 #ifdef __QUANTUM_PRINT_DEBUG
41  std::lock_guard<std::mutex> guard(Util::LogMutex());
42  std::cerr << "Caught exception : " << ex.what() << std::endl;
43 #endif
44  ctx->setException(std::current_exception());
45  }
46  catch(...)
47  {
48 #ifdef __QUANTUM_PRINT_DEBUG
49  std::lock_guard<std::mutex> guard(Util::LogMutex());
50  std::cerr << "Caught unknown exception." << std::endl;
51 #endif
52  ctx->setException(std::current_exception());
53  }
54  yield.get() = (int)ITask::RetCode::Exception;
55  return (int)ITask::RetCode::Exception;
56 }
57 
58 template <typename RET, typename CAPTURE>
59 int bindIo(std::shared_ptr<Promise<RET>> promise,
60  CAPTURE&& capture)
61 {
62  try
63  {
64  return std::forward<CAPTURE>(capture)();
65  }
66  catch(std::exception& ex)
67  {
68  UNUSED(ex);
69 #ifdef __QUANTUM_PRINT_DEBUG
70  std::lock_guard<std::mutex> guard(Util::LogMutex());
71  std::cerr << "Caught exception : " << ex.what() << std::endl;
72 #endif
73  promise->setException(std::current_exception());
74  }
75  catch(...)
76  {
77 #ifdef __QUANTUM_PRINT_DEBUG
78  std::lock_guard<std::mutex> guard(Util::LogMutex());
79  std::cerr << "Caught unknown exception." << std::endl;
80 #endif
81  promise->setException(std::current_exception());
82  }
83  return (int)ITask::RetCode::Exception;
84 }
85 
86 template<class RET, class FUNC, class ...ARGS>
87 Function<int(Traits::Yield&)>
88 Util::bindCaller(std::shared_ptr<Context<RET>> context, FUNC&& func, ARGS&& ...args)
89 {
90  auto capture = makeCapture(std::forward<FUNC>(func), std::shared_ptr<Context<RET>>(context), std::forward<ARGS>(args)...);
91  return makeCapture(bindCoro<RET, decltype(capture)>, std::shared_ptr<Context<RET>>(context), std::move(capture));
92 }
93 
94 template<class RET, class FUNC, class ...ARGS>
95 Function<int()>
96 Util::bindIoCaller(std::shared_ptr<Promise<RET>> promise, FUNC&& func, ARGS&& ...args)
97 {
98  auto capture = makeCapture(std::forward<FUNC>(func), std::shared_ptr<Promise<RET>>(promise), std::forward<ARGS>(args)...);
99  return makeCapture(bindIo<RET, decltype(capture)>, std::shared_ptr<Promise<RET>>(promise), std::move(capture));
100 }
101 
102 template <class RET, class INPUT_IT>
103 int Util::forEachCoro(CoroContextPtr<std::vector<RET>> ctx,
104  INPUT_IT inputIt,
105  size_t num,
107 {
108  std::vector<CoroContextPtr<RET>> asyncResults;
109  asyncResults.reserve(num);
110  for (size_t i = 0; i < num; ++i, ++inputIt)
111  {
112  //Run the function
113  asyncResults.emplace_back(ctx->template post<RET>([inputIt, &func](CoroContextPtr<RET> ctx)->int
114  {
115  return ctx->set(func(*inputIt));
116  }));
117  }
118  return ctx->set(FutureJoiner<RET>()(*ctx, std::move(asyncResults))->get(ctx));
119 }
120 
121 template <class RET, class INPUT_IT>
122 int Util::forEachBatchCoro(CoroContextPtr<std::vector<std::vector<RET>>> ctx,
123  INPUT_IT inputIt,
124  size_t num,
126  size_t numCoroutineThreads)
127 {
128  size_t numPerBatch = num/numCoroutineThreads;
129  size_t remainder = num%numCoroutineThreads;
130  std::vector<CoroContextPtr<std::vector<RET>>> asyncResults;
131  asyncResults.reserve(numCoroutineThreads);
132 
133  // Post unto all the coroutine threads.
134  for (size_t i = 0; i < numCoroutineThreads; ++i)
135  {
136  //get the begin and end iterators for each batch
137  size_t batchSize = (i < remainder) ? numPerBatch + 1 : numPerBatch;
138  if (!batchSize)
139  {
140  break; //nothing to do
141  }
142  asyncResults.emplace_back(ctx->template post<std::vector<RET>>([inputIt, batchSize, &func](CoroContextPtr<std::vector<RET>> ctx)->int
143  {
144  std::vector<RET> result;
145  result.reserve(batchSize);
146  auto it = inputIt;
147  for (size_t j = 0; j < batchSize; ++j, ++it)
148  {
149  result.emplace_back(func(*it));
150  }
151  return ctx->set(std::move(result));
152  }));
153  std::advance(inputIt, batchSize);
154  }
155  return ctx->set(FutureJoiner<std::vector<RET>>()(*ctx, std::move(asyncResults))->get(ctx));
156 }
157 
158 template <class KEY,
159  class MAPPED_TYPE,
160  class REDUCED_TYPE,
161  class INPUT_IT>
162 int Util::mapReduceCoro(CoroContextPtr<std::map<KEY, REDUCED_TYPE>> ctx,
163  INPUT_IT inputIt,
164  size_t num,
167 {
168  // Typedefs
169  using MappedResult = std::pair<KEY, MAPPED_TYPE>;
170  using MapperOutput = std::vector<MappedResult>;
171  using IndexerInput = std::vector<MapperOutput>;
172  using IndexerOutput = std::map<KEY, std::vector<MAPPED_TYPE>>;
173  using ReducedResult = std::pair<KEY, REDUCED_TYPE>;
174  using ReducedResults = std::vector<ReducedResult>;
175  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
176 
177  // Map stage
178  IndexerInput indexerInput = ctx->template forEach<MapperOutput>(inputIt, num, mapper)->get(ctx);
179 
180  // Index stage
181  IndexerOutput indexerOutput;
182  for (auto&& mapperOutput : indexerInput)
183  {
184  for (auto&& mapperResult : mapperOutput) {
185  indexerOutput[std::move(mapperResult.first)].emplace_back(std::move(mapperResult.second));
186  }
187  }
188 
189  // Reduce stage
190  ReducedResults reducedResults = ctx->template forEach<ReducedResult>
191  (indexerOutput.begin(), indexerOutput.size(), reducer)->get(ctx);
192 
193  ReducerOutput reducerOutput;
194  for (auto&& reducedResult : reducedResults)
195  {
196  reducerOutput.emplace(std::move(reducedResult.first), std::move(reducedResult.second));
197  }
198 
199  return ctx->set(std::move(reducerOutput));
200 }
201 
202 template <class KEY,
203  class MAPPED_TYPE,
204  class REDUCED_TYPE,
205  class INPUT_IT>
206 int Util::mapReduceBatchCoro(CoroContextPtr<std::map<KEY, REDUCED_TYPE>> ctx,
207  INPUT_IT inputIt,
208  size_t num,
211 {
212  // Typedefs
213  using MappedResult = std::pair<KEY, MAPPED_TYPE>;
214  using MapperOutput = std::vector<MappedResult>;
215  using IndexerInput = std::vector<std::vector<MapperOutput>>;
216  using IndexerOutput = std::map<KEY, std::vector<MAPPED_TYPE>>;
217  using ReducedResult = std::pair<KEY, REDUCED_TYPE>;
218  using ReducedResults = std::vector<std::vector<ReducedResult>>;
219  using ReducerOutput = std::map<KEY, REDUCED_TYPE>;
220 
221  // Map stage
222  IndexerInput indexerInput = ctx->template forEachBatch<MapperOutput>(inputIt, num, mapper)->get(ctx);
223 
224  // Index stage
225  IndexerOutput indexerOutput;
226  for (auto&& partialMapOutput : indexerInput)
227  {
228  for (auto&& mapperOutput : partialMapOutput)
229  {
230  for (auto&& mapperResult : mapperOutput) {
231  indexerOutput[std::move(mapperResult.first)].emplace_back(std::move(mapperResult.second));
232  }
233  }
234  }
235 
236  // Reduce stage
237  ReducedResults reducedResults = ctx->template forEachBatch<ReducedResult>
238  (indexerOutput.begin(), indexerOutput.size(), reducer)->get(ctx);
239 
240  ReducerOutput reducerOutput;
241  for (auto&& partialReducedResult : reducedResults)
242  {
243  for (auto&& reducedResult : partialReducedResult)
244  {
245  reducerOutput.emplace(std::move(reducedResult.first), std::move(reducedResult.second));
246  }
247  }
248 
249  return ctx->set(std::move(reducerOutput));
250 }
251 
#ifdef __QUANTUM_PRINT_DEBUG
/// @brief Accessor for the mutex used to serialize debug output written to std::cerr.
/// @return Reference to a single function-local static mutex.
/// @note Marked 'inline' because this non-template definition lives in a header; without it,
///       including the header from several translation units yields multiple definitions.
inline std::mutex& Util::LogMutex()
{
    static std::mutex m;
    return m;
}
#endif
259 
260 }}
int bindCoro(Traits::Yield &yield, std::shared_ptr< CoroContext< RET >> ctx, CAPTURE &&capture)
Definition: quantum_util_impl.h:27
Definition: quantum_buffer_impl.h:22
int bindIo(std::shared_ptr< Promise< RET >> promise, CAPTURE &&capture)
Definition: quantum_util_impl.h:59
Exposes methods to manipulate the coroutine context.
Definition: quantum_icoro_context.h:38
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Class representing a promised value.
Definition: quantum_icoro_promise.h:77
std::function< RET(const typename std::iterator_traits< INPUT_IT >::value_type &)> ForEachFunc
Definition: quantum_functions.h:34
std::function< std::vector< std::pair< KEY, MAPPED_TYPE > >(const typename std::iterator_traits< INPUT_IT >::value_type &)> MapFunc
Definition: quantum_functions.h:37
Utility class that joins N futures into a single one.
Definition: quantum_future_joiner.h:39
Concrete class representing a coroutine or a thread context.
Definition: quantum_icoro_context.h:29
static Function< int(Traits::Yield &)> bindCaller(std::shared_ptr< Context< RET >> ctx, FUNC &&func0, ARGS &&...args0)
Definition: quantum_util_impl.h:88
static int forEachBatchCoro(CoroContextPtr< std::vector< std::vector< RET >>> ctx, INPUT_IT inputIt, size_t num, const Functions::ForEachFunc< RET, INPUT_IT > &func, size_t numCoroutineThreads)
Definition: quantum_util_impl.h:122
typename BoostCoro::pull_type Yield
Definition: quantum_traits.h:58
std::function< std::pair< KEY, REDUCED_TYPE >(std::pair< KEY, std::vector< MAPPED_TYPE > > &&)> ReduceFunc
Definition: quantum_functions.h:40
Similar implementation to std::function except that it allows capture of non-copyable types.
Definition: quantum_capture.h:62
static int forEachCoro(CoroContextPtr< std::vector< RET >> ctx, INPUT_IT inputIt, size_t num, const Functions::ForEachFunc< RET, INPUT_IT > &func)
Definition: quantum_util_impl.h:103
static Function< int()> bindIoCaller(std::shared_ptr< Promise< RET >> promise, FUNC &&func0, ARGS &&...args0)
Definition: quantum_util_impl.h:96
Capture< FUNC, ARGS... > makeCapture(FUNC &&func, ARGS &&... args)
Definition: quantum_capture_impl.h:42