btllib
 All Classes Namespaces Functions Variables
seq_reader_sam_module.hpp
1 #ifndef BTLLIB_SEQ_READER_SAM_MODULE_HPP
2 #define BTLLIB_SEQ_READER_SAM_MODULE_HPP
3 
#include "btllib/cstring.hpp"
#include "btllib/process_pipeline.hpp"
#include "btllib/seq.hpp"
#include "btllib/status.hpp"

#include <cstdio>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
12 
13 namespace btllib {
14 
16 class SeqReaderSamModule
17 {
18 
19 private:
20  friend class SeqReader;
21 
22  enum class Stage
23  {
24  HEADER,
25  ALIGNMENTS
26  };
27 
28  ~SeqReaderSamModule()
29  {
30  if (loader_thread) {
31  loader_thread->join();
32  }
33  }
34 
35  std::unique_ptr<ProcessPipeline> samtools_process;
36  std::unique_ptr<std::thread> loader_thread;
37  CString tmp;
38  static const size_t LOADER_BLOCK_SIZE = 4096;
39 
40  static bool buffer_valid(const char* buffer, size_t size);
41  template<typename ReaderType, typename RecordType>
42  bool read_buffer(ReaderType& reader, RecordType& record);
43  template<typename ReaderType, typename RecordType>
44  bool read_transition(ReaderType& reader, RecordType& record);
45  template<typename ReaderType, typename RecordType>
46  bool read_file(ReaderType& reader, RecordType& record);
47 };
48 
49 template<typename ReaderType, typename RecordType>
50 inline bool
51 SeqReaderSamModule::read_buffer(ReaderType& reader, RecordType& record)
52 {
53  (void)reader;
54  (void)record;
55  {
56  const ProcessPipeline version_test(
57  "samtools --version 2>/dev/stdout | head -n2");
58  char* line = nullptr;
59  size_t n = 0;
60  std::string version = "\n";
61 
62  check_error(getline(&line, &n, version_test.out) < 2,
63  "Failed to get samtools version.");
64  version += line;
65 
66  line = nullptr;
67  check_error(getline(&line, &n, version_test.out) < 2,
68  "Failed to get samtools version.");
69  version += line;
70  version.pop_back();
71 
72  log_info(version);
73  }
74  samtools_process =
75  std::unique_ptr<ProcessPipeline>( // NOLINT(modernize-make-unique)
76  new ProcessPipeline("samtools fastq"));
77  loader_thread =
78  std::unique_ptr<std::thread>(new std::thread([this, &reader]() {
79  check_error(fwrite(reader.buffer.data.data() + reader.buffer.start,
80  1,
81  reader.buffer.end - reader.buffer.start,
82  samtools_process->in) !=
83  reader.buffer.end - reader.buffer.start,
84  "SeqReader SAM module: fwrite failed.");
85  reader.buffer.start = reader.buffer.end;
86  if (std::ferror(reader.source) == 0 && std::feof(reader.source) == 0) {
87  const auto p = std::fgetc(reader.source);
88  if (p != EOF) {
89  const auto ret = std::ungetc(p, reader.source);
90  check_error(ret == EOF, "SeqReaderSamModule: ungetc failed.");
91  while (std::ferror(reader.source) == 0 &&
92  std::feof(reader.source) == 0) {
93  const size_t bufsize = LOADER_BLOCK_SIZE;
94  char buf[bufsize];
95  size_t bytes_read = fread(buf, 1, bufsize, reader.source);
96  check_error(fwrite(buf, 1, bytes_read, samtools_process->in) !=
97  bytes_read,
98  "SeqReader SAM module: fwrite failed.");
99  }
100  }
101  }
102  samtools_process->close_in();
103  }));
104  return false;
105 }
106 
107 template<typename ReaderType, typename RecordType>
108 inline bool
109 SeqReaderSamModule::read_transition(ReaderType& reader, RecordType& record)
110 {
111  (void)reader;
112  (void)record;
113  return false;
114 }
115 
116 template<typename ReaderType, typename RecordType>
117 inline bool
118 SeqReaderSamModule::read_file(ReaderType& reader, RecordType& record)
119 {
120  if (!reader.file_at_end(samtools_process->out)) {
121  reader.readline_file(record.header, samtools_process->out);
122  reader.readline_file(record.seq, samtools_process->out);
123  reader.readline_file(tmp, samtools_process->out);
124  reader.readline_file(record.qual, samtools_process->out);
125  return true;
126  }
127  return false;
128 }
130 
131 } // namespace btllib
132 
133 #endif
void check_error(bool condition, const std::string &msg)
void log_info(const std::string &msg)