bulkio_base.h
Go to the documentation of this file.
1 /*
2  * This file is protected by Copyright. Please refer to the COPYRIGHT file
3  * distributed with this source distribution.
4  *
5  * This file is part of REDHAWK bulkioInterfaces.
6  *
7  * REDHAWK bulkioInterfaces is free software: you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation, either version 3 of the License, or (at your
10  * option) any later version.
11  *
12  * REDHAWK bulkioInterfaces is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this program. If not, see http://www.gnu.org/licenses/.
19  */
20 
21 #ifndef __bulkio_base_h
22 #define __bulkio_base_h
23 
#include <stdint.h>
#include <list>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <ossie/debug.h>
#include <ossie/BULKIO/bio_runtimeStats.h>
#include <ossie/BULKIO/bulkioDataTypes.h>
#include "ossie/Autocomplete.h"
35 
36 
37 namespace bulkio {
38 
39 
40  //
41  // helper class to manage queue depth and back pressure
42  //
43  class queueSemaphore;
44 
//
// ConnectionList container, (allows for template typedef defs to occur)
//
// The class is never instantiated (its only constructor is private); it exists
// so that a per-port-type list type can be spelled Connections< T >::List.
// Each list entry pairs a T with a std::string (presumably the connection id,
// going by the `cid` field of the design note below -- confirm against callers).
//
template < typename T >
class Connections {
  private:
    // Private so the class can only be used as a typedef holder.
    Connections(void) {}

  public:

    /*
     * use end point definition for more descriptive error messages
     struct EndPoint {
        T            bio_port;
        std::string  cid;
        PT           cf_port;
     };

     typedef typename std::vector< EndPoint >  List;
    */

    typedef std::vector< std::pair< T, std::string > >  List;

};
68 
69  //
70  // Mapping of Stream IDs to SRI objects
71  //
72  typedef std::map< std::string, std::pair< BULKIO::StreamSRI, bool > > SriMap;
73 
74  typedef std::vector< BULKIO::StreamSRI > SriList;
75 
76  //
77  // Tracks an SRI and which connections have the most recent version
78  //
79  struct SriMapStruct {
80  BULKIO::StreamSRI sri;
81  std::set<std::string> connections;
82 
83  SriMapStruct( const BULKIO::StreamSRI &in_sri ) {
84  sri = in_sri;
85  };
86 
87  SriMapStruct( const SriMapStruct &src ) {
88  sri = src.sri;
89  connections = src.connections;
90  };
91  };
92 
93 
// Standard struct property for multi-out support
struct connection_descriptor_struct {
    // All three string members start out empty.
    connection_descriptor_struct ()
    {
    }

    // Identifier string of this struct property.
    static std::string getId() {
        return std::string("connection_descriptor");
    }

    std::string connection_id;
    std::string stream_id;
    std::string port_name;
};
108 
109  //
110  // Listing of Stream IDs for searching
111  //
112  typedef std::list < std::string > StreamIDList;
113 
114  //
115  // Common Name for Mutex construct used by port classes
116  //
117  typedef boost::mutex MUTEX;
118 
119  //
120  // Common Name for Condition construct used by port classes
121  //
122  typedef boost::condition_variable CONDITION;
123 
124  //
125  // Auto lock/unlock of mutex objects based on language scope
126  //
127  typedef boost::unique_lock< boost::mutex > UNIQUE_LOCK;
128 
129  //
130  // Auto lock/unlock of mutex objects based on language scope
131  //
132  typedef boost::mutex::scoped_lock SCOPED_LOCK;
133 
134 
135  //
136  // Logging interface definition
137  //
138  typedef LOGGER LOGGER_PTR;
139 
140 
141  //
142  // Base Types used by Ports
143  //
144  typedef char Char;
145  typedef int8_t Int8;
146  typedef uint8_t UInt8;
147  typedef int16_t Int16;
148  typedef uint16_t UInt16;
149 
150  typedef int32_t Int32;
151  typedef uint32_t UInt32;
152 
153  typedef int64_t Int64;
154  typedef uint64_t UInt64;
155  typedef float Float;
156  typedef double Double;
157 
158  //
159  // helper class for port statistics
160  //
162  {
163  public:
164 
165  linkStatistics( std::string &portName, const int nbytes=1 );
166 
167  linkStatistics();
168 
169  virtual ~linkStatistics() {};
170 
171  virtual void setEnabled(bool enableStats);
172 
173  virtual void setBitSize( double bitSize );
174 
175  virtual void update(unsigned int elementsReceived, float queueSize, bool EOS, const std::string &streamID, bool flush = false);
176 
177  StreamIDList getActiveStreamIDs(){return activeStreamIDs;};
178 
179  virtual BULKIO::PortStatistics retrieve();
180 
181  protected:
182 
183  struct statPoint {
184  unsigned int elements;
185  float queueSize;
186  double secs;
187  double usecs;
188  };
189 
190  std::string portName;
191  bool enabled;
192  int nbytes;
193  double bitSize;
194  BULKIO::PortStatistics runningStats;
195  std::vector< statPoint > receivedStatistics;
196  StreamIDList activeStreamIDs;
197  unsigned long historyWindow;
199 
200  double flush_sec; // track time since last queue flush happened
201  double flush_usec; // track time since last queue flush happened
202  };
203 
204 
206 
207  public:
208  queueSemaphore(unsigned int initialMaxValue);
209 
210  void release();
211 
212  void setMaxValue(unsigned int newMaxValue);
213 
214  unsigned int getMaxValue(void);
215 
216  void setCurrValue(unsigned int newValue);
217 
218  void incr();
219 
220  void decr();
221 
222  private:
223  unsigned int maxValue;
224  unsigned int currValue;
225  MUTEX mutex;
226  CONDITION condition;
227 
228  };
229 
230  namespace Const {
231 
232  //
233  // Maximum transfer size for middleware
234  //
235  const uint64_t MAX_TRANSFER_BYTES = omniORB::giopMaxMsgSize();
236 
237  //
238  // Constant that defines if retrieval of data from a port's queue will NOT block
239  //
240  const float NON_BLOCKING = 0.0;
241 
242  //
243  // Constant that defines if retrieval of data from a ports's queue will BLOCK
244  //
245  const float BLOCKING = -1.0;
246 
247  inline uint64_t MaxTransferBytes() { return omniORB::giopMaxMsgSize(); };
248 
249  };
250 
251 
252  /*
253  *
254  * Time Stamp Helpers
255  *
256  */
257  namespace time {
258 
259  /*
260  * PrecisionUTCTime object as defined by bulkio_dataTypes.idl, definition provided for information only
261  *
262  * Time code modes
263  *
264  const short TCM_OFF = 0;
265  const short TCM_CPU = 1;
266  const short TCM_ZTC = 2;
267  const short TCM_SDN = 3;
268  const short TCM_SMS = 4;
269  const short TCM_DTL = 5;
270  const short TCM_IRB = 6;
271  const short TCM_SDDS = 7;
272 
273  struct PrecisionUTCTime {
274  short tcmode; timecode mode
275  short tcstatus; timecode status
276  double toff; Fractional sample offset
277  double twsec; J1970 GMT
278  double tfsec; 0.0 to 1.0
279 
280  };
281 
282  **/
283 
284  namespace utils {
285 
286  /*
287  * Create a time stamp object from the provided input...
288  */
289  BULKIO::PrecisionUTCTime create( const double wholeSecs=-1.0, const double fractionalSecs=-1.0, const Int16 tsrc= BULKIO::TCM_CPU );
290 
291  /*
292  * Create a time stamp object from the current time of day reported by the system
293  */
294  BULKIO::PrecisionUTCTime now();
295 
296  /*
297  * Create a time stamp object from the current time of day reported by the system
298  */
299  BULKIO::PrecisionUTCTime notSet();
300 
301  /*
302  * Return a new time stamp object which increments a given time stamp by numSamples*xdelta seconds
303  */
304  BULKIO::PrecisionUTCTime addSampleOffset( const BULKIO::PrecisionUTCTime &T, const size_t numSamples, const double xdelta );
305 
306  /*
307  * Adjust the whole and fractional portions of a time stamp object to
308  * ensure there is no fraction in the whole seconds, and vice-versa
309  */
310  void normalize(BULKIO::PrecisionUTCTime& time);
311  };
312 
313 
314  /*
315  * A default time stamp comparison method
316  */
317  bool DefaultComparator( const BULKIO::PrecisionUTCTime &a, const BULKIO::PrecisionUTCTime &b);
318 
319  /*
320  * Method signature for comparing time stamp objects
321  */
322  typedef bool (*Compare)( const BULKIO::PrecisionUTCTime &a, const BULKIO::PrecisionUTCTime &b);
323 
324  };
325 
326 
327  /*
328  * StreamSRI
329  *
330  * Convenience routines for building and working with StreamSRI objects
331  *
332  */
333  namespace sri {
334 
335  /*
336  StreamSRI object as defined by bulkio_dataTypes.idl, definition provided for information only
337 
338  struct StreamSRI {
339  long hversion; version of the StreamSRI header
340  double xstart; start time of the stream
341  double xdelta; delta between two samples
342  short xunits; unit types from Platinum specification; common codes defined above
343  long subsize; 0 if the data is one dimensional; > 0 if two dimensional
344  double ystart; start of second dimension
345  double ydelta; delta between two samples of second dimension
346  short yunits; unit types from Platinum specification; common codes defined above
347  short mode; 0-Scalar, 1-Complex
348  string streamID; stream identifier
349  boolean blocking; flag to determine whether the receiving port should exhibit back pressure
350  sequence<CF::DataType> keywords; user defined keywords
351  };
352  */
353 
354  /*
355  * Comparator method to search for matching SRI information if "a" matches "b"
356  */
357  typedef bool (*Compare)( const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b);
358 
359  // Bit flags for SRI fields
360  enum {
361  NONE = 0,
362  HVERSION = (1<<0),
363  XSTART = (1<<1),
364  XDELTA = (1<<2),
365  XUNITS = (1<<3),
366  SUBSIZE = (1<<4),
367  YSTART = (1<<5),
368  YDELTA = (1<<6),
369  YUNITS = (1<<7),
370  MODE = (1<<8),
371  STREAMID = (1<<9),
372  BLOCKING = (1<<10),
373  KEYWORDS = (1<<11)
374  };
375 
376  /*
377  * Do a field-by-field comparison of two SRI streams
378  */
379  int compareFields(const BULKIO::StreamSRI& lhs, const BULKIO::StreamSRI& rhs);
380 
381  /*
382  * Default comparator method when comparing SRI objects
383  *
384  * Performs a member wise comparision of a StreamSRI object. In addition to performing
385  * this comparison, any additional key/value pairs will be compared. The key identifiers
386  * are compared in order, and their associated values are compared using the
387  * equivalency method of the REDHAWK framework compare_anys method.
388  */
389  bool DefaultComparator( const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b);
390 
391  /*
392  * Zeroize an SRI stream
393  */
394  inline void zeroSRI(BULKIO::StreamSRI &sri) {
395  sri.hversion = 1;
396  sri.xstart = 0.0;
397  sri.xdelta = 1.0;
398  sri.xunits = 1;
399  sri.subsize = 1;
400  sri.ystart = 0.0;
401  sri.ydelta = 1.0;
402  sri.yunits = 1;
403  sri.mode = 0;
404  sri.streamID = "";
405  sri.keywords.length(0);
406  };
407 
408  /*
409  * Zeroize a PrecisionUTCTime timestamp
410  */
411  inline void zeroTime(BULKIO::PrecisionUTCTime &timeTag) {
412  timeTag = bulkio::time::utils::notSet();
413  timeTag.tcmode = BULKIO::TCM_CPU;
414  };
415 
416  /*
417  * Create a SRI object with default parameters
418  */
419  BULKIO::StreamSRI create( std::string sid="defStream", const double srate = 1.0, const Int16 xunits = BULKIO::UNITS_TIME, const bool blocking=false );
420 
421 
422  };
423 
424 
425 } // end of bulkio namespace
426 
427 
428 #endif
bool DefaultComparator(const BULKIO::PrecisionUTCTime &a, const BULKIO::PrecisionUTCTime &b)
StreamIDList activeStreamIDs
Definition: bulkio_base.h:196
BULKIO::PortStatistics runningStats
Definition: bulkio_base.h:194
int32_t Int32
Definition: bulkio_base.h:150
Definition: bulkio_base.h:370
const float BLOCKING
Definition: bulkio_base.h:245
Definition: bulkio_out_port.h:39
bool enabled
Definition: bulkio_base.h:191
std::set< std::string > connections
Definition: bulkio_base.h:81
int nbytes
Definition: bulkio_base.h:192
Definition: bulkio_base.h:366
unsigned int elements
Definition: bulkio_base.h:184
std::string connection_id
Definition: bulkio_base.h:102
Definition: bulkio_base.h:368
int receivedStatistics_idx
Definition: bulkio_base.h:198
double Double
Definition: bulkio_base.h:156
bool(* Compare)(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
Definition: bulkio_base.h:357
std::map< std::string, std::pair< BULKIO::StreamSRI, bool > > SriMap
Definition: bulkio_base.h:72
uint64_t UInt64
Definition: bulkio_base.h:154
BULKIO::PrecisionUTCTime create(const double wholeSecs=-1.0, const double fractionalSecs=-1.0, const Int16 tsrc=BULKIO::TCM_CPU)
const float NON_BLOCKING
Definition: bulkio_base.h:240
uint8_t UInt8
Definition: bulkio_base.h:146
BULKIO::PrecisionUTCTime notSet()
unsigned long historyWindow
Definition: bulkio_base.h:197
Definition: bulkio_base.h:161
std::vector< statPoint > receivedStatistics
Definition: bulkio_base.h:195
boost::unique_lock< boost::mutex > UNIQUE_LOCK
Definition: bulkio_base.h:127
double secs
Definition: bulkio_base.h:186
std::string portName
Definition: bulkio_base.h:190
void zeroSRI(BULKIO::StreamSRI &sri)
Definition: bulkio_base.h:394
Definition: bulkio_base.h:205
Definition: bulkio_base.h:95
void setCurrValue(unsigned int newValue)
double flush_usec
Definition: bulkio_base.h:201
const uint64_t MAX_TRANSFER_BYTES
Definition: bulkio_base.h:235
Definition: bulkio_base.h:361
double usecs
Definition: bulkio_base.h:187
virtual void update(unsigned int elementsReceived, float queueSize, bool EOS, const std::string &streamID, bool flush=false)
std::string port_name
Definition: bulkio_base.h:106
int64_t Int64
Definition: bulkio_base.h:153
std::list< std::string > StreamIDList
Definition: bulkio_base.h:112
unsigned int getMaxValue(void)
Definition: bulkio_base.h:48
static std::string getId()
Definition: bulkio_base.h:100
char Char
Definition: bulkio_base.h:144
BULKIO::PrecisionUTCTime addSampleOffset(const BULKIO::PrecisionUTCTime &T, const size_t numSamples, const double xdelta)
bool(* Compare)(const BULKIO::PrecisionUTCTime &a, const BULKIO::PrecisionUTCTime &b)
Definition: bulkio_base.h:322
void zeroTime(BULKIO::PrecisionUTCTime &timeTag)
Definition: bulkio_base.h:411
Definition: bulkio_base.h:183
std::vector< std::pair< T, std::string > > List
Definition: bulkio_base.h:50
float queueSize
Definition: bulkio_base.h:185
BULKIO::StreamSRI sri
Definition: bulkio_base.h:80
double flush_sec
Definition: bulkio_base.h:200
Definition: bulkio_base.h:365
LOGGER LOGGER_PTR
Definition: bulkio_base.h:138
StreamIDList getActiveStreamIDs()
Definition: bulkio_base.h:177
boost::mutex::scoped_lock SCOPED_LOCK
Definition: bulkio_base.h:132
virtual void setEnabled(bool enableStats)
void setMaxValue(unsigned int newMaxValue)
Definition: bulkio_base.h:373
double bitSize
Definition: bulkio_base.h:193
virtual void setBitSize(double bitSize)
Definition: bulkio_base.h:364
std::string stream_id
Definition: bulkio_base.h:105
queueSemaphore(unsigned int initialMaxValue)
float Float
Definition: bulkio_base.h:155
virtual ~linkStatistics()
Definition: bulkio_base.h:169
Definition: bulkio_base.h:369
Definition: bulkio_base.h:372
bool DefaultComparator(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
boost::condition_variable CONDITION
Definition: bulkio_base.h:122
boost::mutex MUTEX
Definition: bulkio_base.h:117
uint16_t UInt16
Definition: bulkio_base.h:148
connection_descriptor_struct()
Definition: bulkio_base.h:96
Definition: bulkio_base.h:362
int8_t Int8
Definition: bulkio_base.h:145
Definition: bulkio_base.h:79
Definition: bulkio_base.h:367
Definition: bulkio_base.h:363
int compareFields(const BULKIO::StreamSRI &lhs, const BULKIO::StreamSRI &rhs)
BULKIO::PrecisionUTCTime now()
int16_t Int16
Definition: bulkio_base.h:147
void normalize(BULKIO::PrecisionUTCTime &time)
SriMapStruct(const SriMapStruct &src)
Definition: bulkio_base.h:87
virtual BULKIO::PortStatistics retrieve()
SriMapStruct(const BULKIO::StreamSRI &in_sri)
Definition: bulkio_base.h:83
uint64_t MaxTransferBytes()
Definition: bulkio_base.h:247
Definition: bulkio_base.h:371
std::vector< BULKIO::StreamSRI > SriList
Definition: bulkio_base.h:74
uint32_t UInt32
Definition: bulkio_base.h:151
BULKIO::StreamSRI create(std::string sid="defStream", const double srate=1.0, const Int16 xunits=BULKIO::UNITS_TIME, const bool blocking=false)