btllib
 All Classes Namespaces Functions Variables
seq_reader.hpp
1 #ifndef BTLLIB_SEQ_READER_HPP
2 #define BTLLIB_SEQ_READER_HPP
3 
4 #include "btllib/cstring.hpp"
5 #include "btllib/data_stream.hpp"
6 #include "btllib/order_queue.hpp"
7 #include "btllib/seq.hpp"
8 #include "btllib/seq_reader_fasta_module.hpp"
9 #include "btllib/seq_reader_fastq_module.hpp"
10 #include "btllib/seq_reader_multiline_fasta_module.hpp"
11 #include "btllib/seq_reader_multiline_fastq_module.hpp"
12 #include "btllib/seq_reader_sam_module.hpp"
13 #include "btllib/status.hpp"
14 
#include <atomic>
#include <cctype>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <stack>
#include <string>
#include <thread>
#include <utility>
#include <vector>
28 
29 namespace btllib {
30 
41 class SeqReader
42 {
43 public:
44  /* Has to be a struct and not an enum because:
45  * 1) Non-class enums are not name qualified and can collide
46  * 2) class enums can't be implicitly converted into integers
47  */
48  struct Flag
49  {
51  static const unsigned FOLD_CASE = 1;
54  static const unsigned TRIM_MASKED = 2;
56  static const unsigned SHORT_MODE = 4;
58  static const unsigned LONG_MODE = 8;
59  };
60 
69  SeqReader(const std::string& source_path,
70  unsigned flags,
71  unsigned threads = 3);
72 
73  SeqReader(const SeqReader&) = delete;
74  SeqReader(SeqReader&&) = delete;
75 
76  SeqReader& operator=(const SeqReader&) = delete;
77  SeqReader& operator=(SeqReader&&) = delete;
78 
79  ~SeqReader();
80 
81  void close() noexcept;
82 
83  bool fold_case() const { return bool(flags & Flag::FOLD_CASE); }
84  bool trim_masked() const { return bool(flags & Flag::TRIM_MASKED); }
85  bool short_mode() const { return bool(flags & Flag::SHORT_MODE); }
86  bool long_mode() const { return bool(flags & Flag::LONG_MODE); }
87 
88  enum class Format
89  {
90  UNDETERMINED,
91  FASTA,
92  FASTQ,
93  SAM,
94  INVALID
95  };
96 
97  friend std::ostream& operator<<(std::ostream& os, const Format f)
98  {
99  return os << static_cast<int32_t>(f);
100  }
101 
102  Format get_format() const { return format; }
103 
104  struct Record
105  {
106  size_t num = std::numeric_limits<size_t>::max();
107  std::string id;
108  std::string comment;
109  std::string seq;
110  std::string qual;
111 
112  operator bool() const { return !seq.empty(); }
113  };
114 
116  Record read();
117 
119  OrderQueueMPMC<Record>::Block read_block();
120 
121  static const size_t MAX_SIMULTANEOUS_SEQREADERS = 256;
122 
124  class RecordIterator
126  {
127  public:
128  void operator++() { record = reader.read(); }
129  bool operator!=(const RecordIterator& i)
130  {
131  return bool(record) || bool(i.record);
132  }
133  Record operator*() { return std::move(record); }
134  // For wrappers
135  Record next()
136  {
137  auto val = operator*();
138  operator++();
139  return val;
140  }
141 
142  private:
143  friend SeqReader;
144 
145  RecordIterator(SeqReader& reader, bool end)
146  : reader(reader)
147  {
148  if (!end) {
149  operator++();
150  }
151  }
152 
153  SeqReader& reader;
154  Record record;
155  };
157 
158  RecordIterator begin() { return RecordIterator(*this, false); }
159  RecordIterator end() { return RecordIterator(*this, true); }
160 
161  size_t get_buffer_size() const { return buffer_size; }
162  size_t get_block_size() const { return block_size; }
163 
164  static const size_t SHORT_MODE_BUFFER_SIZE = 32;
165  static const size_t SHORT_MODE_BLOCK_SIZE = 32;
166 
167  static const size_t LONG_MODE_BUFFER_SIZE = 4;
168  static const size_t LONG_MODE_BLOCK_SIZE = 1;
169 
170  static const size_t FORMAT_BUFFER_SIZE = 16384;
171 
172 private:
173  struct Buffer
174  {
175 
176  Buffer()
177  : data(FORMAT_BUFFER_SIZE)
178  {
179  }
180 
181  std::vector<char> data;
182  size_t start = 0;
183  size_t end = 0;
184  bool eof_newline_inserted = false;
185  };
186 
187  struct RecordCString
188  {
189  CString header;
190  CString seq;
191  CString qual;
192  };
193 
194  const std::string& source_path;
195  DataSource source;
196  const unsigned flags;
197  const unsigned threads;
198  Format format = Format::UNDETERMINED; // Format of the source file
199  std::atomic<bool> closed{ false };
200  Buffer buffer;
201  std::unique_ptr<std::thread> reader_thread;
202  std::vector<std::unique_ptr<std::thread>> processor_threads;
203  std::mutex format_mutex;
204  std::condition_variable format_cv;
205  std::atomic<bool> reader_end{ false };
206  RecordCString* reader_record = nullptr;
207  const std::atomic<size_t> buffer_size;
208  const std::atomic<size_t> block_size;
209  OrderQueueSPMC<RecordCString> cstring_queue;
210  OrderQueueMPMC<Record> output_queue;
211  std::atomic<size_t> dummy_block_num{ 0 };
212  const long id;
213 
214  // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
215  thread_local static std::unique_ptr<decltype(output_queue)::Block>
216  // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
217  ready_records_array[MAX_SIMULTANEOUS_SEQREADERS];
218 
219  // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
220  thread_local static long ready_records_owners[MAX_SIMULTANEOUS_SEQREADERS];
221 
222  // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
223  thread_local static size_t ready_records_current[MAX_SIMULTANEOUS_SEQREADERS];
224 
225  // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
226  static std::atomic<long> last_id;
227 
228  bool load_buffer();
229  void determine_format();
230  void start_reader();
231  void start_processors();
232 
233  CString tmp;
234  bool readline_buffer_append(CString& s);
235  static void readline_file(CString& s, FILE* f);
236  void readline_file_append(CString& s, FILE* f);
237  static bool file_at_end(FILE* f);
238  int getc_buffer();
239  int ungetc_buffer(int c);
240 
241  void update_cstring_records(OrderQueueSPMC<RecordCString>::Block& records,
242  size_t& counter);
243 
245  template<typename Module>
246  void read_from_buffer(Module& module,
247  OrderQueueSPMC<RecordCString>::Block& records,
248  size_t& counter);
249 
250  template<typename Module>
251  void read_transition(Module& module,
252  OrderQueueSPMC<RecordCString>::Block& records,
253  size_t& counter);
254 
255  template<typename Module>
256  void read_from_file(Module& module,
257  OrderQueueSPMC<RecordCString>::Block& records,
258  size_t& counter);
260 
261  friend class SeqReaderFastaModule;
262  SeqReaderFastaModule fasta_module;
263 
264  friend class SeqReaderMultilineFastaModule;
265  SeqReaderMultilineFastaModule multiline_fasta_module;
266 
267  friend class SeqReaderFastqModule;
268  SeqReaderFastqModule fastq_module;
269 
270  friend class SeqReaderMultilineFastqModule;
271  SeqReaderMultilineFastqModule multiline_fastq_module;
272 
273  friend class SeqReaderSamModule;
274  SeqReaderSamModule sam_module;
275 
276  int module_in_use = 0;
277 
278  void postprocess();
279 };
280 
281 template<typename Module>
282 inline void
283 SeqReader::read_from_buffer(Module& module,
284  OrderQueueSPMC<RecordCString>::Block& records,
285  size_t& counter)
286 {
287  while (!reader_end) {
288  reader_record = &(records.data[records.count]);
289  if (!module.read_buffer(*this, *reader_record) ||
290  reader_record->seq.empty()) {
291  break;
292  }
293  update_cstring_records(records, counter);
294  }
295 }
296 
297 template<typename Module>
298 inline void
299 SeqReader::read_transition(Module& module,
300  OrderQueueSPMC<RecordCString>::Block& records,
301  size_t& counter)
302 {
303  if (!reader_end) {
304  reader_record = &(records.data[records.count]);
305  module.read_transition(*this, *reader_record);
306  if (!reader_record->seq.empty()) {
307  update_cstring_records(records, counter);
308  }
309  } else if (reader_record != nullptr && !reader_record->seq.empty()) {
310  update_cstring_records(records, counter);
311  }
312 }
313 
314 template<typename Module>
315 inline void
316 SeqReader::read_from_file(Module& module,
317  OrderQueueSPMC<RecordCString>::Block& records,
318  size_t& counter)
319 {
320  while (!reader_end) {
321  reader_record = &(records.data[records.count]);
322  if (!module.read_file(*this, *reader_record) ||
323  reader_record->seq.empty()) {
324  break;
325  }
326  update_cstring_records(records, counter);
327  }
328 }
329 
330 } // namespace btllib
331 
332 #endif
static const unsigned LONG_MODE
Definition: seq_reader.hpp:58
RecordIterator begin()
Definition: seq_reader.hpp:158
class btllib::SeqReader
Definition: seq_reader.hpp:41
static const unsigned FOLD_CASE
Definition: seq_reader.hpp:51
OrderQueueMPMC< Record >::Block read_block()
static const unsigned SHORT_MODE
Definition: seq_reader.hpp:56
struct btllib::SeqReader::Record
Definition: seq_reader.hpp:104
struct btllib::SeqReader::Flag
Definition: seq_reader.hpp:48
static const unsigned TRIM_MASKED
Definition: seq_reader.hpp:54
SeqReader(const std::string &source_path, unsigned flags, unsigned threads=3)