20 #ifndef __bulkio_in_port_h 21 #define __bulkio_in_port_h 26 #include <boost/thread/condition_variable.hpp> 27 #include <boost/thread/locks.hpp> 28 #include <boost/make_shared.hpp> 29 #include <boost/ref.hpp> 31 #include <ossie/callback.h> 32 #include <ossie/signalling.h> 35 #include "bulkio_traits.h" 36 #include "bulkio_in_stream.h" 37 #include "bulkio_callbacks.h" 48 template <
typename PortTraits >
95 virtual DataTransferType *
getPacket(
float timeout);
107 virtual DataTransferType *
getPacket(
float timeout,
const std::string &streamID);
119 virtual void pushSRI(
const BULKIO::StreamSRI& H);
138 virtual BULKIO::PortUsageType
state();
163 virtual BULKIO::StreamSRISequence*
activeSRIs();
188 virtual void block();
216 template<
typename T >
inline 218 newStreamCallback = boost::make_shared< MemberSriListener< T > >( boost::ref(target), func );
224 template<
typename T >
inline 226 newStreamCallback = boost::make_shared< MemberSriListener< T > >( boost::ref(*target), func );
250 SriListener *newStreamCB = NULL );
327 void queuePacket(PushArgumentType data,
const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS,
const char* streamID);
329 virtual void createStream(
const std::string& streamID,
const BULKIO::StreamSRI& sri);
335 DataTransferType*
fetchPacket(
const std::string& streamID);
338 friend class InputStream<PortTraits>;
349 template <
typename PortTraits >
401 InPort(std::string port_name,
404 SriListener *newStreamCB = NULL );
406 InPort(std::string port_name,
408 SriListener *newStreamCB = NULL );
410 InPort(std::string port_name,
void *);
420 virtual void pushPacket(
const PortSequenceType& data,
const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS,
const char* streamID);
428 StreamList
pollStreams(StreamList& pollset,
float timeout);
430 StreamList
pollStreams(
size_t samples,
float timeout);
431 StreamList
pollStreams(StreamList& pollset,
size_t samples,
float timeout);
433 template <
class Target,
class Func>
438 template <
class Target,
class Func>
464 virtual void createStream(
const std::string& streamID,
const BULKIO::StreamSRI& sri);
485 template <
typename PortTraits >
522 typedef DataTransfer< typename Traits::DataTransferTraits >
dataTransfer;
540 SriListener *newStreamCB = NULL );
544 SriListener *newStreamCB = NULL );
556 virtual void pushPacket(
const char *data,
const BULKIO::PrecisionUTCTime& T, CORBA::Boolean EOS,
const char* streamID);
566 virtual void pushPacket(
const char *data, CORBA::Boolean EOS,
const char* streamID);
PortTraits Traits
Definition: bulkio_in_port.h:491
InLongPort InInt32Port
Definition: bulkio_in_port.h:595
InPort(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare sriCmp=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
InStringPort< XMLPortTraits > InXMLPort
Definition: bulkio_in_port.h:615
StreamMap streams
Definition: bulkio_in_port.h:457
Traits::PortType ProvidesPortType
Definition: bulkio_in_port.h:362
virtual int getCurrentQueueDepth()
redhawk::signal< std::string > packetWaiters
Definition: bulkio_in_port.h:321
Traits::PushType PushArgumentType
Definition: bulkio_in_port.h:68
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:75
ossie::notification< void(StreamType)> streamAdded
Definition: bulkio_in_port.h:451
Traits::NativeType NativeType
Definition: bulkio_in_port.h:513
Definition: bulkio_in_port.h:486
LOGGER_PTR logger
Definition: bulkio_in_port.h:316
InOctetPort InUInt8Port
Definition: bulkio_in_port.h:581
DataTransferType * fetchPacket(const std::string &streamID)
Definition: bulkio_out_port.h:39
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:380
virtual BULKIO::PortStatistics * statistics()
Traits::TransportType TransportType
Definition: bulkio_in_port.h:370
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:78
virtual void setMaxQueueDepth(int newDepth)
InStringPort< FilePortTraits > InFilePort
Definition: bulkio_in_port.h:613
Definition: Port_impl.h:364
DataTransfer< typename Traits::DataTransferTraits > dataTransfer
Definition: bulkio_in_port.h:522
void setNewStreamListener(T *target, void(T::*func)(BULKIO::StreamSRI &))
Definition: bulkio_in_port.h:225
virtual void pushPacket(const char *data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
Traits::PortType PortType
Definition: bulkio_in_port.h:70
bool(* Compare)(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
Definition: bulkio_base.h:357
PortTraits Traits
Definition: bulkio_in_port.h:54
InPort< ULongLongPortTraits > InULongLongPort
Definition: bulkio_in_port.h:601
InPort< LongPortTraits > InLongPort
Definition: bulkio_in_port.h:591
void addStreamListener(Target target, Func func)
Definition: bulkio_in_port.h:434
std::map< std::string, std::pair< BULKIO::StreamSRI, bool > > SriMap
Definition: bulkio_base.h:72
size_t samplesAvailable(const std::string &streamID, bool firstPacket)
DataTransferType dataTransfer
Definition: bulkio_in_port.h:383
InShortPort InInt16Port
Definition: bulkio_in_port.h:587
InULongPort InUInt32Port
Definition: bulkio_in_port.h:597
InLongLongPort InInt64Port
Definition: bulkio_in_port.h:603
virtual void pushPacket(const PortSequenceType &data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
InPort< FloatPortTraits > InFloatPort
Definition: bulkio_in_port.h:607
Traits::PortType PortType
Definition: bulkio_in_port.h:497
uint32_t lastQueueSize
Definition: bulkio_in_port.h:260
InUShortPort InUInt16Port
Definition: bulkio_in_port.h:589
Definition: bulkio_base.h:161
virtual void enableStats(bool enable)
MUTEX sriUpdateLock
Definition: bulkio_in_port.h:285
InPort< LongLongPortTraits > InLongLongPort
Definition: bulkio_in_port.h:599
InPort< OctetPortTraits > InOctetPort
Definition: bulkio_in_port.h:577
Traits::PortType ProvidesPortType
Definition: bulkio_in_port.h:500
InPort< DoublePortTraits > InDoublePort
Definition: bulkio_in_port.h:609
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:526
Definition: bulkio_base.h:205
Definition: bulkio_in_port.h:350
boost::shared_ptr< SriListener > newStreamCallback
Definition: bulkio_in_port.h:270
virtual void pushSRI(const BULKIO::StreamSRI &H)
std::multimap< std::string, StreamType > pendingStreams
Definition: bulkio_in_port.h:462
boost::mutex streamsMutex
Definition: bulkio_in_port.h:458
linkStatistics * stats
Definition: bulkio_in_port.h:314
Traits::NativeType NativeType
Definition: bulkio_in_port.h:375
InULongLongPort InUInt64Port
Definition: bulkio_in_port.h:605
std::string getRepid() const
Traits::POAPortType PortVarType
Definition: bulkio_in_port.h:494
DataTransfer< typename Traits::DataTransferTraits > DataTransferType
Definition: bulkio_in_port.h:518
void packetReceived(const std::string &streamID)
Definition: bulkio_in_port.h:49
SriMap currentHs
Definition: bulkio_in_port.h:275
Traits::TransportType TransportType
Definition: bulkio_in_port.h:508
Traits::PortType PortType
Definition: bulkio_in_port.h:359
Traits::SequenceType PortSequenceType
Definition: bulkio_in_port.h:365
void setNewStreamListener(T &target, void(T::*func)(BULKIO::StreamSRI &))
Definition: bulkio_in_port.h:217
LOGGER LOGGER_PTR
Definition: bulkio_base.h:138
queueSemaphore * queueSem
Definition: bulkio_in_port.h:309
Traits::POAPortType PortVarType
Definition: bulkio_in_port.h:356
bulkio::sri::Compare sri_cmp
Definition: bulkio_in_port.h:265
bool breakBlock
Definition: bulkio_in_port.h:299
bool blocking
Definition: bulkio_in_port.h:304
virtual bool isStreamEnabled(const std::string &streamID)
std::map< std::string, StreamType > StreamMap
Definition: bulkio_in_port.h:456
InPort< ShortPortTraits > InShortPort
Definition: bulkio_in_port.h:583
std::deque< DataTransferType * > WorkQueue
Definition: bulkio_in_port.h:386
virtual void removeStream(const std::string &streamID)
virtual BULKIO::PortUsageType state()
char * PortSequenceType
Definition: bulkio_in_port.h:503
InStringPort(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
virtual BULKIO::StreamSRISequence * activeSRIs()
void queuePacket(PushArgumentType data, const BULKIO::PrecisionUTCTime &T, CORBA::Boolean EOS, const char *streamID)
virtual DataTransferType * getPacket(float timeout)
void removeStreamListener(Target target, Func func)
Definition: bulkio_in_port.h:439
friend class InputStream< PortTraits >
Definition: bulkio_in_port.h:338
virtual void createStream(const std::string &streamID, const BULKIO::StreamSRI &sri)
virtual void removeStream(const std::string &streamID)
InPortBase< PortTraits > super
Definition: bulkio_in_port.h:444
Traits::TransportType TransportType
Definition: bulkio_in_port.h:62
MUTEX dataAvailableMutex
Definition: bulkio_in_port.h:292
bool DefaultComparator(const BULKIO::StreamSRI &a, const BULKIO::StreamSRI &b)
std::list< StreamType > StreamList
Definition: bulkio_in_port.h:392
CONDITION dataAvailable
Definition: bulkio_in_port.h:294
boost::condition_variable CONDITION
Definition: bulkio_base.h:122
boost::mutex MUTEX
Definition: bulkio_base.h:117
InPortBase(std::string port_name, LOGGER_PTR logger, bulkio::sri::Compare sriCmp=bulkio::sri::DefaultComparator, SriListener *newStreamCB=NULL)
void setLogger(LOGGER_PTR logger)
virtual void createStream(const std::string &streamID, const BULKIO::StreamSRI &sri)
PortTraits Traits
Definition: bulkio_in_port.h:353
MUTEX dataBufferLock
Definition: bulkio_in_port.h:280
virtual bool isStreamEnabled(const std::string &streamID)
virtual bool isStreamActive(const std::string &streamID)
InCharPort InInt8Port
Definition: bulkio_in_port.h:579
virtual bool isStreamActive(const std::string &streamID)
InputStream< PortTraits > StreamType
Definition: bulkio_in_port.h:389
StreamList pollStreams(float timeout)
Traits::SequenceType PortSequenceType
Definition: bulkio_in_port.h:57
WorkQueue workQueue
Definition: bulkio_in_port.h:255
InPort< UShortPortTraits > InUShortPort
Definition: bulkio_in_port.h:585
virtual int getMaxQueueDepth()
int _getElementLength(PushArgumentType data)
InPort< CharPortTraits > InCharPort
Definition: bulkio_in_port.h:575
StreamList getReadyStreams(size_t samples)
InPort< ULongPortTraits > InULongPort
Definition: bulkio_in_port.h:593
InStringPort< URLPortTraits > InURLPort
Definition: bulkio_in_port.h:611