bulkio_in_port.h
Go to the documentation of this file.
1 /*
2  * This file is protected by Copyright. Please refer to the COPYRIGHT file
3  * distributed with this source distribution.
4  *
5  * This file is part of REDHAWK bulkioInterfaces.
6  *
7  * REDHAWK bulkioInterfaces is free software: you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation, either version 3 of the License, or (at your
10  * option) any later version.
11  *
12  * REDHAWK bulkioInterfaces is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this program. If not, see http://www.gnu.org/licenses/.
19  */
20 #ifndef __bulkio_in_port_h
21 #define __bulkio_in_port_h
22 
23 #include <queue>
24 #include <list>
25 #include <vector>
26 #include <boost/thread/condition_variable.hpp>
27 #include <boost/thread/locks.hpp>
28 #include <boost/make_shared.hpp>
29 #include <boost/ref.hpp>
30 
31 #include <ossie/callback.h>
32 #include <ossie/signalling.h>
33 
34 #include "bulkio_base.h"
35 #include "bulkio_traits.h"
36 #include "bulkio_in_stream.h"
37 #include "bulkio_callbacks.h"
38 
39 namespace bulkio {
40 
41  //
42  // InPortBase
43  // Base template for data transfers between BULKIO ports. This class is defined by 2 trait classes
44  // DataTransferTraits: This template trait defines the DataTranfer object that is returned by the getPacket method
45  // PortTraits - This template provides the context for the port's middleware transport classes and they base data types
46  // passed between port objects
47  //
48  template < typename PortTraits >
49  class InPortBase : public PortTraits::POAPortType, public Port_Provides_base_impl
50  {
51 
52  public:
53 
54  typedef PortTraits Traits;
55 
56  // Transport Sequence Type use to during push packet
57  typedef typename Traits::SequenceType PortSequenceType;
58 
59  //
60  // Transport type used by this port
61  //
62  typedef typename Traits::TransportType TransportType;
63 
64  //
65  // True type of argument to pushPacket, typically "const PortSequenceType&"
66  // except for dataXML and dataFile (which use "const char*")
67  //
68  typedef typename Traits::PushType PushArgumentType;
69 
70  typedef typename Traits::PortType PortType;
71 
72  //
73  // Declaration of DataTransfer class from TransportType trait and DataBuffer type trait
74  //
75  typedef DataTransfer< typename Traits::DataTransferTraits > DataTransferType;
76 
77  // Queue of data transfer objects maintained by the port
78  typedef std::deque< DataTransferType * > WorkQueue;
79 
80  //
81  // ~InPortBase - call the virtual destructor to remove all allocated memebers
82  //
83  virtual ~InPortBase();
84 
85  /*
86  * getPacket - interface used by components to grab data from the port's internal queue object for processing. The timeout parameter allows
87  * the calling component to perform blocking and non-blocking retrievals.
88  *
89  * @param timeout - timeout == bulkio::Const::NON_BLOCKING (0.0) non-blocking io
90  * timeout == bulkio::Const::BLOCKING (-1) block until data arrives or lock is broken on exit
91  * timeout > 0.0 wait until time expires.
92  * @return dataTranfer * pointer to a data transfer object from the port's work queue
93  * @return NULL - no data available
94  */
95  virtual DataTransferType *getPacket(float timeout);
96 
97  /*
98  * getPacket - interface used by components to grab data from the port's internal queue object for a specified streamID
99  *
100  * @param timeout - timeout == bulkio::Const::NON_BLOCKING (0.0) non-blocking io
101  * timeout == bulkio::Const::BLOCKING (-1) block until data arrives or lock is broken on exit
102  * timeout > 0.0 wait until time expires.
103  * @param streamID stream id to match on for when pulling data from the port's work queue
104  * @return dataTranfer * pointer to a data transfer object from the port's work queue
105  * @return NULL - no data available
106  */
107  virtual DataTransferType *getPacket(float timeout, const std::string &streamID);
108 
109  //
110  // BULKIO IDL interface for pushing Floating Point vectors between components
111  //
112 
113  /*
114  * pushSRI - called by the source component when SRI data about the stream changes, the data flow policy is this activity
115  * will occurr first before any data flows to the component.
116  *
117  * @param H - Incoming StreamSRI object that defines the state of the data flow portion of the stream (pushPacket)
118  */
119  virtual void pushSRI(const BULKIO::StreamSRI& H);
120 
121  //
122  // Port Statistics Interface
123  //
124 
125  /*
126  * turn on/off the port monitoring capability
127  */
128  virtual void enableStats(bool enable);
129 
130  //
131  // state - returns the current state of the port as follows:
132  // BULKIO::BUSY - internal queue has reached FULL state
133  // BULKIO::IDLE - there are no items on the internal queue
134  // BULKIO::ACTIVE - there are items on the queue
135  //
136  // @return BULKIO::PortUsageType - current state of port
137  //
138  virtual BULKIO::PortUsageType state();
139 
140  //
141  // statisics - returns a PortStatistics object for this provides port
142  // PortStatistics:
143  // portname - name of port
144  // elementsPerSecond - number of elements per second (element is based on size of port type )
145  // bitsPerSecond - number of bits per second (based on element storage size in bits)
146  // callsPerSecond - history window -1 / time between calls to this method
147  // streamIds - list of active stream id values
148  // averageQueueDepth - the average depth of the queue for this port
149  // timeSinceLastCall - time since this method as invoked and the last pushPacket happened
150  // Keyword Sequence - deprecated
151  //
152  // @return BULKIO::PortStatistics - current data flow metrics collected for the port.
153  // the caller of the method is responsible for freeing this object
154  //
155  virtual BULKIO::PortStatistics* statistics();
156 
157  //
158  // activeSRIs - returns a sequence of BULKIO::StreamSRI objectsPort
159  //
160  // @return BULKIO::StreamSRISequence - list of activte SRI objects for this port
161  // the caller of the method is responsible for freeing this object
162  //
163  virtual BULKIO::StreamSRISequence* activeSRIs();
164 
165  /*
166  * getCurrentQueueDepth - returns the current number of elements in the queue
167  *
168  * @return int - number of items in the queue
169  */
170  virtual int getCurrentQueueDepth();
171 
172  /*
173  * getMaxQueueDepth - returns the maximum size of the queue , if this water mark is reached the queue will be purged, and the
174  * component of the port will be notified in getPacket method
175  * @return int - maximum size the queue can reach before purging occurs
176  */
177  virtual int getMaxQueueDepth();
178 
179  /*
180  * setMaxQueueDepth - allow users of this port to modify the maximum number of allowable vectors on the queue.
181  */
182  virtual void setMaxQueueDepth(int newDepth);
183 
184  //
185  // Allow the component to control the flow of data from the port to the component. Block will restrict the flow of data back into the
186  // component. Call in component's stop method
187  //
188  virtual void block();
189 
190  //
191  // Allow the component to control the flow of data from the port to the component. Unblock will release the flow of data back into the
192  // component. Called in component's start method.
193  //
194  virtual void unblock();
195 
196  //
197  // Support function for automatic component-managed start. Calls unblock.
198  //
199  virtual void startPort();
200 
201  //
202  // Support function for automatic component-managed stop. Calls block.
203  //
204  virtual void stopPort();
205 
206  /*
207  * blocked
208  *
209  * @return bool returns state of breakBlock variable used to release any upstream blocking pushPacket calls
210  */
211  virtual bool blocked();
212 
213  /*
214  * Assign a callback for notification when a new SRI StreamId is received
215  */
216  template< typename T > inline
217  void setNewStreamListener(T &target, void (T::*func)( BULKIO::StreamSRI &) ) {
218  newStreamCallback = boost::make_shared< MemberSriListener< T > >( boost::ref(target), func );
219  };
220 
221  /*
222  * Assign a callback for notification when a new SRI StreamId is received
223  */
224  template< typename T > inline
225  void setNewStreamListener(T *target, void (T::*func)( BULKIO::StreamSRI &) ) {
226  newStreamCallback = boost::make_shared< MemberSriListener< T > >( boost::ref(*target), func );
227 
228  };
229 
230  void setNewStreamListener( SriListener *newListener );
231 
232  void setNewStreamListener( SriListenerCallbackFn newListener );
233 
234  void setLogger( LOGGER_PTR logger );
235 
236  // Return the interface that this Port supports
237  std::string getRepid () const;
238 
239  protected:
240  //
241  // InPortBase - creates a provides port that can accept data vectors from a source
242  //
243  // @param port_name name of the port taken from .scd.xml file
244  // @param sriCmp comparator function that accepts to StreamSRI objects and compares their contents,
245  // if all members match then return true, otherwise false. This is used during the pushSRI method
246  // @param newStreamCB interface that is called when new SRI.streamID is received
247  InPortBase(std::string port_name,
250  SriListener *newStreamCB = NULL );
251 
252  //
253  // FIFO of data vectors and time stamps waiting to be processed by a component
254  //
255  WorkQueue workQueue;
256 
257  //
258  // Track size of work queue between getPacket calls when using streamID for extraction
259  //
260  uint32_t lastQueueSize;
261 
262  //
263  // SRI compare method used by pushSRI method to determine how to match incoming SRI objects and streamsID
264  //
266 
267  //
268  // Callback for notifications when new SRI streamID's are received
269  //
270  boost::shared_ptr< SriListener > newStreamCallback;
271 
272  //
273  // List of SRI objects managed by StreamID
274  //
276 
277  //
278  // synchronizes access to the workQueue member
279  //
281 
282  //
283  // synchronizes access to the currentHs member
284  //
286 
287  //
288  // mutex for use with condition variable to signify when data is available for consumption
289  // RESOVLE: combine deque and condition into template for pushing and poping items onto the queue...
290  // refer to ConditionList.h example
291  //
293 
295 
296  //
297  // used to control data flow from getPacket call
298  //
300 
301  //
302  // Transfers blocking request from data provider to this port that will block pushPacket calls if queue has reached a maximum value
303  //
304  bool blocking;
305 
306  //
307  // An abstraction of a counting semaphore to control access to the workQueue member
308  //
310 
311  //
312  // Statistics provider object used by the port monitoring interface
313  //
315 
317 
318  //
319  // Synchronized waiter list for use in poll()
320  //
321  redhawk::signal<std::string> packetWaiters;
322 
323  //
324  // Queues a packet received via pushPacket; in most cases, this method maps
325  // exactly to pushPacket, except for dataFile
326  //
327  void queuePacket(PushArgumentType data, const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS, const char* streamID);
328 
329  virtual void createStream(const std::string& streamID, const BULKIO::StreamSRI& sri);
330  virtual void removeStream(const std::string& streamID);
331 
332  virtual bool isStreamActive(const std::string& streamID);
333  virtual bool isStreamEnabled(const std::string& streamID);
334 
335  DataTransferType* fetchPacket(const std::string& streamID);
336  void packetReceived(const std::string& streamID);
337 
338  friend class InputStream<PortTraits>;
339  size_t samplesAvailable(const std::string& streamID, bool firstPacket);
340 
341  //
342  // Returns the total number of elements of data in a pushPacket call, for
343  // statistical tracking; enables XML and File specialization, which have
344  // different notions of size
345  //
346  int _getElementLength(PushArgumentType data);
347  };
348 
349  template < typename PortTraits >
350  class InPort : public InPortBase<PortTraits>
351  {
352  public:
353  typedef PortTraits Traits;
354 
355  // Port Variable Type
356  typedef typename Traits::POAPortType PortVarType;
357 
358  // Interface Type
359  typedef typename Traits::PortType PortType;
360 
361  // Interface Type
362  typedef typename Traits::PortType ProvidesPortType;
363 
364  // Transport Sequence Type use to during push packet
365  typedef typename Traits::SequenceType PortSequenceType;
366 
367  //
368  // Transport type used by this port
369  //
370  typedef typename Traits::TransportType TransportType;
371 
372  //
373  // Native type mapping of TransportType
374  //
375  typedef typename Traits::NativeType NativeType;
376 
377  //
378  // Declaration of DataTransfer class from TransportType trait and DataBuffer type trait
379  //
380  typedef DataTransfer< typename Traits::DataTransferTraits > DataTransferType;
381 
382  // backwards compatible definition
383  typedef DataTransferType dataTransfer;
384 
385  // queue of dataTranfer objects maintained by the port
386  typedef std::deque< DataTransferType * > WorkQueue;
387 
388  // Input stream interface used by this port
390 
391  // List type for input streams provided by this port
392  typedef std::list<StreamType> StreamList;
393 
394  //
395  // InPort - creates a provides port that can accept data vectors from a source
396  //
397  // @param port_name name of the port taken from .scd.xml file
398  // @param sriCmp comparator function that accepts to StreamSRI objects and compares their contents,
399  // if all members match then return true, otherwise false. This is used during the pushSRI method
400  // @param newStreamCB interface that is called when new SRI.streamID is received
401  InPort(std::string port_name,
404  SriListener *newStreamCB = NULL );
405 
406  InPort(std::string port_name,
408  SriListener *newStreamCB = NULL );
409 
410  InPort(std::string port_name, void *);
411 
412  //
413  // pushPacket called by the source component when pushing a vector of data into a component. This method will save off the data
414  // vector, timestamp, EOS and streamID onto a queue for consumption by the component via the getPacket method
415  //
416  // @param data - the vector of data to be consumed
417  // @param T - a time stamp for the data, the time represents the associated time value for the first entry of the data vector
418  // @param EOS - indicator that the stream has ended, (stream is identified by streamID)
419  // @param streamID - name of the stream the vector and stream context data are associated with
420  virtual void pushPacket(const PortSequenceType& data, const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS, const char* streamID);
421 
422  //
423  // Stream-based input API
424  //
425  StreamList getStreams();
426 
427  StreamList pollStreams(float timeout);
428  StreamList pollStreams(StreamList& pollset, float timeout);
429 
430  StreamList pollStreams(size_t samples, float timeout);
431  StreamList pollStreams(StreamList& pollset, size_t samples, float timeout);
432 
433  template <class Target, class Func>
434  void addStreamListener(Target target, Func func) {
435  streamAdded.add(target, func);
436  }
437 
438  template <class Target, class Func>
439  void removeStreamListener(Target target, Func func) {
440  streamAdded.remove(target, func);
441  }
442 
443  protected:
445  using super::packetWaiters;
446  using super::logger;
447 
448  //
449  // Notification for new stream creation
450  //
451  ossie::notification<void (StreamType)> streamAdded;
452 
453  //
454  // Streams that are currently active
455  //
456  typedef std::map<std::string,StreamType> StreamMap;
457  StreamMap streams;
458  boost::mutex streamsMutex;
459 
460  // Streams that have the same stream ID as an active stream, when an
461  // end-of-stream has been queued but not yet read
462  std::multimap<std::string,StreamType> pendingStreams;
463 
464  virtual void createStream(const std::string& streamID, const BULKIO::StreamSRI& sri);
465  virtual void removeStream(const std::string& streamID);
466 
467  virtual bool isStreamActive(const std::string& streamID);
468  virtual bool isStreamEnabled(const std::string& streamID);
469 
470  StreamList getReadyStreams(size_t samples);
471  };
472 
473  //
474  // InStringPort
475  // Base template for simple data transfers between Input/Output ports. This class is defined by 2 trait classes
476  // DataTransferTraits: This template trait defines the DataTranfer object that is returned by the getPacket method
477  // PortTraits - This template provides the context for the port's middleware transport classes and they base data types
478  // passed between port objects
479  //
480  // Both classes have a simlar types of TransportType and SequenceType and the DataTransferTraits defines the the type for the
481  // data buffer used to store incoming streams of data. These 2 class should be combined to described InputPortTraits.
482  //
483 
484 
485  template < typename PortTraits >
486  class InStringPort : public InPortBase<PortTraits>
487  {
488 
489  public:
490 
491  typedef PortTraits Traits;
492 
493  // Port Variable Type
494  typedef typename Traits::POAPortType PortVarType;
495 
496  // Interface Type
497  typedef typename Traits::PortType PortType;
498 
499  // Interface Type
500  typedef typename Traits::PortType ProvidesPortType;
501 
502  // Transport Sequence Type use to during push packet
503  typedef char * PortSequenceType;
504 
505  //
506  // Transport type used by this port
507  //
508  typedef typename Traits::TransportType TransportType;
509 
510  //
511  // Native type mapping of TransportType
512  //
513  typedef typename Traits::NativeType NativeType;
514 
515  //
516  // Data transfer object from ports to components
517  //
518  typedef DataTransfer< typename Traits::DataTransferTraits > DataTransferType;
519 
520 
521  // backwards compatible defintion
522  typedef DataTransfer< typename Traits::DataTransferTraits > dataTransfer;
523 
524 
525  // queue of dataTranfer objects maintained by the port
526  typedef std::deque< DataTransferType * > WorkQueue;
527 
528 
529  //
530  // InStringPort - creates a provides port that can accept floating point vectors from a source
531  //
532  // @param port_name name of the port taken from .scd.xml file
533  // @param SriCompareFunc comparator function that accepts to StreamSRI objects and compares their contents,
534  // if all members match then return true, otherwise false. This is used during the pushSRI method
535  // @param newStreamCB interface that is called when new SRI.streamID is received
536 
537  InStringPort(std::string port_name,
540  SriListener *newStreamCB = NULL );
541 
542  InStringPort(std::string port_name,
544  SriListener *newStreamCB = NULL );
545 
546  InStringPort(std::string port_name, void * );
547 
548  //
549  // pushPacket called by the source component when pushing a vector of data into a component. This method will save off the data
550  // vector, timestamp, EOS and streamID onto a queue for consumption by the component via the getPacket method
551  //
552  // @param data - the vector of data to be consumed
553  // @param T - a time stamp for the data, the time represents the associated time value for the first entry of the data vector
554  // @param EOS - indicator that the stream has ended, (stream is identified by streamID)
555  // @param streamID - name of the stream the vector and stream context data are associated with
556  virtual void pushPacket(const char *data, const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS, const char* streamID);
557 
558 
559  //
560  // pushPacket called by the source component when pushing a vector of data into a component. This method will save off the data
561  // vector, timestamp, EOS and streamID onto a queue for consumption by the component via the getPacket method
562  //
563  // @param data - the vector of data to be consumed
564  // @param EOS - indicator that the stream has ended, (stream is identified by streamID)
565  // @param streamID - name of the stream the vector and stream context data are associated with
566  virtual void pushPacket( const char *data, CORBA::Boolean EOS, const char* streamID);
567  };
568 
569 
570  /*
571  Provides Port Definitions for All Bulk IO pushPacket Port definitions
572  *
573  */
574  // Bulkio char (Int8) input
576  // Bulkio octet (UInt8) input
578  // Bulkio Int8 input
579  typedef InCharPort InInt8Port;
580  // Bulkio UInt8 input
581  typedef InOctetPort InUInt8Port;
582  // Bulkio short (Int16) input
584  // Bulkio unsigned short (UInt16) input
586  // Bulkio Int16 input
587  typedef InShortPort InInt16Port;
588  // Bulkio UInt16 input
589  typedef InUShortPort InUInt16Port;
590  // Bulkio long (Int32) input
592  // Bulkio unsigned long (UInt32) input
594  // Bulkio Int32 input
595  typedef InLongPort InInt32Port;
596  // Bulkio UInt32 input
597  typedef InULongPort InUInt32Port;
598  // Bulkio long long (Int64) input
600  // Bulkio unsigned long long (UInt64) input
602  // Bulkio Int64 input
603  typedef InLongLongPort InInt64Port;
604  // Bulkio UInt64 input
605  typedef InULongLongPort InUInt64Port;
606  // Bulkio float input
608  // Bulkio double input
610  // Bulkio URL input
612  // Bulkio File (URL) input
614  // Bulkio XML input
616 
617 
618 
619 
620 } // end of bulkio namespace
621 
622 
623 #endif
PortTraits Traits
Definition: bulkio_in_port.h:491
InLongPort InInt32Port
Definition: bulkio_in_port.h:595
InPort(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare sriCmp=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
InStringPort< XMLPortTraits > InXMLPort
Definition: bulkio_in_port.h:615
virtual bool blocked()
StreamMap streams
Definition: bulkio_in_port.h:457
Traits::PortType ProvidesPortType
Definition: bulkio_in_port.h:362
virtual int getCurrentQueueDepth()
redhawk::signal< std::string > packetWaiters
Definition: bulkio_in_port.h:321
Traits::PushType PushArgumentType
Definition: bulkio_in_port.h:68
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:75
ossie::notification< void(StreamType)> streamAdded
Definition: bulkio_in_port.h:451
Traits::NativeType NativeType
Definition: bulkio_in_port.h:513
Definition: bulkio_in_port.h:486
LOGGER_PTR logger
Definition: bulkio_in_port.h:316
InOctetPort InUInt8Port
Definition: bulkio_in_port.h:581
DataTransferType * fetchPacket(const std::string &streamID)
Definition: bulkio_out_port.h:39
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:380
virtual BULKIO::PortStatistics * statistics()
virtual void unblock()
Traits::TransportType TransportType
Definition: bulkio_in_port.h:370
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:78
virtual void setMaxQueueDepth(int newDepth)
InStringPort< FilePortTraits > InFilePort
Definition: bulkio_in_port.h:613
Definition: Port_impl.h:364
DataTransfer< typename Traits::DataTransferTraits > dataTransfer
Definition: bulkio_in_port.h:522
void setNewStreamListener(T *target, void(T::*func)(BULKIO::StreamSRI &))
Definition: bulkio_in_port.h:225
virtual void pushPacket(const char *data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
Traits::PortType PortType
Definition: bulkio_in_port.h:70
bool(* Compare)(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
Definition: bulkio_base.h:357
PortTraits Traits
Definition: bulkio_in_port.h:54
InPort< ULongLongPortTraits > InULongLongPort
Definition: bulkio_in_port.h:601
InPort< LongPortTraits > InLongPort
Definition: bulkio_in_port.h:591
void addStreamListener(Target target, Func func)
Definition: bulkio_in_port.h:434
std::map< std::string, std::pair< BULKIO::StreamSRI, bool > > SriMap
Definition: bulkio_base.h:72
size_t samplesAvailable(const std::string &streamID, bool firstPacket)
DataTransferType dataTransfer
Definition: bulkio_in_port.h:383
InShortPort InInt16Port
Definition: bulkio_in_port.h:587
InULongPort InUInt32Port
Definition: bulkio_in_port.h:597
InLongLongPort InInt64Port
Definition: bulkio_in_port.h:603
virtual void pushPacket(const PortSequenceType &data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
InPort< FloatPortTraits > InFloatPort
Definition: bulkio_in_port.h:607
Traits::PortType PortType
Definition: bulkio_in_port.h:497
uint32_t lastQueueSize
Definition: bulkio_in_port.h:260
InUShortPort InUInt16Port
Definition: bulkio_in_port.h:589
Definition: bulkio_base.h:161
virtual void enableStats(bool enable)
MUTEX sriUpdateLock
Definition: bulkio_in_port.h:285
InPort< LongLongPortTraits > InLongLongPort
Definition: bulkio_in_port.h:599
InPort< OctetPortTraits > InOctetPort
Definition: bulkio_in_port.h:577
Traits::PortType ProvidesPortType
Definition: bulkio_in_port.h:500
InPort< DoublePortTraits > InDoublePort
Definition: bulkio_in_port.h:609
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:526
Definition: bulkio_base.h:205
Definition: bulkio_in_port.h:350
boost::shared_ptr< SriListener > newStreamCallback
Definition: bulkio_in_port.h:270
virtual void pushSRI(const BULKIO::StreamSRI &H)
std::multimap< std::string, StreamType > pendingStreams
Definition: bulkio_in_port.h:462
boost::mutex streamsMutex
Definition: bulkio_in_port.h:458
linkStatistics * stats
Definition: bulkio_in_port.h:314
Traits::NativeType NativeType
Definition: bulkio_in_port.h:375
InULongLongPort InUInt64Port
Definition: bulkio_in_port.h:605
StreamList getStreams()
std::string getRepid() const
Traits::POAPortType PortVarType
Definition: bulkio_in_port.h:494
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:518
void packetReceived(const std::string &streamID)
Definition: bulkio_in_port.h:49
SriMap currentHs
Definition: bulkio_in_port.h:275
Traits::TransportType TransportType
Definition: bulkio_in_port.h:508
Traits::PortType PortType
Definition: bulkio_in_port.h:359
Traits::SequenceType PortSequenceType
Definition: bulkio_in_port.h:365
void setNewStreamListener(T &target, void(T::*func)(BULKIO::StreamSRI &))
Definition: bulkio_in_port.h:217
LOGGER LOGGER_PTR
Definition: bulkio_base.h:138
queueSemaphore * queueSem
Definition: bulkio_in_port.h:309
Traits::POAPortType PortVarType
Definition: bulkio_in_port.h:356
bulkio::sri::Compare sri_cmp
Definition: bulkio_in_port.h:265
bool breakBlock
Definition: bulkio_in_port.h:299
bool blocking
Definition: bulkio_in_port.h:304
virtual bool isStreamEnabled(const std::string &streamID)
std::map< std::string, StreamType > StreamMap
Definition: bulkio_in_port.h:456
InPort< ShortPortTraits > InShortPort
Definition: bulkio_in_port.h:583
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:386
virtual void removeStream(const std::string &streamID)
virtual BULKIO::PortUsageType state()
char * PortSequenceType
Definition: bulkio_in_port.h:503
InStringPort(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
virtual BULKIO::StreamSRISequence * activeSRIs()
void queuePacket(PushArgumentType data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
virtual DataTransferType * getPacket(float timeout)
void removeStreamListener(Target target, Func func)
Definition: bulkio_in_port.h:439
friend class InputStream< PortTraits >
Definition: bulkio_in_port.h:338
virtual void createStream(const std::string &streamID, const BULKIO::StreamSRI &sri)
virtual ~InPortBase()
virtual void removeStream(const std::string &streamID)
InPortBase< PortTraits > super
Definition: bulkio_in_port.h:444
Traits::TransportType TransportType
Definition: bulkio_in_port.h:62
MUTEX dataAvailableMutex
Definition: bulkio_in_port.h:292
bool DefaultComparator(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
std::list< StreamType > StreamList
Definition: bulkio_in_port.h:392
CONDITION dataAvailable
Definition: bulkio_in_port.h:294
boost::condition_variable CONDITION
Definition: bulkio_base.h:122
boost::mutex MUTEX
Definition: bulkio_base.h:117
InPortBase(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare sriCmp=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
void setLogger(LOGGER_PTR logger)
virtual void createStream(const std::string &streamID, const BULKIO::StreamSRI &sri)
PortTraits Traits
Definition: bulkio_in_port.h:353
MUTEX dataBufferLock
Definition: bulkio_in_port.h:280
virtual bool isStreamEnabled(const std::string &streamID)
virtual bool isStreamActive(const std::string &streamID)
virtual void stopPort()
virtual void startPort()
virtual void block()
InCharPort InInt8Port
Definition: bulkio_in_port.h:579
virtual bool isStreamActive(const std::string &streamID)
InputStream< PortTraits > StreamType
Definition: bulkio_in_port.h:389
StreamList pollStreams(float timeout)
Traits::SequenceType PortSequenceType
Definition: bulkio_in_port.h:57
WorkQueue workQueue
Definition: bulkio_in_port.h:255
InPort< UShortPortTraits > InUShortPort
Definition: bulkio_in_port.h:585
virtual int getMaxQueueDepth()
int _getElementLength(PushArgumentType data)
InPort< CharPortTraits > InCharPort
Definition: bulkio_in_port.h:575
StreamList getReadyStreams(size_t samples)
InPort< ULongPortTraits > InULongPort
Definition: bulkio_in_port.h:593
InStringPort< URLPortTraits > InURLPort
Definition: bulkio_in_port.h:611