bulkio_out_port.h
Go to the documentation of this file.
1 /*
2  * This file is protected by Copyright. Please refer to the COPYRIGHT file
3  * distributed with this source distribution.
4  *
5  * This file is part of REDHAWK bulkioInterfaces.
6  *
7  * REDHAWK bulkioInterfaces is free software: you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation, either version 3 of the License, or (at your
10  * option) any later version.
11  *
12  * REDHAWK bulkioInterfaces is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this program. If not, see http://www.gnu.org/licenses/.
19  */
20 
21 #ifndef __bulkio_out_port_h
22 #define __bulkio_out_port_h
23 
24 #include <queue>
25 #include <list>
26 #include <vector>
27 #include <boost/thread/condition_variable.hpp>
28 #include <boost/thread/locks.hpp>
29 #include <boost/make_shared.hpp>
30 #include <boost/ref.hpp>
31 
32 #include <ossie/CorbaUtils.h>
33 
34 #include "bulkio_base.h"
35 #include "bulkio_traits.h"
36 #include "bulkio_callbacks.h"
37 #include "bulkio_out_stream.h"
38 
39 namespace bulkio {
40 
41  //
42  // OutPortBase
43  //
44  // Base template for data transfers between BULKIO ports. This class is defined by 2 trait classes
45  // PortTraits - This template provides the context for the port's middleware transport classes and they base data types
46  // passed between port objects
47  //
48  //
49  template < typename PortTraits >
51 #ifdef BEGIN_AUTOCOMPLETE_IGNORE
52  , public virtual POA_BULKIO::UsesPortStatisticsProvider
53 #endif
54  {
55 
56  public:
57 
58  typedef PortTraits Traits;
59 
60  //
61  // Port Variable Definition
62  //
63  typedef typename Traits::PortVarType PortVarType;
64 
65  //
66  // BULKIO Interface Type
67  //
68  typedef typename Traits::PortType PortType;
69 
70  //
71  // Port pointer type
72  //
73  typedef typename PortType::_ptr_type PortPtrType;
74 
75  //
76  // Sequence container used during actual pushPacket call
77  //
78  typedef typename Traits::SequenceType PortSequenceType;
79 
80  //
81  // True type of argument to pushPacket, typically "const PortSequenceType&"
82  // except for dataXML and dataFile (which use "const char*")
83  //
84  typedef typename Traits::PushType PushArgumentType;
85 
86  //
87  // Data type of items passed into the pushPacket method
88  //
89  typedef typename Traits::NativeType NativeType;
90 
91  //
92  // ConnectionList Definition
93  //
95 
96  //
97  // Mapping of Stream IDs to SRI Map/Refresh objects
98  //
99  typedef std::map< std::string, SriMapStruct > OutPortSriMap;
100 
101 
102  //
103  // OutPortBase Creates a uses port object for publishing data to the framework
104  //
105  // @param port_name name assigned to the port located in scd.xml file
106  // @param connectionCB callback that will be called when the connectPort method is called
107  // @pararm disconnectDB callback that receives notification when a disconnectPort happens
108  //
109  OutPortBase(std::string port_name,
110  ConnectionEventListener *connectCB=NULL,
111  ConnectionEventListener *disconnectCB=NULL );
112 
113  OutPortBase(std::string port_name,
115  ConnectionEventListener *connectCB=NULL,
116  ConnectionEventListener *disconnectCB=NULL );
117 
118 
119  //
120  // virtual destructor to clean up resources
121  //
122  virtual ~OutPortBase();
123 
124  //
125  // Interface used by framework to connect/disconnect ports together and introspection of connection states
126  //
127 
128  //
129  // connections - Return a list of connection objects and identifiers for each connection made by connectPort
130  //
131  // @return ExtendedCF::UsesConnectionSequence * List of connection objects and identifiers
132  //
133  virtual ExtendedCF::UsesConnectionSequence * connections();
134 
135  //
136  // connectPort - Called by the framework to connect this port to a Provides port object, the connection is established
137  // via the association and identified by the connectionId string, no formal "type capatablity" or "bukio interface support"
138  // is resolved at this time. All data flow occurs from point A to B via the pushPacket/pushSRI interface.
139  //
140  // @param CORBA::Object_ptr pointer to an instance of a Provides port
141  // @param connectionsId identifer for this connection, allows for external users to reference the connection association
142  //
143  virtual void connectPort(CORBA::Object_ptr connection, const char* connectionId);
144 
145  //
146  // disconnectPort - Called by the framework to disconnect this port from the Provides port object. The port basicall removes
147  // the association to the provides port that was established with the connectionId.
148  //
149  // @param connectionsId identifer for this connection, allows for external users to reference the connection association
150  virtual void disconnectPort(const char* connectionId);
151 
152  void updateConnectionFilter(const std::vector<connection_descriptor_struct> &_filterTable) {
153  SCOPED_LOCK lock(updatingPortsLock); // don't want to process while command information is coming in
154  filterTable = _filterTable;
155  };
156 
157 
158  template< typename T > inline
159  void setNewConnectListener(T &target, void (T::*func)( const char *connectionId ) )
160  {
161  _connectCB = boost::make_shared< MemberConnectionEventListener< T > >( boost::ref(target), func );
162  }
163 
164  template< typename T > inline
165  void setNewConnectListener(T *target, void (T::*func)( const char *connectionId ) )
166  {
167  _connectCB = boost::make_shared< MemberConnectionEventListener< T > >( boost::ref(*target), func );
168  }
169 
170  template< typename T > inline
171  void setNewDisconnectListener(T &target, void (T::*func)( const char *connectionId ) )
172  {
173  _disconnectCB = boost::make_shared< MemberConnectionEventListener< T > >( boost::ref(target), func );
174  }
175 
176  template< typename T > inline
177  void setNewDisconnectListener(T *target, void (T::*func)( const char *connectionId ) )
178  {
179  _disconnectCB = boost::make_shared< MemberConnectionEventListener< T > >( boost::ref(*target), func );
180  }
181 
182  //
183  // Attach listener interfaces for connect and disconnect events
184  //
185  void setNewConnectListener( ConnectionEventListener *newListener );
186  void setNewConnectListener( ConnectionEventCallbackFn newListener );
187  void setNewDisconnectListener( ConnectionEventListener *newListener );
188  void setNewDisconnectListener( ConnectionEventCallbackFn newListener );
189 
190  //
191  // pushSRI - called by the source component when SRI data about the stream changes, the data flow policy is this activity
192  // will occurr first before any data flows to the component.
193  //
194  // @param H - Incoming StreamSRI object that defines the state of the data flow portion of the stream (pushPacket)
195  //
196  virtual void pushSRI(const BULKIO::StreamSRI& H);
197 
198 
199  //
200  // statisics - returns a PortStatistics object for this uses port
201  // BULKIO::UsesPortStatisticsSequence: sequence of PortStatistics object
202  // PortStatistics
203  // portname - name of port
204  // elementsPerSecond - number of elements per second (element is based on size of port type )
205  // bitsPerSecond - number of bits per second (based on element storage size in bits)
206  // callsPerSecond - history window -1 / time between calls to this method
207  // streamIds - list of active stream id values
208  // averageQueueDepth - the average depth of the queue for this port
209  // timeSinceLastCall - time since this method as invoked and the last pushPacket happened
210  // Keyword Sequence - deprecated
211  //
212  // @return BULKIO::UsesPortStatisticsSequenc - current data flow metrics collected for the port, the caller of the method
213  // is responsible for freeing this object
214  //
215  virtual BULKIO::UsesPortStatisticsSequence * statistics();
216 
217  //
218  // state - returns the current state of the port as follows:
219  // BULKIO::BUSY - internal queue has reached FULL state
220  // BULKIO::IDLE - there are no items on the internal queue
221  // BULKIO::ACTIVE - there are items on the queue
222  //
223  // @return BULKIO::PortUsageType - current state of port
224  //
225  virtual BULKIO::PortUsageType state();
226 
227  //
228  // turn on/off the port monitoring capability
229  //
230  virtual void enableStats(bool enable);
231 
232  //
233  // Return map of streamID/SRI objects
234  //
235  virtual bulkio::SriMap getCurrentSRI();
236 
237  //
238  // Return list of SRI objects
239  //
240  virtual bulkio::SriList getActiveSRIs();
241 
242  //
243  // Return a ConnectionsList for the current ports and connections ids establish via connectPort method
244  //
245  virtual ConnectionsList getConnections();
246 
247  //
248  // Deprecation Warning
249  //
250  // The _getConnections and currentSRIs access will be deprecated in the next release of the
251  // the bulkio library class, in favor of getCurrentSRI and getConnections.
252  //
253 
254  //
255  // Allow access to the port's connection list
256  //
257  virtual ConnectionsList __attribute__ ((deprecated)) _getConnections() {
258  return outConnections;
259  }
260 
261  void setLogger( LOGGER_PTR newLogger );
262 
263  std::string getRepid () const;
264 
265  protected:
266 
267 
268  // Map of stream ids and statistic object
269  typedef typename std::map<std::string, linkStatistics > _StatsMap;
270 
271  public:
272  //
273  // List of SRIs sent out by this port
274  //
275  OutPortSriMap currentSRIs __attribute__ ((deprecated));
276 
277  protected:
278  //
279  // List of Port connections and connection identifiers
280  //
281  ConnectionsList outConnections;
282 
283  //
284  // List of connections returned by connections() method. Used to increase efficiency when there a large amount
285  // of connections for a port.
286  //
287  ExtendedCF::UsesConnectionSequence recConnections;
288 
289  //
290  //
291  //
293 
294  //
295  // Set of statistical collector objects for each stream id
296  //
297  _StatsMap stats;
298 
299  //
300  // _pushSRI - method to push given SRI to a specific connections
301  //
302  void _pushSRI( typename ConnectionsList::iterator connPair, SriMapStruct &sri_ctx);
303  void _pushSRI( const std::string &connectionId, SriMapStruct &sri_ctx);
304 
306  std::vector<connection_descriptor_struct> filterTable;
307  boost::shared_ptr< ConnectionEventListener > _connectCB;
308  boost::shared_ptr< ConnectionEventListener > _disconnectCB;
309 
310  //
311  // Returns true if the given connection should receive SRI updates and data
312  // for the given stream
313  //
314  bool _isStreamRoutedToConnection(const std::string& connectionID, const std::string& streamID);
315 
316  //
317  // Sends the given data and metadata as a single push, for subclasses that
318  // will never break a push into multiple packets (XML, File); acquires and
319  // releases the port lock
320  //
321  void _pushSinglePacket(
322  PushArgumentType data,
323  const BULKIO::PrecisionUTCTime& T,
324  bool EOS,
325  const std::string& streamID);
326 
327  //
328  // Sends the given data and metadata to all appropriate connections and
329  // updates the associated SRI if necessary (or creates one if it does not
330  // exist); must be called with the port lock held
331  //
332  void _pushPacketLocked(
333  PushArgumentType data,
334  const BULKIO::PrecisionUTCTime& T,
335  bool EOS,
336  const std::string& streamID);
337 
338  //
339  // Sends an end-of-stream packet for the given stream to a particular port,
340  // for use when disconnecting; enables XML and File specialization for
341  // consistent end-of-stream behavior
342  //
343  void _sendEOS(PortPtrType port,
344  const std::string& streamID);
345 
346  //
347  // Low-level push of data and metadata to the given port; enables XML and
348  // File specialization for consistent high-level pushPacket behavior
349  //
350  void _pushPacketToPort(
351  PortPtrType port,
352  PushArgumentType data,
353  const BULKIO::PrecisionUTCTime& T,
354  bool EOS,
355  const char* streamID);
356 
357  //
358  // Returns the total number of elements of data in a pushPacket call, for
359  // statistical tracking; enables XML and File specialization, which have
360  // different notions of size
361  //
362  size_t _dataLength(PushArgumentType data);
363  };
364 
365 
366  template < typename PortTraits >
367  class OutPort : public OutPortBase< PortTraits > {
368  public:
369 
370  typedef PortTraits Traits;
371 
372  //
373  // Port Variable Definition
374  //
375  typedef typename Traits::PortVarType PortVarType;
376 
377  //
378  // BULKIO Interface Type
379  //
380  typedef typename Traits::PortType PortType;
381 
382  //
383  // Sequence container used during actual pushPacket call
384  //
385  typedef typename Traits::SequenceType PortSequenceType;
386 
387  //
388  // Data type contained in sequence container
389  //
390  typedef typename Traits::TransportType TransportType;
391 
392  //
393  // Data type of items passed into the pushPacket method
394  //
395  typedef typename Traits::NativeType NativeType;
396 
397  //
398  // Data type of the container for passing data into the pushPacket method
399  //
400  typedef std::vector< NativeType > NativeSequenceType;
401 
402  //
403  // Sequence of data returned from an input port and can be passed to the output port
404  //
405  typedef typename Traits::DataBufferType DataBufferType;
406 
407  //
408  // ConnectionList Definition
409  //
411 
412  //
413  // Mapping of Stream IDs to SRI Map/Refresh objects
414  //
415  typedef std::map< std::string, SriMapStruct > OutPortSriMap;
416 
417  //
418  // OutputStream class
419  //
420  typedef OutputStream<PortTraits> StreamType;
421 
422  //
423  // OutPort Creates a uses port object for publishing data to the framework
424  //
425  // @param port_name name assigned to the port located in scd.xml file
426  // @param connectionCB callback that will be called when the connectPort method is called
427  // @pararm disconnectDB callback that receives notification when a disconnectPort happens
428  //
429  OutPort(std::string port_name,
430  ConnectionEventListener *connectCB=NULL,
431  ConnectionEventListener *disconnectCB=NULL );
432 
433  OutPort(std::string port_name,
435  ConnectionEventListener *connectCB=NULL,
436  ConnectionEventListener *disconnectCB=NULL );
437 
438  //
439  // virtual destructor to clean up resources
440  //
441  virtual ~OutPort();
442 
443  /*
444  * pushPacket
445  * maps to data<Type> BULKIO method call for passing vectors of data
446  *
447  * data: sequence structure containing the payload to send out
448  * T: constant of type BULKIO::PrecisionUTCTime containing the timestamp for the outgoing data.
449  * tcmode: timecode mode
450  * tcstatus: timecode status
451  * toff: fractional sample offset
452  * twsec: J1970 GMT
453  * tfsec: fractional seconds: 0.0 to 1.0
454  * EOS: end-of-stream flag
455  * streamID: stream identifier
456  */
457  void pushPacket( NativeSequenceType & data, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
458 
459  /*
460  * pushPacket
461  * maps to data<Type> BULKIO method call for passing a limited amount of data from a source vector
462  *
463  * data: pointer to a buffer of data
464  * size: number of data points in the buffer
465  * T: constant of type BULKIO::PrecisionUTCTime containing the timestamp for the outgoing data.
466  * tcmode: timecode mode
467  * tcstatus: timecode status
468  * toff: fractional sample offset
469  * twsec: J1970 GMT
470  * tfsec: fractional seconds: 0.0 to 1.0
471  * EOS: end-of-stream flag
472  * streamID: stream identifier
473  */
474  void pushPacket( const TransportType* data, size_t size, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
475 
476  /*
477  * pushPacket
478  * maps to data<Type> BULKIO method call for passing an entire vector of data
479  *
480  * data: The sequence structure from an input port containing the payload to send out
481  * T: constant of type BULKIO::PrecisionUTCTime containing the timestamp for the outgoing data.
482  * tcmode: timecode mode
483  * tcstatus: timecode status
484  * toff: fractional sample offset
485  * twsec: J1970 GMT
486  * tfsec: fractional seconds: 0.0 to 1.0
487  * EOS: end-of-stream flag
488  * streamID: stream identifier
489  */
490  void pushPacket( const DataBufferType & data, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
491 
492  // Create a new stream based on a stream ID
493  StreamType createStream(const std::string& streamID);
494  // Create a new stream based on an SRI instance
495  StreamType createStream(const BULKIO::StreamSRI& sri);
497 
498  protected:
500 
502  const TransportType* buffer,
503  size_t size,
504  const BULKIO::PrecisionUTCTime& T,
505  bool EOS,
506  const std::string& streamID);
507  };
508 
509  //
510  // Character Specialization..
511  //
512  // This class overrides the pushPacket method to support Int8 and char data types
513  //
514  // Output port for Int8 and char data types
515  class OutCharPort : public OutPort < CharPortTraits > {
516  public:
517  OutCharPort(std::string port_name,
518  ConnectionEventListener *connectCB=NULL,
519  ConnectionEventListener *disconnectCB=NULL );
520 
521  OutCharPort(std::string port_name,
523  ConnectionEventListener *connectCB=NULL,
524  ConnectionEventListener *disconnectCB=NULL );
525 
526 
527  virtual ~OutCharPort() {};
528 
529  // Push a vector of Int8 data
530  void pushPacket(const std::vector< Int8 >& data, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
531 
532  // Push a vector of Char data
533  void pushPacket(const std::vector< Char >& data, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
534 
535  // Push a subset of a vector of Int8 data
536  void pushPacket(const Int8* buffer, size_t size, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
537 
538  // Push a subset of a vector of Char data
539  void pushPacket(const Char* buffer, size_t size, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
540 
541  };
542 
543 
544  //
545  // OutFilePort
546  //
547  // This class defines the pushPacket interface for file URL data.
548  //
549  //
550  class OutFilePort : public OutPortBase < FilePortTraits > {
551 
552  public:
553 
554  typedef FilePortTraits Traits;
555 
556  //
557  // Port Variable Definition
558  //
559  typedef Traits::PortVarType PortVarType;
560 
561  //
562  // BULKIO Interface Type
563  //
564  typedef Traits::PortType PortType;
565 
566  //
567  // Sequence container used during actual pushPacket call
568  //
569  typedef Traits::SequenceType PortSequenceType;
570 
571  //
572  // Data type contained in sequence container
573  //
574  typedef Traits::TransportType TransportType;
575 
576  //
577  // Data type of the container for passing data into the pushPacket method
578  //
579  typedef char* NativeSequenceType;
580 
581  //
582  // Data type of items passed into the pushPacket method
583  //
584  typedef Traits::NativeType NativeType;
585 
586 
587  OutFilePort( std::string pname,
588  ConnectionEventListener *connectCB=NULL,
589  ConnectionEventListener *disconnectCB=NULL );
590 
591 
592  OutFilePort( std::string port_name,
594  ConnectionEventListener *connectCB=NULL,
595  ConnectionEventListener *disconnectCB=NULL );
596 
597 
598 
599  virtual ~OutFilePort() {};
600 
601  /*
602  * pushPacket
603  * maps to dataFile BULKIO method call for passing the URL of a file
604  *
605  * data: char string containing the file URL to send out
606  * T: constant of type BULKIO::PrecisionUTCTime containing the timestamp for the outgoing data.
607  * tcmode: timecode mode
608  * tcstatus: timecode status
609  * toff: fractional sample offset
610  * twsec: J1970 GMT
611  * tfsec: fractional seconds: 0.0 to 1.0
612  * EOS: end-of-stream flag
613  * streamID: stream identifier
614  */
615  void pushPacket(const char *URL, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
616  /*
617  * pushPacket
618  * maps to dataFile BULKIO method call for passing the URL of a file
619  *
620  * data: string containing the file URL to send out
621  * T: constant of type BULKIO::PrecisionUTCTime containing the timestamp for the outgoing data.
622  * tcmode: timecode mode
623  * tcstatus: timecode status
624  * toff: fractional sample offset
625  * twsec: J1970 GMT
626  * tfsec: fractional seconds: 0.0 to 1.0
627  * EOS: end-of-stream flag
628  * streamID: stream identifier
629  */
630  void pushPacket(const std::string& URL, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
631 
632  /*
633  * DEPRECATED: maps to dataXML BULKIO method call for passing strings of data
634  */
635  void pushPacket(const char *data, bool EOS, const std::string& streamID);
636 
637  };
638 
639 
640  //
641  // OutXMLPort
642  //
643  // This class defines the pushPacket interface for XML data.
644  //
645  //
646  class OutXMLPort : public OutPortBase < XMLPortTraits > {
647 
648  public:
649 
650  typedef XMLPortTraits Traits;
651 
653 
654  //
655  // Port Variable Definition
656  //
657  typedef Traits::PortVarType PortVarType;
658 
659  //
660  // BULKIO Interface Type
661  //
662  typedef Traits::PortType PortType;
663 
664  //
665  // Sequence container used during actual pushPacket call
666  //
667  typedef Traits::SequenceType PortSequenceType;
668 
669  //
670  // Data type contained in sequence container
671  //
672  typedef Traits::TransportType TransportType;
673 
674  //
675  // Data type of the container for passing data into the pushPacket method
676  //
677  typedef char* NativeSequenceType;
678 
679  //
680  // Data type of items passed into the pushPacket method
681  //
682  typedef Traits::NativeType NativeType;
683 
684 
685  OutXMLPort( std::string pname,
686  ConnectionEventListener *connectCB=NULL,
687  ConnectionEventListener *disconnectCB=NULL );
688 
689 
690  OutXMLPort( std::string port_name,
692  ConnectionEventListener *connectCB=NULL,
693  ConnectionEventListener *disconnectCB=NULL );
694 
695 
696 
697  virtual ~OutXMLPort() {};
698 
699  /*
700  * DEPRECATED: maps to dataFile BULKIO method call for passing strings of data
701  */
702  void pushPacket(const char *data, const BULKIO::PrecisionUTCTime& T, bool EOS, const std::string& streamID);
703 
704  /*
705  * pushPacket
706  * maps to dataXML BULKIO method call for passing an XML-formatted string
707  *
708  * data: character string containing the XML data to send out
709  * EOS: end-of-stream flag
710  * streamID: stream identifier
711  */
712  void pushPacket(const char *data, bool EOS, const std::string& streamID);
713 
714  /*
715  * pushPacket
716  * maps to dataXML BULKIO method call for passing an XML-formatted string
717  *
718  * data: string containing the XML data to send out
719  * EOS: end-of-stream flag
720  * streamID: stream identifier
721  */
722  void pushPacket(const std::string& data, bool EOS, const std::string& streamID);
723 
724  };
725 
726 
727  /*
728  Uses Port Definitions for All Bulk IO port definitions
729  *
730  */
731  // Bulkio octet (UInt8) output
733  // Bulkio UInt8 output
734  typedef OutOctetPort OutUInt8Port;
735  // Bulkio short output
737  // Bulkio unsigned short output
739  // Bulkio Int16 output
740  typedef OutShortPort OutInt16Port;
741  // Bulkio UInt16 output
742  typedef OutUShortPort OutUInt16Port;
743  // Bulkio long output
745  // Bulkio unsigned long output
747  // Bulkio Int32 output
748  typedef OutLongPort OutInt32Port;
749  // Bulkio UInt32 output
750  typedef OutULongPort OutUInt32Port;
751  // Bulkio long long output
753  // Bulkio unsigned long long output
755  // Bulkio Int64 output
756  typedef OutLongLongPort OutInt64Port;
757  // Bulkio UInt64 output
758  typedef OutULongLongPort OutUInt64Port;
759  // Bulkio float output
761  // Bulkio double output
763  // Bulkio URL output
765 
766 } // end of bulkio namespace
767 
768 inline bool operator>>= (const CORBA::Any& a, bulkio::connection_descriptor_struct& s) {
769  CF::Properties* temp;
770  if (!(a >>= temp)) return false;
771  CF::Properties& props = *temp;
772  for (unsigned int idx = 0; idx < props.length(); idx++) {
773  if (!strcmp("connectionTable::connection_id", props[idx].id)) {
774  if (!(props[idx].value >>= s.connection_id)) return false;
775  } else if (!strcmp("connectionTable::stream_id", props[idx].id)) {
776  if (!(props[idx].value >>= s.stream_id)) return false;
777  } else if (!strcmp("connectionTable::port_name", props[idx].id)) {
778  if (!(props[idx].value >>= s.port_name)) return false;
779  }
780  }
781  return true;
782 };
783 
784 inline void operator<<= (CORBA::Any& a, const bulkio::connection_descriptor_struct& s) {
785  CF::Properties props;
786  props.length(3);
787  props[0].id = CORBA::string_dup("connectionTable::connection_id");
788  props[0].value <<= s.connection_id;
789  props[1].id = CORBA::string_dup("connectionTable::stream_id");
790  props[1].value <<= s.stream_id;
791  props[2].id = CORBA::string_dup("connectionTable::port_name");
792  props[2].value <<= s.port_name;
793  a <<= props;
794 };
795 
797  if (s1.connection_id!=s2.connection_id)
798  return false;
799  if (s1.stream_id!=s2.stream_id)
800  return false;
801  if (s1.port_name!=s2.port_name)
802  return false;
803  return true;
804 };
805 
807  return !(s1==s2);
808 };
809 
810 #endif
Traits::PortVarType PortVarType
Definition: bulkio_out_port.h:657
std::vector< connection_descriptor_struct > filterTable
Definition: bulkio_out_port.h:306
Traits::DataBufferType DataBufferType
Definition: bulkio_out_port.h:405
OutPortBase(std::string port_name, ConnectionEventListener *connectCB=NULL, ConnectionEventListener *disconnectCB=NULL)
Traits::PushType PushArgumentType
Definition: bulkio_out_port.h:84
void _pushSRI(typename ConnectionsList::iterator connPair, SriMapStruct &sri_ctx)
void setNewConnectListener(T &target, void(T::*func)(const char *connectionId))
Definition: bulkio_out_port.h:159
void _pushOversizedPacket(const TransportType *buffer, size_t size, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
void setLogger(LOGGER_PTR newLogger)
boost::mutex updatingPortsLock
Definition: Port_impl.h:360
Traits::PortType PortType
Definition: bulkio_out_port.h:662
PortTraits Traits
Definition: bulkio_out_port.h:58
OutXMLPort(std::string pname, ConnectionEventListener *connectCB=NULL, ConnectionEventListener *disconnectCB=NULL)
Definition: bulkio_out_port.h:39
std::vector< NativeType > NativeSequenceType
Definition: bulkio_out_port.h:400
Definition: bulkio_out_port.h:50
virtual ~OutXMLPort()
Definition: bulkio_out_port.h:697
Definition: bulkio_out_port.h:646
Traits::SequenceType PortSequenceType
Definition: bulkio_out_port.h:569
OutPort< ULongPortTraits > OutULongPort
Definition: bulkio_out_port.h:746
OutOctetPort OutUInt8Port
Definition: bulkio_out_port.h:734
virtual ExtendedCF::UsesConnectionSequence * connections()
std::string connection_id
Definition: bulkio_base.h:102
void setNewConnectListener(T *target, void(T::*func)(const char *connectionId))
Definition: bulkio_out_port.h:165
virtual void disconnectPort(const char *connectionId)
void pushPacket(const char *data, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
Traits::SequenceType PortSequenceType
Definition: bulkio_out_port.h:385
Traits::PortVarType PortVarType
Definition: bulkio_out_port.h:63
OutPort< ShortPortTraits > OutShortPort
Definition: bulkio_out_port.h:736
virtual ConnectionsList __attribute__((deprecated)) _getConnections()
Definition: bulkio_out_port.h:257
OutPort< UShortPortTraits > OutUShortPort
Definition: bulkio_out_port.h:738
OutCharPort(std::string port_name, ConnectionEventListener *connectCB=NULL, ConnectionEventListener *disconnectCB=NULL)
Traits::NativeType NativeType
Definition: bulkio_out_port.h:682
void setNewDisconnectListener(T &target, void(T::*func)(const char *connectionId))
Definition: bulkio_out_port.h:171
std::map< std::string, std::pair< BULKIO::StreamSRI, bool > > SriMap
Definition: bulkio_base.h:72
_StatsMap stats
Definition: bulkio_out_port.h:297
virtual void pushSRI(const BULKIO::StreamSRI &H)
bool operator!=(const bulkio::connection_descriptor_struct &s1, const bulkio::connection_descriptor_struct &s2)
Definition: bulkio_out_port.h:806
Traits::PortType PortType
Definition: bulkio_out_port.h:380
std::map< std::string, SriMapStruct > OutPortSriMap
Definition: bulkio_out_port.h:415
bool operator==(const bulkio::connection_descriptor_struct &s1, const bulkio::connection_descriptor_struct &s2)
Definition: bulkio_out_port.h:796
Traits::SequenceType PortSequenceType
Definition: bulkio_out_port.h:78
void operator<<=(CORBA::Any &a, const bulkio::connection_descriptor_struct &s)
Definition: bulkio_out_port.h:784
OutUShortPort OutUInt16Port
Definition: bulkio_out_port.h:742
OutPort< ULongLongPortTraits > OutULongLongPort
Definition: bulkio_out_port.h:754
Definition: bulkio_out_port.h:515
OutFilePort OutURLPort
Definition: bulkio_out_port.h:764
virtual BULKIO::PortUsageType state()
Traits::PortVarType PortVarType
Definition: bulkio_out_port.h:375
virtual ~OutPortBase()
virtual void connectPort(CORBA::Object_ptr connection, const char *connectionId)
Traits::TransportType TransportType
Definition: bulkio_out_port.h:390
char * NativeSequenceType
Definition: bulkio_out_port.h:579
Traits::PortVarType PortVarType
Definition: bulkio_out_port.h:559
Definition: bulkio_base.h:95
FilePortTraits Traits
Definition: bulkio_out_port.h:554
StreamType createStream(const std::string &streamID)
virtual ConnectionsList getConnections()
LOGGER_PTR logger
Definition: bulkio_out_port.h:305
bool _isStreamRoutedToConnection(const std::string &connectionID, const std::string &streamID)
PortTraits Traits
Definition: bulkio_out_port.h:370
size_t _dataLength(PushArgumentType data)
void _pushPacketLocked(PushArgumentType data, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
OutputStream< PortTraits > StreamType
Definition: bulkio_out_port.h:420
XMLPortTraits Traits
Definition: bulkio_out_port.h:650
OutLongLongPort OutInt64Port
Definition: bulkio_out_port.h:756
std::string port_name
Definition: bulkio_base.h:106
virtual bulkio::SriList getActiveSRIs()
OutULongLongPort OutUInt64Port
Definition: bulkio_out_port.h:758
std::string getRepid() const
char Char
Definition: bulkio_base.h:144
virtual ~OutPort()
Traits::TransportType TransportType
Definition: bulkio_out_port.h:574
Traits::PortType PortType
Definition: bulkio_out_port.h:564
std::vector< std::pair< T, std::string > > List
Definition: bulkio_base.h:50
bool recConnectionsRefresh
Definition: bulkio_out_port.h:292
ExtendedCF::UsesConnectionSequence recConnections
Definition: bulkio_out_port.h:287
OutFilePort(std::string pname, ConnectionEventListener *connectCB=NULL, ConnectionEventListener *disconnectCB=NULL)
LOGGER LOGGER_PTR
Definition: bulkio_base.h:138
boost::mutex::scoped_lock SCOPED_LOCK
Definition: bulkio_base.h:132
OutLongPort OutInt32Port
Definition: bulkio_out_port.h:748
OutPort< DoublePortTraits > OutDoublePort
Definition: bulkio_out_port.h:762
Traits::NativeType NativeType
Definition: bulkio_out_port.h:395
virtual void enableStats(bool enable)
void pushPacket(const std::vector< Int8 > &data, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
OutPort< LongLongPortTraits > OutLongLongPort
Definition: bulkio_out_port.h:752
void pushPacket(NativeSequenceType &data, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
std::string stream_id
Definition: bulkio_base.h:105
bool operator>>=(const CORBA::Any &a, bulkio::connection_descriptor_struct &s)
Definition: bulkio_out_port.h:768
boost::shared_ptr< ConnectionEventListener > _disconnectCB
Definition: bulkio_out_port.h:308
PortType::_ptr_type PortPtrType
Definition: bulkio_out_port.h:73
void _pushPacketToPort(PortPtrType port, PushArgumentType data, const BULKIO::PrecisionUTCTime &T, bool EOS, const char *streamID)
OutPortBase< XMLPortTraits > Base
Definition: bulkio_out_port.h:652
void _pushSinglePacket(PushArgumentType data, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
Traits::NativeType NativeType
Definition: bulkio_out_port.h:89
Definition: bulkio_out_port.h:367
std::map< std::string, SriMapStruct > OutPortSriMap
Definition: bulkio_out_port.h:99
virtual BULKIO::UsesPortStatisticsSequence * statistics()
ConnectionsList outConnections
Definition: bulkio_out_port.h:281
OutPort< OctetPortTraits > OutOctetPort
Definition: bulkio_out_port.h:732
char * NativeSequenceType
Definition: bulkio_out_port.h:677
Traits::SequenceType PortSequenceType
Definition: bulkio_out_port.h:667
OutShortPort OutInt16Port
Definition: bulkio_out_port.h:740
int8_t Int8
Definition: bulkio_base.h:145
Definition: bulkio_base.h:79
void pushPacket(const char *URL, const BULKIO::PrecisionUTCTime &T, bool EOS, const std::string &streamID)
OutULongPort OutUInt32Port
Definition: bulkio_out_port.h:750
Definition: bulkio_out_port.h:550
virtual ~OutCharPort()
Definition: bulkio_out_port.h:527
void setNewDisconnectListener(T *target, void(T::*func)(const char *connectionId))
Definition: bulkio_out_port.h:177
OutPort< FloatPortTraits > OutFloatPort
Definition: bulkio_out_port.h:760
Traits::PortType PortType
Definition: bulkio_out_port.h:68
OutPort< LongPortTraits > OutLongPort
Definition: bulkio_out_port.h:744
OutPort(std::string port_name, ConnectionEventListener *connectCB=NULL, ConnectionEventListener *disconnectCB=NULL)
Traits::TransportType TransportType
Definition: bulkio_out_port.h:672
bulkio::Connections< PortVarType >::List ConnectionsList
Definition: bulkio_out_port.h:410
bulkio::Connections< PortVarType >::List ConnectionsList
Definition: bulkio_out_port.h:94
virtual bulkio::SriMap getCurrentSRI()
Definition: Port_impl.h:320
std::map< std::string, linkStatistics > _StatsMap
Definition: bulkio_out_port.h:269
std::vector< BULKIO::StreamSRI > SriList
Definition: bulkio_base.h:74
boost::shared_ptr< ConnectionEventListener > _connectCB
Definition: bulkio_out_port.h:307
void _sendEOS(PortPtrType port, const std::string &streamID)
Traits::NativeType NativeType
Definition: bulkio_out_port.h:584
void updateConnectionFilter(const std::vector< connection_descriptor_struct > &_filterTable)
Definition: bulkio_out_port.h:152
virtual ~OutFilePort()
Definition: bulkio_out_port.h:599