MessageInterface.h
Go to the documentation of this file.
1 /*
2  * This file is protected by Copyright. Please refer to the COPYRIGHT file
3  * distributed with this source distribution.
4  *
5  * This file is part of REDHAWK core.
6  *
7  * REDHAWK core is free software: you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation, either version 3 of the License, or (at your
10  * option) any later version.
11  *
12  * REDHAWK core is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this program. If not, see http://www.gnu.org/licenses/.
19  */
20 
21 #ifndef MESSAGEINTERFACE_H
22 #define MESSAGEINTERFACE_H
23 
24 #include <map>
25 #include <string>
26 #include <vector>
27 #include <iterator>
28 
29 #include "CF/ExtendedEvent.h"
30 #include "CF/cf.h"
31 #include "CorbaUtils.h"
32 #include "Port_impl.h"
33 #include "callback.h"
34 
35 #include <COS/CosEventChannelAdmin.hh>
36 
37 
38 
39 /************************************************************************************
40  Message consumer
41 ************************************************************************************/
42 
44 
45 #ifdef BEGIN_AUTOCOMPLETE_IGNORE
46 
49 class Consumer_i : public virtual POA_CosEventChannelAdmin::ProxyPushConsumer {
50  public:
51  Consumer_i(MessageConsumerPort *_parent);
52  // CosEventComm::PushConsumer methods
53  void push(const CORBA::Any& data);
54 
55  void connect_push_supplier(CosEventComm::PushSupplier_ptr push_supplier);
56 
57  void disconnect_push_consumer();
58 
59  protected:
60  MessageConsumerPort *parent;
61 
62 };
63 
64 class SupplierAdmin_i : public virtual POA_CosEventChannelAdmin::SupplierAdmin {
65  public:
66  SupplierAdmin_i(MessageConsumerPort *_parent);
67 
68  CosEventChannelAdmin::ProxyPushConsumer_ptr obtain_push_consumer();
69 
70  CosEventChannelAdmin::ProxyPullConsumer_ptr obtain_pull_consumer();
71 
72 
73  protected:
74  MessageConsumerPort *parent;
75  unsigned int instance_counter;
76 
77 };
81 #endif
82 
84 #ifdef BEGIN_AUTOCOMPLETE_IGNORE
85 , public virtual POA_ExtendedEvent::MessageEvent
86 #endif
87 {
88  ENABLE_LOGGING
89 
90 public:
91  MessageConsumerPort (std::string port_name);
92  virtual ~MessageConsumerPort (void) { };
93 
94  /*
95  * Register a callback function
96  * @param id The message id that this callback is intended to support
97  * @param target A pointer to the object that owns the callback function
98  * @param func The function that implements the callback
99  */
100  template <class Class, class MessageStruct>
101  void registerMessage (const std::string& id, Class* target, void (Class::*func)(const std::string&, const MessageStruct&))
102  {
103  callbacks_[id] = new MemberCallback<Class, MessageStruct>(*target, func);
104  }
105 
106  template <class Target, class Func>
107  void registerMessage (Target target, Func func)
108  {
109  generic_callbacks_.add(target, func);
110  }
111 
112  // CF::Port methods
113  void connectPort(CORBA::Object_ptr connection, const char* connectionId);
114 
115  void disconnectPort(const char* connectionId);
116 
117  CosEventChannelAdmin::ConsumerAdmin_ptr for_consumers();
118 
119  CosEventChannelAdmin::SupplierAdmin_ptr for_suppliers();
120 
121  void destroy();
122 
123  CosEventChannelAdmin::ProxyPushConsumer_ptr extendConsumers(std::string consumer_id);
124 
125  Consumer_i* removeConsumer(std::string consumer_id);
126 
127  void fireCallback (const std::string& id, const CORBA::Any& data);
128 
129  std::string getRepid() const;
130 
131  std::string getDirection() const;
132 
133 
134 protected:
135  void addSupplier (const std::string& connectionId, CosEventComm::PushSupplier_ptr supplier);
136 
137  CosEventComm::PushSupplier_ptr removeSupplier (const std::string& connectionId);
138 
139  boost::mutex portInterfaceAccess;
140  std::map<std::string, Consumer_i*> consumers;
141  std::map<std::string, CosEventChannelAdmin::EventChannel_ptr> _connections;
142 
143  SupplierAdmin_i *supplier_admin;
144 
145  /*
146  * Abstract interface for message callbacks.
147  */
149  {
150  public:
151  virtual void operator() (const std::string& value, const CORBA::Any& data) = 0;
152  virtual ~MessageCallback () { }
153 
154  protected:
156  };
157 
158 
159  /*
160  * Concrete class for member function message callbacks.
161  */
162  template <class Class, class M>
164  {
165  public:
166  typedef void (Class::*MemberFn)(const std::string&, const M&);
167 
168  virtual void operator() (const std::string& value, const CORBA::Any& data)
169  {
170  M message;
171  if (data >>= message) {
172  (target_.*func_)(value, message);
173  }
174  }
175 
176  protected:
177  // Only allow MessageConsumerPort to instantiate this class.
178  MemberCallback (Class& target, MemberFn func) :
179  target_(target),
180  func_(func)
181  {
182  }
183 
184  friend class MessageConsumerPort;
185 
186  Class& target_;
188  };
189 
190  typedef std::map<std::string, MessageCallback*> CallbackTable;
191  CallbackTable callbacks_;
192 
193  ossie::notification<void (const std::string&, const CORBA::Any&)> generic_callbacks_;
194 
195  typedef std::map<std::string, CosEventComm::PushSupplier_var> SupplierTable;
196  SupplierTable suppliers_;
197 };
198 
199 
200 /************************************************************************************
201  Message producer
202 ************************************************************************************/
203 
205 #ifdef BEGIN_AUTOCOMPLETE_IGNORE
206 , public virtual POA_CF::Port
207 #endif
208 {
209 
210 public:
211  MessageSupplierPort (std::string port_name);
212  virtual ~MessageSupplierPort (void);
213 
214  // CF::Port methods
215  void connectPort(CORBA::Object_ptr connection, const char* connectionId);
216  void disconnectPort(const char* connectionId);
217 
218  void push(const CORBA::Any& data);
219 
220  CosEventChannelAdmin::ProxyPushConsumer_ptr removeConsumer(std::string consumer_id);
221  void extendConsumers(std::string consumer_id, CosEventChannelAdmin::ProxyPushConsumer_ptr proxy_consumer);
222 
/*
 * Send a single message
 *
 * The message is presented to the iterator-based sendMessages() as the
 * one-element range [&message, &message + 1).
 *
 * @param message The message to send
 */
template <typename Message>
void sendMessage(const Message& message)
{
    const Message* const first = &message;
    sendMessages(first, first + 1);
}
230 
/*
 * Send a sequence of messages
 *
 * @param messages Any container providing begin() and end(); the whole
 *                 range is passed to the iterator-based sendMessages()
 */
template <class Sequence>
void sendMessages(const Sequence& messages)
{
    sendMessages(messages.begin(), messages.end());
}
236 
237  // Send a set of messages from an iterable set
238  template <typename Iterator>
239  void sendMessages(Iterator first, Iterator last)
240  {
241  CF::Properties properties;
242  properties.length(std::distance(first, last));
243  for (CORBA::ULong ii = 0; first != last; ++ii, ++first) {
244  // Workaround for older components whose structs have a non-const,
245  // non-static member function getId(): determine the type of value
246  // pointed to by the iterator, and const_cast the dereferenced
247  // value; this ensures that it works for both bare pointers and
248  // "true" iterators
249  typedef typename std::iterator_traits<Iterator>::value_type value_type;
250  properties[ii].id = const_cast<value_type&>(*first).getId().c_str();
251  properties[ii].value <<= *first;
252  }
253  CORBA::Any data;
254  data <<= properties;
255  push(data);
256  }
257 
258  std::string getRepid() const;
259 
260 protected:
261  boost::mutex portInterfaceAccess;
262  std::map<std::string, CosEventChannelAdmin::ProxyPushConsumer_var> consumers;
263  std::map<std::string, CosEventChannelAdmin::EventChannel_ptr> _connections;
264 
265 };
266 
267 #endif // MESSAGEINTERFACE_H
Class & target_
Definition: MessageInterface.h:186
std::map< std::string, MessageCallback * > CallbackTable
Definition: MessageInterface.h:190
ossie::notification< void(const std::string &, const CORBA::Any &)> generic_callbacks_
Definition: MessageInterface.h:193
MessageCallback()
Definition: MessageInterface.h:155
CosEventChannelAdmin::ConsumerAdmin_ptr for_consumers()
void disconnectPort(const char *connectionId)
virtual void operator()(const std::string &value, const CORBA::Any &data)
Definition: MessageInterface.h:168
MessageSupplierPort(std::string port_name)
virtual void operator()(const std::string &value, const CORBA::Any &data)=0
CallbackTable callbacks_
Definition: MessageInterface.h:191
MemberFn func_
Definition: MessageInterface.h:187
Consumer_i * removeConsumer(std::string consumer_id)
std::string getDirection() const
Definition: MessageInterface.h:163
Definition: Port_impl.h:364
Definition: MessageInterface.h:83
void(Class::* MemberFn)(const std::string &, const M &)
Definition: MessageInterface.h:166
void connectPort(CORBA::Object_ptr connection, const char *connectionId)
MessageConsumerPort(std::string port_name)
boost::mutex portInterfaceAccess
Definition: MessageInterface.h:261
void registerMessage(Target target, Func func)
Definition: MessageInterface.h:107
std::map< std::string, CosEventChannelAdmin::EventChannel_ptr > _connections
Definition: MessageInterface.h:141
CosEventComm::PushSupplier_ptr removeSupplier(const std::string &connectionId)
CosEventChannelAdmin::ProxyPushConsumer_ptr extendConsumers(std::string consumer_id)
virtual ~MessageSupplierPort(void)
virtual ~MessageCallback()
Definition: MessageInterface.h:152
CosEventChannelAdmin::ProxyPushConsumer_ptr removeConsumer(std::string consumer_id)
Definition: MessageInterface.h:204
std::string getRepid() const
SupplierAdmin_i * supplier_admin
Definition: MessageInterface.h:143
Definition: MessageInterface.h:148
std::map< std::string, CosEventComm::PushSupplier_var > SupplierTable
Definition: MessageInterface.h:195
void connectPort(CORBA::Object_ptr connection, const char *connectionId)
void push(const CORBA::Any &data)
void extendConsumers(std::string consumer_id, CosEventChannelAdmin::ProxyPushConsumer_ptr proxy_consumer)
void registerMessage(const std::string &id, Class *target, void(Class::*func)(const std::string &, const MessageStruct &))
Definition: MessageInterface.h:101
std::map< std::string, Consumer_i * > consumers
Definition: MessageInterface.h:140
void sendMessages(Iterator first, Iterator last)
Definition: MessageInterface.h:239
void disconnectPort(const char *connectionId)
boost::mutex portInterfaceAccess
Definition: MessageInterface.h:139
std::string getRepid() const
void fireCallback(const std::string &id, const CORBA::Any &data)
virtual ~MessageConsumerPort(void)
Definition: MessageInterface.h:92
SupplierTable suppliers_
Definition: MessageInterface.h:196
void addSupplier(const std::string &connectionId, CosEventComm::PushSupplier_ptr supplier)
Definition: Port_impl.h:320
std::map< std::string, CosEventChannelAdmin::EventChannel_ptr > _connections
Definition: MessageInterface.h:263
CosEventChannelAdmin::SupplierAdmin_ptr for_suppliers()
void sendMessages(const Sequence &messages)
Definition: MessageInterface.h:233
MemberCallback(Class &target, MemberFn func)
Definition: MessageInterface.h:178
std::map< std::string, CosEventChannelAdmin::ProxyPushConsumer_var > consumers
Definition: MessageInterface.h:262
void sendMessage(const Message &message)
Definition: MessageInterface.h:225