Chapter 5
Connections
5.1 Introduction
When discussing connections, there are several terms that are thrown around in REDHAWK: uses, provides, Port, interfaces, IDL, among others. This section demystifies connections and presents the key concepts that enable a REDHAWK-based system to easily interact with other REDHAWK systems and external tools developed outside the scope of REDHAWK.
5.2 The Connection Process
All connections take a client-server pattern. All calls are made from the client to the server. It is the role of the server to provide a set of functions that can be called by the client. It is the role of the client to understand what interfaces the server provides and to invoke (use) them. This is the basis for the uses/provides nomenclature for Ports.
All uses Ports implement the CF::Port interface. CF::Port is an interface that is part of the REDHAWK Core Framework; it contains only two methods: connectPort() and disconnectPort(). To connect a uses Port to a provides Port, an external entity needs to invoke the connectPort() function on the uses Port, where the arguments are a CORBA pointer to the provides Port and a string that identifies that connection. To sever a connection, an external entity needs to invoke the disconnectPort() function on the uses Port, where its only argument is the string ID used to establish the connection. In the case of an Application, the connections are established/destroyed by an object in the Domain Manager process space based on the Waveform’s XML file. In the case of the Sandbox, the Sandbox makes the correct calls to establish and destroy a connection based on user input.
All provides Ports must implement an interface described in IDL. This interface implements the methods that the uses Port invokes after a connection has been made. When a uses Port is given a pointer to a provides Port, it essentially casts this generic pointer to the interface that it believes the provides Port to implement. If this casting process fails, an exception is raised during the connectPort() call.
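The handshake described above can be sketched with a small, self-contained Python model. This is not REDHAWK code: the class names `UsesPort`, `FloatSink`, and `InvalidPort`, and the use of `isinstance` as a stand-in for casting a CORBA reference, are assumptions made for illustration. Only the method names connectPort() and disconnectPort() and their arguments come from the text.

```python
class InvalidPort(Exception):
    """Hypothetical error raised when the target has the wrong interface."""


class FloatSink:
    """Stand-in for a provides Port implementing one IDL-style interface."""

    def __init__(self):
        self.received = []

    def pushPacket(self, data):
        self.received.append(list(data))


class UsesPort:
    """Toy uses Port: holds connections keyed by connection ID."""

    def __init__(self, expected_interface):
        self.expected_interface = expected_interface
        self.connections = {}

    def connectPort(self, provides_port, connection_id):
        # Stand-in for casting the generic reference to the expected interface.
        if not isinstance(provides_port, self.expected_interface):
            raise InvalidPort("target does not implement the expected interface")
        self.connections[connection_id] = provides_port

    def disconnectPort(self, connection_id):
        # The connection ID is the only argument needed to sever a connection.
        del self.connections[connection_id]

    def pushPacket(self, data):
        # Calls always flow from the client (uses) to the server (provides).
        for port in self.connections.values():
            port.pushPacket(data)


sink = FloatSink()
out = UsesPort(FloatSink)
out.connectPort(sink, "connection_1")
out.pushPacket([1.0, 2.0])
out.disconnectPort("connection_1")
out.pushPacket([3.0])  # no connections remain, so nothing is delivered
```

In this model the external entity is the script itself; in a running system that role is played by the Domain Manager or the Sandbox.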
5.3 Why Ports?
It may seem burdensome to connect Components through Port objects; this is an additional level of indirection that adds another layer of complexity. This approach is taken largely because it allows modularization of interfaces when Components have more than one input or output Port.
5.4 Port Access
A Port belongs to a Component or Device (Devices are specialized Components; more on that in Chapter 7). To retrieve a Port, an external entity needs to call getPort() on the Component that owns that Port. The argument to the getPort() function is the string name for the Port, and the return value is a CORBA pointer to that Port object. Both uses and provides Ports are retrieved from Components through this function call. Base supported interfaces are not retrieved through getPort(), because they are not Ports. Instead, these references are retrieved directly from an entity like the Domain Manager or the Device Manager.
5.5 Dynamic Connections
Unless a Component is in the process of being terminated, it is valid to retrieve a Port reference at any other point in the Component’s life cycle. Anyone may call getPort() on the Component at any time. In the case of a uses Port, anyone may call connectPort() or disconnectPort() at any time. In the case of a provides Port, anyone may cast to that Port reference and start making calls on it. It is the task of the Component developer to make sure that the Component handles changes like this smoothly. The base classes and code generators provided with REDHAWK handle the vast majority of the issues arising from such changes, especially when the provides Port implements one of the REDHAWK standard interfaces.
This dynamic connection behavior provides huge benefits to an Application developer. For example, if one wanted to inspect the data being passed from one Component to the next, a temporary provides-side implementation can be created and a new connection established. The standard behavior of a uses Port is to send the same data to all its existing connections. This means of dynamic connecting is essential for REDHAWK’s plotting mechanism.
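A minimal sketch of the tapping pattern described above, written as plain Python rather than REDHAWK code. The `Recorder` and `FanOutPort` classes are hypothetical stand-ins, and packets are represented by simple strings.

```python
class Recorder:
    """Hypothetical provides-side stand-in that records what it receives."""

    def __init__(self):
        self.packets = []

    def pushPacket(self, data):
        self.packets.append(data)


class FanOutPort:
    """Toy uses Port that sends every packet to all current connections."""

    def __init__(self):
        self._connections = {}

    def connectPort(self, target, connection_id):
        self._connections[connection_id] = target

    def disconnectPort(self, connection_id):
        self._connections.pop(connection_id, None)

    def pushPacket(self, data):
        # Iterate over a snapshot so connections may change between pushes.
        for target in list(self._connections.values()):
            target.pushPacket(data)


downstream, tap = Recorder(), Recorder()
port = FanOutPort()
port.connectPort(downstream, "main")
port.pushPacket("packet 0")
port.connectPort(tap, "tap")   # temporary inspection connection
port.pushPacket("packet 1")
port.disconnectPort("tap")     # remove the tap; the main path is unaffected
port.pushPacket("packet 2")
```

The tap sees only the data pushed while it was connected, and the original consumer never notices that it was there.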
5.6 Standardized Data Interfaces
Data flow between REDHAWK resources (Components and Devices) is managed through two sets of interfaces: BulkIO and BurstIO. The BulkIO module is designed for streaming data and maximizes the efficiency of bulk data transfers between resources, whereas BurstIO is designed for applications that require small and possibly non-contiguous chunks of data. Both interfaces also allow for the association of metadata, Signal Related Information (SRI), and a precision time stamp, which describe the content being transferred in support of content processing. The following sections detail the capabilities of the BulkIO and BurstIO implementations and the interfaces they provide.
5.7 BulkIO
BulkIO is designed to provide a standardized methodology and to maximize efficiency for bulk data transfers between REDHAWK resources (Components/Devices). This interface supports the transfer of data vectors (float, double, char (int8), octet (uint8), short (int16), ushort (uint16), long (int32), ulong (uint32), longlong (int64), ulonglong (uint64)), character strings (char *), and out-of-band connection descriptors for SDDS data streams.
These interfaces also allow for metadata, Signal Related Information (SRI), and a precision time stamp (described in detail in the following subsections), which describe the content being transferred and support content processing. Part of the required methodology for passing data between REDHAWK Components is that all data transfers via pushPacket() are preceded by at least one call to pushSRI() with an appropriate SRI object. SRI data is passed out-of-band from the content data to reduce the overhead for transferring data between Components. The precision time stamp represents the birth date for data and is part of the pushPacket() method call for those Components that require this information.
The data flow implementation for a Component’s BulkIO Port interface is provided by a shared bulkio base class library. The resulting Component code instantiates a bulkio base class object and makes use of the shared library during deployment and execution.
5.7.1 Data Transfers
Data transfers happen through the pushPacket() method call of a REDHAWK Component’s Port object. This method transfers the data from the uses-side Port to the corresponding connected provides-side Port. The data is marshaled by the middleware (omniORB) and placed on a queue for processing by the receiving Component. The implementations of the pushPacket() methods are maximized for the efficiency of data throughput while providing network-accessible ingest/egress of data and minimizing the complexity of the implementation.
Each implementation maintains the required behavior of providing an SRI object before receiving any data transfers. This is accomplished by calling the pushSRI() method of the Port with an SRI object. In most cases, a Component takes the ingest SRI object received from an input Port, makes any required modifications, and passes this object downstream over its output Port. If a Component does not provide an SRI object before its first pushPacket(), the Port creates a default SRI object with nominal values to pass out the Port.
The following sections explain the different methods for transferring supported data types by a Component.
Caution: For the current implementation of omniORB, the /etc/omniORB.cfg file maintains the configurable maximum transfer size defined by the value for giopMaxMsgSize. The default maximum transfer size is set to 2097152 bytes (2 megabytes (MB)). For every pushPacket(), the data plus headers must be less than this value; otherwise, a MARSHAL exception is raised by the middleware. This maximum value can be found during run time by using the omniORB::giopMaxMsgSize() function call or the bulkio::Const::MAX_TRANSFER_BYTES value.
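The size constraint above can be illustrated with a short arithmetic sketch in plain Python. The 2097152-byte limit is the default quoted in the caution; the `HEADER_ALLOWANCE` value and both function names are assumptions made for illustration, since the text does not state how large the headers are.

```python
MAX_TRANSFER_BYTES = 2097152   # default giopMaxMsgSize quoted in the text
HEADER_ALLOWANCE = 2048        # assumed margin for headers; not a REDHAWK value


def max_samples_per_push(bytes_per_sample, limit=MAX_TRANSFER_BYTES,
                         header_allowance=HEADER_ALLOWANCE):
    """Largest sample count whose payload plus allowance stays under the limit."""
    # The total must be strictly less than the limit, hence the extra -1.
    return (limit - header_allowance - 1) // bytes_per_sample


def split_for_push(samples, bytes_per_sample):
    """Yield slices of samples that each fit in a single transfer."""
    step = max_samples_per_push(bytes_per_sample)
    for start in range(0, len(samples), step):
        yield samples[start:start + step]


# One million 4-byte samples do not fit in one transfer, so they are split.
chunks = list(split_for_push([0.0] * 1000000, bytes_per_sample=4))
print(len(chunks), [len(c) for c in chunks])
```

A Component that produces large vectors could apply the same idea by issuing one pushPacket() per slice instead of a single oversized call.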
5.7.1.1 Vector Data
A Component usually ingests and egresses data from its Ports in the service function. A Component with a provides Port (input) grabs data from the Port using the getPacket() method. This method returns a dataTransfer object (described in Table 5.1) from the input Port’s data queue or a null/None value if the queue is empty.
The following code snippet is an example of the getPacket() method.
The following code snippet is an example of the pushPacket() method call for vector data with sample parameters. The pushPacket() parameters for vector data are described in Table 5.2.
5.7.1.2 String Data/XML Document
The following code snippet is an example of the pushPacket() method call for string data with sample parameters. The pushPacket() parameters for string data are described in Table 5.3.
5.7.1.3 URL/File Data
The following code snippet is an example of the pushPacket() method call for file transfers with sample parameters. The pushPacket() parameters for file transfers are described in Table 5.4.
Data files may be sent via the BulkIO dataFile type. When using the BulkIO dataFile type, a filename is passed to the pushPacket() method. The location of the file is specified by a Uniform Resource Identifier (URI) that either points to the local file system or the SDR file system. To support portability, use of the SDR file system is recommended. Table 5.5 describes the URI options in greater detail.
5.7.1.4 SDDS Stream Definition
The SDDS Stream Definition object defines a multicast connection source for data from a network interface. The methods for the SDDS Stream Definition Interface do not follow the normal BulkIO pushPacket() convention. Instead, this interface defines attach() and detach() methods as described in the code snippet below and Table 5.6.
5.7.2 Signal Related Information (SRI)
SRI describes the data payload and is delivered with the data (when in-band). Section 5.7.2.1 provides guidance on how to manipulate keywords in SRI. The SRI data structure fields are described in Table 5.8.
SRI is transferred over a connection by the uses-side invoking the provides-side function pushSRI(). The pushSRI() function contains a single argument, an instance of an SRI object.
Each provides-side Port that implements a BulkIO interface expects the SRI describing the incoming data to be available before any data is transferred. When using the code generators and base classes in the REDHAWK development tools, this behavior is hard-coded into the uses-side BulkIO Ports. If the user code on the uses-side of a BulkIO connection does not explicitly invoke a pushSRI() call before any data is sent out, the auto-generated code creates a trivial SRI message with normalized values.
Part of the hard-coded behavior on the uses-side BulkIO Port is to issue a pushSRI() when a new connection is made to the newly-connected object. For example, a system is created in which data is flowing between Components A and B. As data is flowing between these Components, a new connection is established between Components A and C. When this connection is established, a pushSRI() method call is automatically made from Component A to Component C.
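The two uses-side behaviors described above (a default SRI before the first packet, and an SRI pushed to every newly connected object) can be modeled in a few lines of plain Python. The classes below are hypothetical stand-ins, the SRI is reduced to a dictionary, and the default field values are assumptions chosen only to make the sketch concrete.

```python
class SriSink:
    """Hypothetical provides-side stand-in that logs the calls it receives."""

    def __init__(self):
        self.calls = []

    def pushSRI(self, sri):
        self.calls.append(("pushSRI", sri["streamID"]))

    def pushPacket(self, data, stream_id):
        self.calls.append(("pushPacket", stream_id))


class SriAwareUsesPort:
    """Toy uses Port that guarantees an SRI precedes any data on a connection."""

    def __init__(self):
        self.connections = {}
        self.current_sri = {}   # stream ID -> last SRI pushed

    def connectPort(self, target, connection_id):
        self.connections[connection_id] = target
        # A newly connected object immediately receives the known SRIs.
        for sri in self.current_sri.values():
            target.pushSRI(sri)

    def pushSRI(self, sri):
        self.current_sri[sri["streamID"]] = sri
        for target in self.connections.values():
            target.pushSRI(sri)

    def pushPacket(self, data, stream_id):
        if stream_id not in self.current_sri:
            # User code pushed no SRI: send a default one first (assumed values).
            self.pushSRI({"streamID": stream_id, "xdelta": 1.0, "mode": 0})
        for target in self.connections.values():
            target.pushPacket(data, stream_id)


b, c = SriSink(), SriSink()
a_out = SriAwareUsesPort()
a_out.connectPort(b, "A_to_B")
a_out.pushPacket([1, 2, 3], "stream_1")   # default SRI goes out first
a_out.connectPort(c, "A_to_C")            # C receives the SRI on connection
a_out.pushPacket([4, 5, 6], "stream_1")
```

Component C never sees a packet without having seen the matching SRI, even though it joined after data had started flowing.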
5.7.2.1 SRI Keywords
SRI is metadata that describes the payload being pushed (for example, the sampling period). While it is possible to describe some generic parameters, signal-specific parameters are stored in a generalized structure called SRI Keywords. SRI keywords are passed as a sequence of key/value pairs (CF::DataType) of type CF::Properties. In Properties, the keys are strings, and the values are a CORBA type called CORBA::Any. CORBA::Any is a structure that can be used to marshal a wide variety of types. REDHAWK has developed helper APIs to interact with the keyword sequence.
5.7.2.2 Adding SRI Keywords in C++, Python, and Java
Given a Component with simple Properties chan_rf and col_rf that are of type double and have an initial value of -1, and a BulkIO StreamSRI instance named sri, the following implementations in C++, Python and Java, push out those Property values as the keywords COL_RF and CHAN_RF.
C++ Implementation The redhawk::PropertyMap property map enables you to manipulate the sequence of keywords.
Python Implementation omniORB helpers any.to_any are used to convert a Python type to a CORBA::Any.
Java Implementation The AnyUtils package is used to convert a Java type to a CORBA::Any.
5.7.2.3 Verifying SRI Keywords
It is possible to verify the keywords and values being pushed out by connecting a DataSink() Component in the Python Sandbox. This assumes there is at least one BulkIO output Port for the test Component, and a pushSRI() call is made on that Port. The following code demonstrates this verification:
Retrieving SRI Keywords in C++
Because redhawk::PropertyMap contains CORBA::Any values, retrieving the contents requires the use of getters to convert to a native type. Assuming that the content of a particular keyword is a double:
5.7.3 Stream API
The BulkIO stream API provides a high-level interface to sending and receiving data via BulkIO ports. Each stream is tied to a port, and encapsulates both the SRI and the data associated with it.
Streams are automatically managed by the port that creates them. User code does not own the stream itself; instead, user instances are opaque stream handles. This allows them to be passed around by value or safely stored in other data structures.
All numeric BulkIO port types support the stream API. UDP multicast (SDDS and VITA49) and string-based (string, file and XML) interfaces do not.
Most stream methods are not thread-safe; it is assumed that each stream will be written to or read from by a single thread. However, it is safe to use multiple streams simultaneously.
The BulkIO stream API is C++-only in REDHAWK 2.0.5.
5.7.3.1 Output Streams
Each numeric output port type has a corresponding stream type (e.g., bulkio::OutFloatStream for bulkio::OutFloatPort) that provides the interface for sending stream data. Using streams ensures that data is always associated with an active SRI and simplifies management of stream lifetime.
Create an output stream via the port:
Output streams provide convenience methods for modifying common SRI fields:
The SRI can be updated in its entirety with the sri() method. Updates to the SRI are stored and pushed before the next packet goes out.
It is not necessary to manually call pushSRI() when using streams.
Data is sent with the write() method:
Complex data should be written with a vector of or pointer to std::complex values to ensure that an integral number of samples is sent. When writing scalar data to a complex stream, make sure that the size is a multiple of 2.
Closing an output stream sends an end-of-stream packet and dissociates the stream from the output port.
5.7.3.2 Input Streams
An input stream encapsulates SRI and all received packets associated with that stream ID. Buffering and overlap are built in, removing the need for client code to implement these features. Each numeric input port type has a corresponding stream type (e.g., bulkio::InFloatStream for bulkio::InFloatPort).
Input streams are created automatically by the input port when an SRI is received with a new stream ID. Only one stream per port can exist with a given stream ID; in the event that an input stream has an unacknowledged end-of-stream waiting, a new SRI with the same stream ID will be queued until the end-of-stream has been reached.
Methods that accept or return a number of samples take the input stream’s complex mode into account. For example, requesting 1024 samples from a complex stream returns 1024 complex pairs, which is equivalent to 2048 scalar values.
There are two ways of retrieving an input stream: stream polling (Section 5.7.3.3) or a stream callback (Section 5.7.3.4).
5.7.3.3 Stream Polling
For the basic case, the getCurrentStream() method returns the next input stream that is ready for reading. Similar to getPacket(), the next packet in the queue is consulted; however, if any stream has buffered data from a prior read (such as when using fixed-sized reads), it is given priority. Developers accustomed to using getPacket() will find that getCurrentStream() provides a familiar flow, while extending the available functionality.
The optional timeout argument is identical to the timeout argument for getPacket(). If the timeout is omitted, getCurrentStream() defaults to blocking mode:
If there are no streams ready, such as when the timeout expires or the Component receives a stop() call, the returned stream will be invalid. The boolean not (!) operator returns true if the stream is invalid; no other operations are safe to call on an invalid stream.
Advanced Polling For more advanced use, the input port’s pollStreams() family of methods allows you to wait for one or more streams to be ready to read. Like getCurrentStream(), pollStreams() takes a timeout argument to set the maximum wait time.
The ready streams are returned as a list:
If no streams are ready, the returned list is empty. pollStreams() returns as soon as one stream is ready.
If a minimum number of samples is required, it may be provided in the pollStreams() call:
5.7.3.4 Stream Callback
As opposed to polling, callback functions may be registered with the input port to be notified when a new stream has been created. Using a callback supports more sophisticated patterns, such as handling each stream in a separate thread or disabling unwanted streams.
The callback has no return value and takes a single argument, the input stream type:
Register the callback with the port in the REDHAWK constructor:
5.7.3.5 Data Blocks
In BulkIO input stream-based code, data is retrieved from data streams as blocks. Each input stream data type has a corresponding data block type, such as bulkio::FloatDataBlock. Data blocks can be retrieved on a per-packet basis, or they can be retrieved as a definite-sized buffer, with or without overlap.
Reading Data Blocks The read() family of methods synchronously fetches data from a stream. The basic read() returns the next packet’s worth of data for the stream, blocking if necessary:
You may request a set amount of data by supplying the number of samples:
The read() call blocks until at least the requested number of samples is available. Packets are combined or split as necessary to return the correct amount of data. The returned block may contain fewer than the requested number of samples if the stream has ended or the component is stopped.
For algorithms that require data to overlap between iterations, you may also pass the number of samples to consume:
The input stream’s read pointer is advanced up to the consume length. The next call to read() will return data starting at that point.
If an end-of-stream flag is received, or the component is interrupted, read() may return early. In the overlap case, if end-of-stream is reached before receiving the requested number of samples, all remaining data is consumed and no further reads are possible.
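The interaction between the requested length and the consume length can be illustrated with a toy buffer in plain Python. The `BufferedStream` class is a hypothetical model, not the BulkIO stream API: to keep the sketch testable it never blocks, and it returns None where a real blocking read would wait.

```python
class BufferedStream:
    """Toy input stream: packets are appended, reads may overlap."""

    def __init__(self):
        self._buffer = []
        self._eos = False

    def add_packet(self, samples, eos=False):
        self._buffer.extend(samples)
        self._eos = self._eos or eos

    def read(self, count, consume=None):
        """Return up to count samples and advance by consume (default: count).

        Returns None when the request cannot be satisfied yet; a real
        blocking read would wait instead.
        """
        if consume is None:
            consume = count
        if len(self._buffer) < count:
            if not self._eos:
                return None
            # End-of-stream: return what is left and consume all of it.
            block, self._buffer = self._buffer, []
            return block
        block = self._buffer[:count]
        # Only the consumed samples are dropped; the rest are re-read next time.
        del self._buffer[:consume]
        return block


stream = BufferedStream()
stream.add_packet(list(range(6)))
first = stream.read(4, consume=2)    # [0, 1, 2, 3]
second = stream.read(4, consume=2)   # [2, 3, 4, 5]
stream.add_packet([6], eos=True)
last = stream.read(4, consume=2)     # fewer than 4 samples remain
```

Each read overlaps the previous one by two samples, and the final read returns the short remainder because the stream has ended.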
Data can be dropped with skip():
The returned value is the number of samples that were dropped. If the stream ends or the component is stopped, this may be less than the requested value.
Non-Blocking Read The read() family of methods is always blocking. For non-blocking reads, use tryread():
tryread() will only return a valid block of data if the entire request can be satisfied, or if no more data will be received. In the case that the stream has ended or the component has been stopped, all remaining queued data in the stream will be returned.
5.7.3.6 Interacting with Data Blocks
Data blocks contain the data as an array of sample data, as well as the SRI that describes the data. The memory is managed automatically inside the object to minimize copies, so there is no need to explicitly delete data blocks. Data blocks contain a variety of functions that help the developer manage and interact with the data block’s contents.
Verifying Correctness The boolean not (!) operator returns true if a block is invalid. This occurs when the stream is unable to read the requested data, such as when the component receives a stop() call.
Occasionally, the input stream’s state may change between data blocks. To handle this situation, the data block provides methods to check these conditions:
- inputQueueFlushed()
- sriChanged()
- sriChangeFlags() returns the changed SRI fields as a bit field
- data() returns a pointer to the first element
- size() returns the number of values
Complex Data If the input stream is complex, the returned data buffer should be treated as complex data. Data block objects provide convenience methods to make it easy to work with complex data:
- complex() returns true if the data is complex (i.e., SRI mode is 1)
- cxdata() returns a pointer to the first element as a complex value
- cxsize() returns the number of complex values
Time Stamps Because a single data block may span multiple input packets, it can contain more than one time stamp. Data blocks returned from an input stream are guaranteed to have at least one time stamp.
Time stamps are accessed with the getTimestamps() method:
The bulkio::SampleTimestamp class contains three fields:
- time - a BULKIO::PrecisionUTCTime time stamp
- offset - the sample number at which this time stamp applies
- synthetic - is true if the time stamp was calculated based on a prior data block
When the start of a data block does not match up exactly with a packet, the input stream will use the last known time stamp, the SRI xdelta and the number of samples to calculate a time stamp. Only the first time stamp in a data block can be synthetic.
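The calculation of a synthetic time stamp can be sketched as simple arithmetic. The function below is an illustrative assumption, not the library's implementation: it represents time as a single floating-point number of seconds and takes sample indices relative to the start of the stream.

```python
def synthetic_timestamp(last_time, last_offset, block_offset, xdelta):
    """Estimate the time of the sample at block_offset.

    last_time    -- seconds since the epoch for the last known time stamp
    last_offset  -- stream sample index that last_time refers to
    block_offset -- stream sample index of the first sample in the block
    xdelta       -- SRI sample period in seconds
    """
    return last_time + (block_offset - last_offset) * xdelta


# A packet arrived with a time stamp of 100.0 s and a sample rate of 1024 Hz.
# A block that starts 256 samples into that packet has no exact time stamp.
start = synthetic_timestamp(100.0, last_offset=0, block_offset=256,
                            xdelta=1.0 / 1024)
```

With these numbers the block starts a quarter of a second after the packet's time stamp.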
5.7.3.7 Ignoring Streams
Some components may prefer to only handle one stream at a time. Unwanted input streams can be disabled, typically in a new stream callback:
All data for the stream will be discarded until end-of-stream is reached, preventing queue backups due to unhandled data.
5.7.4 Multi-out Ports
A multi-out Port allows a Component to select specific streams to be sent over specific connections out of arbitrarily-selected Ports. To use multi-out Ports, a Component must include the following property:
To steer a particular stream out of a particular connection through a particular Port, an element must be added to the connection table structure that identifies the stream id/connection id/port name set. After this element is added to the structure, any data pushed to a particular Port is filtered by that Port in the appropriate fashion.
A Port does not filter its output until an element in the connection table sequence mentions the Port name. If a Port is listed on the connection table, then data is pushed out only if both the stream ID and connection ID match.
The multi-out capability is supported only for BulkIO and BurstIO output (uses) Ports.
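The filtering rule described above can be expressed as a small function in plain Python. The `Route` tuple and the `connections_for` helper are hypothetical names introduced for this sketch; the port, stream, and connection names are sample values.

```python
from collections import namedtuple

# Hypothetical row layout mirroring the stream id/connection id/port name set.
Route = namedtuple("Route", ["connection_id", "stream_id", "port_name"])


def connections_for(port_name, stream_id, connection_ids, table):
    """Return the connection IDs on port_name that should receive stream_id."""
    rows = [r for r in table if r.port_name == port_name]
    if not rows:
        # The Port is not mentioned in the table, so it does not filter.
        return list(connection_ids)
    allowed = {r.connection_id for r in rows if r.stream_id == stream_id}
    return [c for c in connection_ids if c in allowed]


table = [
    Route("conn_A", "stream_1", "dataFloat_out"),
    Route("conn_B", "stream_2", "dataFloat_out"),
]
connected = ["conn_A", "conn_B"]

routed = connections_for("dataFloat_out", "stream_1", connected, table)
unlisted_stream = connections_for("dataFloat_out", "stream_3", connected, table)
unlisted_port = connections_for("other_out", "stream_1", connected, table)
```

A stream with no matching row on a listed Port goes nowhere, while a Port that never appears in the table keeps sending everything to every connection.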
5.7.5 Working with Complex Data
If the StreamSRI mode field of the incoming data is set to 1, the associated input data is complex (i.e., it consists of real and imaginary parts). Complex data is sent as alternating real and imaginary values. A developer can work with this data in any fashion; however, this section provides common methods for converting the data into a more workable form.
5.7.5.1 Converting Complex Data in C++
In C++, the incoming BulkIO data vector may be typecast into a std::vector of std::complex values. For example:
5.7.5.2 Converting Complex Data in Python
The helper functions bulkioComplexToPythonComplexList and pythonComplexListToBulkioComplex, defined in the module ossie.utils.bulkio.bulkio_helpers, provide an efficient translation to and from lists of Python complex numbers.
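For readers without a REDHAWK installation at hand, the conversion can be sketched in plain Python. The two functions below are stand-ins written for this illustration, not copies of the helper functions named above, and their names are assumptions.

```python
def interleaved_to_complex(values):
    """Pair alternating real/imaginary scalars into Python complex numbers."""
    if len(values) % 2 != 0:
        raise ValueError("complex data requires an even number of scalars")
    return [complex(re, im) for re, im in zip(values[0::2], values[1::2])]


def complex_to_interleaved(samples):
    """Flatten complex numbers back into alternating real/imaginary scalars."""
    flat = []
    for sample in samples:
        flat.extend((sample.real, sample.imag))
    return flat


packet = [1.0, 2.0, 3.0, -4.0]            # two complex samples
samples = interleaved_to_complex(packet)  # [(1+2j), (3-4j)]
```

The round trip through both functions returns the original interleaved list, which is a convenient sanity check when processing complex streams.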
5.7.5.3 Converting Complex Data in Java
Unlike C++ and Python, Java does not have a ubiquitous means for representing complex numbers; therefore, when using Java, users are free to map the incoming BulkIO data to the complex data representation of their choosing.
5.7.6 Time Stamps
BulkIO uses BULKIO::PrecisionUTCTime time stamps that denote the time since 12:00 AM January 1, 1970 (Unix epoch) in Coordinated Universal Time (UTC). The time stamp contains several elements. In BulkIO, a time stamp corresponds to the date of birth of the first element in the data being pushed. Table 5.9 describes the different elements making up the BULKIO::PrecisionUTCTime structure.
Two of the elements described in Table 5.9 correspond to predefined values. tcstatus can only take two values, TCS_INVALID (0), and TCS_VALID (1), showing whether the time stamp is valid or not. Invalid time stamps do not contain valid time data and should be ignored. tcmode is the method by which the timestamp was obtained, but this use has since been deprecated, and this value is ignored. The default value for tcmode is 1.
The following code segments provide examples of how to construct a time stamp to be sent in the pushPacket() call. The now() method returns the current time of day.
C++:
Python:
Java:
5.7.6.1 Timestamp Operators (C++)
In C++, BULKIO::PrecisionUTCTime supports common arithmetic, comparison and stream operators.
Adding an offset to a time stamp:
Subtracting two time stamps returns the difference in seconds:
Comparing two time stamps:
Stream formatting (output format is “YYYY:MM:DD::HH::MM::SS.SSSSSS”):
5.7.6.2 Timestamp Operators (Python)
In Python, BULKIO.PrecisionUTCTime supports common arithmetic, comparison and string conversion operators.
Adding an offset to a time stamp:
Subtracting two time stamps returns the difference in seconds:
Comparing two time stamps:
String formatting (output format is “YYYY:MM:DD::HH::MM::SS.SSSSSS”):
5.7.6.3 Timestamp Helpers (Java)
In Java, the bulkio.time.utils class provides static helper methods for common arithmetic, comparison and string conversion operations.
Adding an offset to a time stamp with increment() modifies the original time stamp:
Adding an offset to a time stamp with add() returns a new time stamp with the result, leaving the original time stamp unmodified:
Calculating the difference in seconds between two time stamps:
Comparing two time stamps:
The compare() method follows the same rules as java.util.Comparator.
String formatting (output format is “YYYY:MM:DD::HH::MM::SS.SSSSSS”):
5.7.7 Port Statistics
All BulkIO Ports contain a read-only attribute called statistics. The statistics attribute is of type BULKIO::PortStatistics, and it contains information regarding the performance of the Port. Table 5.10 contains a description of the statistics structure.
The provides-side Port contains a single PortStatistics structure. The uses-side Port contains a sequence of PortStatistics structures, each one associated with a single connection.
An interesting exercise is to create Components that generate and consume data in the three languages supported by REDHAWK. The data generator and consumer generate/consume data as fast as possible. The statistics data structure can provide metrics regarding data transfer rates, average latency, and other relevant data. Shifting the transfer length (by changing the size of the sequence in the pushPacket() call) and seeing its effects on the performance of the connection is also instructive.
5.7.8 Examples
These two examples illustrate high-speed data exchange between two C++ Components and basic data manipulation through the Sandbox.
5.7.8.1 High-speed data
In this example, two C++ Components are created: a source and a sink. We will then deploy these Components through the Sandbox and evaluate the statistics of the data transfer between them.
- Create a C++ Component called source with a uses Port called output of type dataShort. Add a simple Property with ID xfer_length, type ulong, and default value of 100000. Generate the Component code.
- Open the file source.h and add the following member to the source_i class:
- Open the file source.cpp and edit it in the following ways:
- In the source_i constructor:
- In serviceFunction(), comment out the LOG_DEBUG statement and add the following lines:
- Compile the Component source and install it on Target SDR.
- Create a C++ Component called sink with a provides Port called input of type dataShort. Generate the Component code.
- Open the file sink.cpp and edit it in the following ways:
- In serviceFunction(), comment out the LOG_DEBUG statement
- Add the following lines:
- Compile the Component sink and install it on Target SDR.
Start a Python session in a command line terminal and run the following commands:
The output of the print statement is an instance of the PortStatistics structure in BulkIO. This structure contains the statistics gathered from this connection. A measure of data rate is bits per second.
To display the number of Gigabits per second, run the following command:
The resulting value is the measured data transfer rate between the two Components. The current xfer_length Property can be viewed by typing the following:
The default value is 100000. Update the Property to 200000 by running the following command:
Check the new data rate by repeating the call to _get_statistics(). The resulting data rate is now different.
5.7.8.2 Octet Ports
Octets are unsigned 8-bit units of data. In Java and C++, these map easily. However, that is not the case in Python, which treats a sequence of characters as a string. The following is an example of pushing octet data out of a dataOctet port:
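The conversion step alone can be sketched with the Python standard library. This is an assumption-laden illustration: it shows only how a list of integers becomes a byte string and back, the helper names are hypothetical, and the port call itself is omitted because the exact type a dataOctet port expects depends on the REDHAWK and Python versions in use.

```python
import struct


def pack_octets(values):
    """Pack integers in the range 0..255 into a single byte string."""
    return struct.pack("%dB" % len(values), *values)


def unpack_octets(payload):
    """Recover the list of unsigned 8-bit integers from a byte string."""
    return list(struct.unpack("%dB" % len(payload), payload))


payload = pack_octets([0, 127, 255])
```

Values outside the 0 to 255 range cause struct.pack() to raise struct.error, which catches out-of-range samples before they reach a port.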
5.7.8.3 Data manipulation
In this example, a Python Component is created that takes vectors of floats as inputs, multiplies the vector by some arbitrary number, and then outputs the resulting vector. This example demonstrates some basic data manipulation as well as the interaction between the Python environment and the running Component.
- Create a Python Component called mult with a provides Port called input of type dataFloat and a uses Port called output of type dataFloat. Add a simple Property with ID factor, type float, and default value of 1. Generate the Component code.
- Open the file mult.py and add the following lines:
- Save the project and drag the mult project to REDHAWK Explorer > Target SDR.
- Start a Python session in a command line terminal and run the following commands:
- The multiplication factor can be changed while the Sandbox is up.
5.8 BurstIO
For those applications that require small, and possibly non-contiguous, chunks (or bursts) of data with frequently-varying metadata, BurstIO provides the data transfer containers and interfaces to meet those requirements. This interface only supports the transfer of data vectors: float, double, octet (int8/uint8), short (int16), ushort (uint16), long (int32), ulong (uint32), longlong (int64), and ulonglong (uint64). Similar to BulkIO, BurstIO provides Burst Signal Related Information (SRI) and a precision time stamp, but provides this information in-band with each data burst. Because of the increased metadata overhead, BurstIO achieves its highest throughput by grouping multiple bursts into a single transfer, either programmatically or through configurable policy settings, to maximize efficiency and limit latency.
5.8.1 Data Transfers
BurstIO data transfers happen through the pushBurst() and pushBursts() method calls of a REDHAWK resource’s (Component or Device) BurstIO Port object. A resource can use these push methods to transfer bursts and their associated metadata from one resource to another within the resource’s service function. Similar to BulkIO, BurstIO interfaces provide the same BULKIO::PrecisionUTCTime time stamp for each data vector of the burst. BurstIO defines a new BurstSRI SRI object that enables developers to further describe the signaling environment and the data transformations. These fields are further described in Section 5.8.2.
5.8.1.1 Input
A resource with a provides Port (input) grabs data from the Port using the getBurst() method. This method returns a PacketType object (described in Table 5.11) from the input Port’s data queue or a null/None value if the queue is empty.
The following code snippet is an example of the getBurst() method.
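As a minimal sketch of the polling pattern described above, the following Python uses a hypothetical stand-in class, `StubBurstInPort`, in place of a real BurstIO input Port. Only the getBurst() name and its "packet or None" behavior come from the text; the `enqueue()` method, the `drain()` helper, and the dictionary packets are assumptions made for illustration.

```python
from collections import deque


class StubBurstInPort:
    """Hypothetical stand-in for a BurstIO input Port (not REDHAWK code)."""

    def __init__(self):
        self._queue = deque()

    def enqueue(self, packet):
        # In a real system the Port fills its own queue as bursts arrive.
        self._queue.append(packet)

    def getBurst(self):
        # Return the oldest queued packet, or None if the queue is empty.
        return self._queue.popleft() if self._queue else None


def drain(port):
    """Collect packets until getBurst() reports an empty queue."""
    packets = []
    while True:
        packet = port.getBurst()
        if packet is None:
            break
        packets.append(packet)
    return packets


port = StubBurstInPort()
port.enqueue({"streamID": "s1", "data": [1.0, 2.0]})
port.enqueue({"streamID": "s1", "data": [3.0]})
received = drain(port)
```

The important point is the None check: a service function should treat an empty queue as "nothing to do this cycle" rather than as an error.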
5.8.1.2 Output
Due to the asynchronous nature of BurstIO data, the interface enables the developer to control the output (egress) of bursts from a resource. The two main method calls for pushing burst data downstream from one resource to another are pushBursts() and pushBurst(). pushBursts() sends multiple bursts directly downstream as a sequence of BurstType objects, whereas pushBurst() queues a single burst to be pushed and follows policy directives based on the number of bursts, total queue size, and send intervals. Both methods route burst data using the specified routing constraints and connection filter, which are controlled using the following interface:
The major difference between the pushBurst() and pushBursts() methods is the ability to manage how and when the data is transferred. Only burst traffic that is queued using pushBurst() is controlled by the policy constraints, whereas calls to pushBursts() are sent directly downstream to the connected resource.
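The difference between the queued and direct paths can be sketched with a small stand-in class. `StubBurstOutPort`, its `max_bursts`, `max_bytes`, and `max_interval` parameters, and the `flush()` method are hypothetical names chosen for this illustration; they are not the BurstIO policy interface, and a real Port would also need a timer to enforce the send interval when no new bursts arrive.

```python
import time


class StubBurstOutPort:
    """Hypothetical stand-in for a BurstIO output Port (not REDHAWK code)."""

    def __init__(self, max_bursts=3, max_bytes=1024, max_interval=0.5,
                 clock=time.monotonic):
        self.max_bursts = max_bursts      # flush once this many bursts are queued
        self.max_bytes = max_bytes        # flush once the queue holds this many bytes
        self.max_interval = max_interval  # flush once the oldest burst is this old (s)
        self._clock = clock
        self._queue = []
        self._queued_bytes = 0
        self._oldest = None
        self.transfers = []               # one entry per downstream transfer

    def pushBursts(self, bursts):
        # Direct path: sent immediately, bypassing the queue and its policy.
        self.transfers.append(list(bursts))

    def pushBurst(self, data, bytes_per_sample=4):
        # Queued path: hold the burst until one of the policy limits is reached.
        if not self._queue:
            self._oldest = self._clock()
        self._queue.append(data)
        self._queued_bytes += len(data) * bytes_per_sample
        if (len(self._queue) >= self.max_bursts
                or self._queued_bytes >= self.max_bytes
                or self._clock() - self._oldest >= self.max_interval):
            self.flush()

    def flush(self):
        # Send everything queued so far as a single grouped transfer.
        if self._queue:
            self.transfers.append(self._queue)
            self._queue = []
            self._queued_bytes = 0


port = StubBurstOutPort(max_bursts=3, max_interval=60.0)
port.pushBurst([0.0] * 10)
port.pushBurst([0.0] * 10)
transfers_before = len(port.transfers)  # nothing sent yet, both bursts are queued
port.pushBurst([0.0] * 10)              # third burst reaches max_bursts and flushes
port.pushBursts([[1.0], [2.0]])         # direct transfer, independent of the queue
```

Grouping queued bursts into one transfer is what amortizes the per-burst metadata overhead; the direct path trades that efficiency for immediate delivery.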
The following code snippet is an example of the pushBurst() method call for a vector data sample that is queued to the Port.
The following code snippet is an example of the pushBursts() method call for a vector data sample. The bursts from this call are passed directly downstream to the connected resource.
5.8.2 Burst Signal Related Information (SRI)
BurstSRI objects are delivered with each data burst and describe the data payload and processing state from the data producer. Table 5.13 describes only the required fields of the data structure when passing burst data between resources.
5.8.3 Multi-out Ports
Each output BurstIO Port type provides the ability to filter burst data from the resource based on stream ID and connection ID. To use the multi-out capability of the Ports, a resource must include code similar to the following:
To steer a particular stream of data to a particular connection, pass the connectionTable object to the Port’s updateConnectionFilter method. With the routing mode set to ROUTE_CONNECTION_STREAMS, the Port will then apply the filter state to any burst traffic as it is passed out the resource’s BurstIO Port. For the burst to be passed to an existing connection, there must exist a match in the Port’s filters table for the burst’s stream ID and connection ID of the resource downstream.
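The matching rule can be sketched in a few lines of Python. The `FilterEntry` tuple and the `route()` function are hypothetical and exist only to illustrate the rule; they are not the Port's filter table type or its updateConnectionFilter method.

```python
from collections import namedtuple

# Hypothetical filter entry; the field names mirror the concepts in the text.
FilterEntry = namedtuple("FilterEntry", ["connection_id", "stream_id"])


def route(stream_id, connection_ids, filter_table):
    """Return the connections that should receive a burst on stream_id."""
    allowed = {(entry.connection_id, entry.stream_id) for entry in filter_table}
    # A burst is delivered only where both the connection ID and the stream ID match.
    return [conn for conn in connection_ids if (conn, stream_id) in allowed]


filter_table = [
    FilterEntry("conn_A", "stream_1"),
    FilterEntry("conn_B", "stream_2"),
]
connections = ["conn_A", "conn_B", "conn_C"]

to_stream_1 = route("stream_1", connections, filter_table)
to_stream_3 = route("stream_3", connections, filter_table)
```

In this sketch, a stream with no entry in the table reaches no connection, which matches the requirement that a match must exist for the burst to be passed.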
5.8.4 Working with Complex Data
Each BurstPacket of the incoming data provides the getComplex() method to indicate whether the vector contains complex samples, that is, samples composed of real and imaginary parts. Complex data is sent as alternating real and imaginary values. A developer can work with this data in any fashion; however, this section describes the common methods for converting the data into a more workable form.
5.8.4.1 Converting Complex Data in C++
In C++, the incoming BurstIO data vector may be typecast into a std::vector of complex values. For example:
5.8.4.2 Converting Complex Data in Python
The helper functions bulkioComplexToPythonComplexList and pythonComplexListToBulkioComplex, defined in the module ossie.utils.bulkio.bulkio_helpers, provide an efficient translation to and from lists of Python complex numbers.
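The interleaved layout itself is easy to illustrate in pure Python. The two functions below are illustrative only and are not the bulkio_helpers implementation; they assume the alternating real/imaginary ordering described in Section 5.8.4.

```python
def interleaved_to_complex(samples):
    """Convert [re0, im0, re1, im1, ...] into a list of Python complex numbers."""
    if len(samples) % 2:
        raise ValueError("interleaved complex data must have an even length")
    return [complex(re, im) for re, im in zip(samples[0::2], samples[1::2])]


def complex_to_interleaved(values):
    """Convert a list of complex numbers back into [re0, im0, re1, im1, ...]."""
    interleaved = []
    for value in values:
        interleaved.extend((value.real, value.imag))
    return interleaved


raw = [1.0, 2.0, 3.0, -4.0]
as_complex = interleaved_to_complex(raw)
round_trip = complex_to_interleaved(as_complex)
```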
5.8.4.3 Converting Complex Data in Java
Unlike C++ and Python, Java does not have a ubiquitous means of representing complex numbers; therefore, when using Java, users are free to map the data returned by the incoming burst’s getData() method to the complex data representation of their choosing.
5.8.5 Time Stamps
The following code segment provides an example of how to construct a BULKIO::PrecisionUTCTime time stamp to be sent in the burst SRI.
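As a rough, self-contained illustration of the arithmetic involved, the sketch below splits an epoch time into whole and fractional seconds. `StubPrecisionUTCTime` is a hypothetical stand-in that mirrors the field names of BULKIO::PrecisionUTCTime (tcmode, tcstatus, toff, twsec, tfsec); it is not the CORBA type, and the numeric mode and status defaults are placeholders assumed for illustration.

```python
import math
import time
from dataclasses import dataclass


@dataclass
class StubPrecisionUTCTime:
    """Stand-in mirroring the fields of BULKIO::PrecisionUTCTime (not the CORBA type)."""
    tcmode: int     # timecode mode (placeholder value)
    tcstatus: int   # timecode status (placeholder value)
    toff: float     # fractional sample offset
    twsec: float    # whole seconds since the epoch
    tfsec: float    # fractional seconds, in the range [0.0, 1.0)


def make_timestamp(epoch_seconds, tcmode=1, tcstatus=1):
    # math.modf returns (fractional part, whole part) for a float.
    tfsec, twsec = math.modf(epoch_seconds)
    return StubPrecisionUTCTime(tcmode, tcstatus, 0.0, twsec, tfsec)


stamp = make_timestamp(1700000000.25)
now_stamp = make_timestamp(time.time())
```

Keeping whole and fractional seconds in separate fields preserves sub-second precision that a single double would lose for large epoch values.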
5.8.6 Port Statistics
All BurstIO Ports support the BulkIO statistics interface with additional keywords to track burst-specific metrics. Statistics are tracked over a window of 10 pushBurst calls. An input Port contains a single PortStatistics structure, whereas an output Port contains a sequence of PortStatistics structures, one structure per connection. For information about BULKIO::PortStatistics, refer to Section 5.7.7. The additional BurstIO metrics for both input and output Ports are described in the following tables:
5.8.6.1 C++
The following example illustrates a Component that performs a transform on the incoming burst data and pushes the results downstream.
5.8.6.2 Java
The following example illustrates a Component that generates 10 burst objects, each containing 100 samples of data, and sends the array downstream.
5.8.6.3 Python
The following example illustrates a Component that generates burst samples that will be filtered out by the Port’s routing table when at least 10 bursts are queued for delivery.
5.9 Messaging
5.9.1 Introduction
Messaging relies on CORBA’s event structure as a transport structure. In CORBA’s event API, Messages are passed as an Any type using the function push().
While CORBA manages marshaling and delivery of the data, it does not provide any mechanisms inherent to events to describe the contents of the Any type. REDHAWK decided to leverage an existing payload structure descriptor to describe the payload of Messages, the Properties IDL. The selection of this interface eliminates the need to create a new IDL describing Messages. Furthermore, there is already an XML structure that is mapped to efficient binary data structures, allowing the use of XML to describe Message contents while eliminating the need for introducing XML parsers in the Message delivery mechanism.
To support this additional functionality, REDHAWK has expanded the Properties descriptor to allow a Property to have the kind Message. The only Property that can have a valid Message kind is a structure.
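To make the idea of a structure-shaped payload concrete, the sketch below models a Message as nested id/value pairs. It assumes a hypothetical struct with ID `foo` and two members, `some_string` and `some_float`; the `Foo` dataclass, the `to_properties()` helper, and the dictionary layout are illustrative assumptions, not the Properties IDL types or the CORBA Any encoding.

```python
from dataclasses import dataclass, fields


@dataclass
class Foo:
    """Hypothetical message struct with two members."""
    some_string: str = ""
    some_float: float = 0.0


def to_properties(struct_id, struct):
    """Represent a struct as one id/value pair whose value is a list of member pairs."""
    members = [{"id": f.name, "value": getattr(struct, f.name)}
               for f in fields(struct)]
    return [{"id": struct_id, "value": members}]


message = to_properties("foo", Foo(some_string="hello", some_float=1.5))
```

Because the payload carries its own IDs, a receiver can identify the Message type and its members without any additional descriptor.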
5.9.2 Message Producer
To create a Message producer, create a new Component or edit an existing Component.
- Add a Struct Property
- Add a uses Port of IDL Interface ExtendedEvent > MessageEvent
- Regenerate the Component
For the purposes of the following examples, assume that the structure is as follows:
- ID: foo
- Contains two members:
- name: some_string, type: string
- name: some_float, type: float
The Component’s uses Port is called message_out
The Component’s name is message_producer
If a connection exists between this Component and either a Message consumer or an Event Channel, the following code examples send a Message.
5.9.2.1 C++
Whenever a Message needs to be generated (e.g., in the serviceFunction() method of the implementation file):
5.9.2.2 Java
Whenever a Message needs to be generated (e.g., in the serviceFunction() method):
5.9.2.3 Python
Whenever a Message needs to be generated (e.g., in the process() method of the implementation file):
5.9.3 Message Consumer
To create a Message consumer, create a new Component or edit an existing Component.
- Add a Struct Property
- Add a uses/provides (bidirectional) Port of IDL Interface ExtendedEvent > MessageEvent
- Regenerate the Component
The bidirectional Port is needed because in point-to-point connections, the Port behaves like a Provides Port, while in connections with an Event Channel, the consumer behaves like a Uses Port. This is an artifact of mapping a uses/provides pattern over a producer/consumer pattern. A single object is created for the Port, and a call to getPort() returns a pointer to the same object irrespective of whether or not the Port is to be employed as a Uses or Provides; the object implements both sets of interfaces. The SCD contains two entries for this Port, both with the same name, but one is a uses and the other is a provides.
For the purposes of the following examples, assume that the structure is as follows:
- ID: foo
- Contains two members:
- name: some_string, type: string
- name: some_float, type: float
The Component’s uses/provides Port is called message_in
The Component’s callback function for this Message is messageReceived()
The Component’s name is message_consumer
If a connection exists between this Component and either a Message producer or an Event Channel, the following code examples process an incoming Message.
5.9.3.1 C++
Given the asynchronous nature of events, a callback pattern was selected for the consumer.
In the Component header file, declare the following callback function:
In the Component source file, implement the callback function:
In the constructor() method, register the callback function:
5.9.3.2 Java
Java callbacks use the org.ossie.events.MessageListener interface, which has a single messageReceived() method. The recommended style for Java messaging is to define the callback as a private method on the Component class, and use an anonymous subclass of MessageListener to dispatch the message to your callback.
Add to the list of imports:
Implement the callback as a method on the Component class:
In the constructor() method, register a MessageListener for the Message to dispatch the message to your callback:
5.9.3.3 Python
In the constructor() method, register the expected Message with a callback method:
In the class, define the callback method. In this example, the method is called messageReceived():
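The register-then-dispatch pattern can be sketched without any REDHAWK imports. `StubMessageInPort`, its `register_callback()` and `deliver()` methods, and the dictionary payload are hypothetical stand-ins for illustration; only the messageReceived() callback name and the `foo` struct come from the example above.

```python
class StubMessageInPort:
    """Hypothetical stand-in for a MessageEvent consumer Port (not REDHAWK code)."""

    def __init__(self):
        self._callbacks = {}

    def register_callback(self, msg_id, callback):
        # Remember which callback handles which Message ID.
        self._callbacks[msg_id] = callback

    def deliver(self, msg_id, msg_data):
        # Simulates an incoming Message: dispatch only if the ID was registered.
        callback = self._callbacks.get(msg_id)
        if callback is not None:
            callback(msg_id, msg_data)


class MessageConsumer:
    def __init__(self):
        self.received = []
        self.port_message_in = StubMessageInPort()
        self.port_message_in.register_callback("foo", self.messageReceived)

    def messageReceived(self, msg_id, msg_data):
        # Invoked asynchronously in a real system; here it just records the Message.
        self.received.append((msg_id, msg_data))


consumer = MessageConsumer()
consumer.port_message_in.deliver("foo", {"some_string": "hello", "some_float": 1.5})
consumer.port_message_in.deliver("bar", {})  # no callback registered, so it is ignored
```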
5.9.4 Viewing Messages
Messages are events with their payload definition tied to structures in Component Properties. Viewing Messages can be done with the same techniques that are used to view events.
To view events and Messages sent to an Event Channel in a terminal window:
Help for the utility:
Example output:
5.9.5 Connecting Producers and Consumers
Producers and consumers can be connected either point-to-point or through an Event Channel in the IDE.
Connecting a producer directly to a consumer does not require an Application and can be done in the Sandbox:
Connecting producers to consumers through an Event Channel requires an Application. An Application can also support point-to-point connections.
The following steps describe how to connect producers and consumers point-to-point and through an Event Channel:
- Add producer and consumer Components.
- For point-to-point messaging, connect the output MessageEvent Port, message_out in this example, to the input MessageEvent Port of the receive Component.
- For messaging via an Event Channel, add an Event Channel to the Waveform and connect to it.
  - In the Waveform Diagram, under Palette > Find By:
    - Select Event Channel and drag it onto the diagram. The New Event Channel dialog is displayed.
    - Enter the Event Channel you want to find and click OK. The Event Channel is displayed in the diagram.
  - Connect the Uses (Output) MessageEvent Port of the sending Component, message_out in this example, to the Event Channel.
  - Connect the Uses (Output) MessageEvent Port of the receiving Component, message_in, to the Event Channel. This is the black Output Port that must be connected to the Event Channel.
In this example, connections are made point-to-point and through the Event Channel. Therefore, for every Message sent, two Messages are received.
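The duplicate delivery follows directly from the topology, as the small simulation below suggests. The `Producer`, `EventChannel`, and `Consumer` classes are hypothetical models written for this illustration and do not use any REDHAWK or CORBA API.

```python
class Consumer:
    """Hypothetical consumer that records every Message pushed to it."""

    def __init__(self):
        self.received = []

    def push(self, message):
        self.received.append(message)


class EventChannel:
    """Hypothetical channel that forwards each Message to all attached consumers."""

    def __init__(self):
        self.consumers = []

    def push(self, message):
        for consumer in self.consumers:
            consumer.push(message)


class Producer:
    """Hypothetical producer that sends each Message over every connection."""

    def __init__(self):
        self.connections = []

    def send(self, message):
        for target in self.connections:
            target.push(message)


consumer = Consumer()
channel = EventChannel()
channel.consumers.append(consumer)

producer = Producer()
producer.connections = [consumer, channel]  # point-to-point and via the channel

producer.send("hello")
```

One send() travels both paths, so the consumer records the same Message twice; removing either connection leaves a single delivery.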

REDHAWK Documentation is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.