QuantumLibrary
quantum_sequencer.h
1 /*
2 ** Copyright 2018 Bloomberg Finance L.P.
3 **
4 ** Licensed under the Apache License, Version 2.0 (the "License");
5 ** you may not use this file except in compliance with the License.
6 ** You may obtain a copy of the License at
7 **
8 ** http://www.apache.org/licenses/LICENSE-2.0
9 **
10 ** Unless required by applicable law or agreed to in writing, software
11 ** distributed under the License is distributed on an "AS IS" BASIS,
12 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ** See the License for the specific language governing permissions and
14 ** limitations under the License.
15 */
16 #ifndef BLOOMBERG_QUANTUM_SEQUENCER_H
17 #define BLOOMBERG_QUANTUM_SEQUENCER_H
18 
19 #include <quantum/quantum_dispatcher.h>
20 #include <quantum/interface/quantum_ithread_context_base.h>
21 #include <quantum/util/quantum_sequencer_configuration.h>
22 #include <quantum/util/quantum_sequence_key_statistics.h>
23 #include <vector>
24 #include <unordered_map>
25 
26 namespace Bloomberg {
27 namespace quantum {
28 
29 //==============================================================================================
30 // class Sequencer
31 //==============================================================================================
38 
39 template <class SequenceKey,
40  class Hash = std::hash<SequenceKey>,
41  class KeyEqual = std::equal_to<SequenceKey>,
42  class Allocator = std::allocator<std::pair<const SequenceKey, SequenceKeyData>>>
43 class Sequencer
44 {
45 public:
48 
52  Sequencer(Dispatcher& dispatcher, const Configuration& configuration = Configuration());
53 
67  template <class FUNC, class ... ARGS>
68  void
69  post(const SequenceKey& sequenceKey, FUNC&& func, ARGS&&... args);
70 
94  template <class FUNC, class ... ARGS>
95  void
96  post(void* opaque, int queueId, bool isHighPriority, const SequenceKey& sequenceKey, FUNC&& func, ARGS&&... args);
97 
111  template <class FUNC, class ... ARGS>
112  void
113  post(const std::vector<SequenceKey>& sequenceKeys, FUNC&& func, ARGS&&... args);
114 
138  template <class FUNC, class ... ARGS>
139  void
140  post(void* opaque,
141  int queueId,
142  bool isHighPriority,
143  const std::vector<SequenceKey>& sequenceKeys,
144  FUNC&& func,
145  ARGS&&... args);
146 
159  template <class FUNC, class ... ARGS>
160  void
161  postAll(FUNC&& func, ARGS&&... args);
162 
185  template <class FUNC, class ... ARGS>
186  void
187  postAll(void* opaque, int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args);
188 
194  size_t trimSequenceKeys();
195 
199  size_t getSequenceKeyCount();
200 
205  SequenceKeyStatistics getStatistics(const SequenceKey& sequenceKey);
206 
211 
217 
218 private:
219  using ContextMap = std::unordered_map<SequenceKey, SequenceKeyData, Hash, KeyEqual, Allocator>;
220  using ExceptionCallback = typename Configuration::ExceptionCallback;
221 
222  template <class FUNC, class ... ARGS>
223  static int waitForTwoDependents(CoroContextPtr<int> ctx,
224  void* opaque,
225  Sequencer& sequencer,
226  SequenceKeyData&& dependent,
227  SequenceKeyData&& universalDependent,
228  FUNC&& func,
229  ARGS&&... args);
230  template <class FUNC, class ... ARGS>
231  static int waitForDependents(CoroContextPtr<int> ctx,
232  void* opaque,
233  Sequencer& sequencer,
234  std::vector<SequenceKeyData>&& dependents,
235  SequenceKeyData&& universalDependent,
236  FUNC&& func,
237  ARGS&&... args);
238  template <class FUNC, class ... ARGS>
239  static int waitForUniversalDependent(CoroContextPtr<int> ctx,
240  void* opaque,
241  Sequencer& sequencer,
242  std::vector<SequenceKeyData>&& dependents,
243  SequenceKeyData&& universalDependent,
244  FUNC&& func,
245  ARGS&&... args);
246  template <class FUNC, class ... ARGS>
247  static int singleSequenceKeyTaskScheduler(
249  void* opaque,
250  int queueId,
251  bool isHighPriority,
252  Sequencer& sequencer,
253  SequenceKey&& sequenceKey,
254  FUNC&& func,
255  ARGS&&... args);
256  template <class FUNC, class ... ARGS>
257  static int multiSequenceKeyTaskScheduler(
259  void* opaque,
260  int queueId,
261  bool isHighPriority,
262  Sequencer& sequencer,
263  std::vector<SequenceKey>&& sequenceKeys,
264  FUNC&& func,
265  ARGS&&... args);
266  template <class FUNC, class ... ARGS>
267  static int universalTaskScheduler(
269  void* opaque,
270  int queueId,
271  bool isHighPriority,
272  Sequencer& sequencer,
273  FUNC&& func,
274  ARGS&&... args);
275  template <class FUNC, class ... ARGS>
276  static void callPosted(CoroContextPtr<int> ctx,
277  void* opaque,
278  const Sequencer& sequencer,
279  FUNC&& func,
280  ARGS&&... args);
281 
282  static bool canTrimContext(const ICoroContextBasePtr& ctx,
283  const ICoroContextBasePtr& ctxToValidate);
284  static bool isPendingContext(const ICoroContextBasePtr& ctx,
285  const ICoroContextBasePtr& ctxToValidate);
286 
287  Dispatcher& _dispatcher;
288  int _controllerQueueId;
289  SequenceKeyData _universalContext;
290  ContextMap _contexts;
291  ExceptionCallback _exceptionCallback;
292  std::shared_ptr<SequenceKeyStatisticsWriter> _taskStats;
293 };
294 
295 }}
296 
297 #include <quantum/util/impl/quantum_sequencer_impl.h>
298 
299 #endif //BLOOMBERG_QUANTUM_SEQUENCER_H
Sequencer(Dispatcher &dispatcher, const Configuration &configuration=Configuration())
Constructor.
Definition: quantum_sequencer_impl.h:28
SequenceKeyStatistics getTaskStatistics()
Gets the sequencer statistics for all jobs.
Definition: quantum_sequencer_impl.h:226
Definition: quantum_buffer_impl.h:22
SequenceKeyStatistics getStatistics()
Gets the sequencer statistics for the 'universal key', i.e. for tasks posted via the postAll() method.
Definition: quantum_sequencer_impl.h:219
size_t trimSequenceKeys()
Trims the sequence keys not used by the sequencer anymore.
Definition: quantum_sequencer_impl.h:184
void post(const SequenceKey &sequenceKey, FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:49
typename ICoroContext< RET >::Ptr CoroContextPtr
Definition: quantum_icoro_context.h:479
Implementation of a configuration class for Sequencer.
Definition: quantum_sequencer_configuration.h:41
ICoroContextBase::Ptr ICoroContextBasePtr
Definition: quantum_icoro_context_base.h:79
Parallel execution engine used to run coroutines or IO tasks asynchronously. This class is the main e...
Definition: quantum_dispatcher.h:34
Implementation of a statistics collection for a SequenceKey in Sequencer.
Definition: quantum_sequence_key_statistics.h:32
Definition: quantum_sequencer_configuration_impl.h:28
std::function< void(std::exception_ptr exception, void *opaque)> ExceptionCallback
Callback for unhandled exceptions in tasks posted to Sequencer.
Definition: quantum_sequencer_configuration.h:48
size_t getSequenceKeyCount()
Gets the number of tracked sequence keys.
Definition: quantum_sequencer_impl.h:233
SequencerConfiguration< SequenceKey, Hash, KeyEqual, Allocator > Configuration
Configuration class for Sequencer.
Definition: quantum_sequencer.h:47
void postAll(FUNC &&func, ARGS &&... args)
Post a coroutine to run asynchronously.
Definition: quantum_sequencer_impl.h:144
Implementation of key-based task sequencing with quantum.
Definition: quantum_sequencer.h:43