OmniEvents
ProxyPushConsumer.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPushConsumer.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003,2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPushConsumer.h"
25 #include "ConsumerAdmin.h"
26 #include "Orb.h"
27 #include "omniEventsLog.h"
28 #include "PersistNode.h"
29 
30 #include <assert.h>
31 
32 namespace OmniEvents {
33 
35  CosEventComm::PushSupplier_ptr pushSupplier)
36 {
    // Body of connect_push_supplier(); the first line of its signature is
    // absent from this listing. Records the supplier under the object id of
    // the current request and starts an asynchronous "_is_a" query on it.
    // Throws CosEventChannelAdmin::AlreadyConnected when that object id
    // already has an entry in _connections. A nil supplier is accepted and
    // nothing is recorded for it.
37  // pushSupplier is permitted to be nil.
38  if(CORBA::is_nil(pushSupplier))
39  return;
40 
41  string oidstr =currentObjectId();
42  Connections_t::iterator pos =_connections.find(oidstr);
43 
44  if(pos!=_connections.end())
45  throw CosEventChannelAdmin::AlreadyConnected();
46 
    // The Connection is handed its own duplicated reference to the supplier.
47  Connection* newConnection =
48  new Connection(
49  _channelName.in(),
50  oidstr,
51  CosEventComm::PushSupplier::_duplicate(pushSupplier)
52  );
53  _connections.insert( Connections_t::value_type(oidstr,newConnection) );
54 
55  // Test to see whether pushSupplier is a ProxyPushSupplier.
56  // If so, then we will aggressively try to reconnect, when we are reincarnated
57  CORBA::Request_var req =pushSupplier->_request("_is_a");
58  req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59  req->set_return_type(CORBA::_tc_boolean);
60  req->send_deferred();
    // req._retn() releases the local _var's hold so that deferredRequest()
    // can adopt the request; newConnection is registered as the callback.
61  Orb::inst().deferredRequest(req._retn(),newConnection); // Register callback
62 
    // NOTE(review): listing line 63 is absent from this extract. Elsewhere in
    // this file log writes are guarded by omniEventsLog::exists(); presumably
    // the missing line is such a guard - confirm against the original file.
    // The block below writes the new connection's record to the log stream.
64  {
65  WriteLock log;
66  newConnection->output(log.os);
67  }
68 }
69 
70 
72 {
    // Body of disconnect_push_consumer(); its signature line is absent from
    // this listing.
    // With HAVE_OMNIORB4: looks up the connection for the object id of the
    // current request. If one is recorded, a "disconnect_push_supplier"
    // request is built on the stored target, the connection is released and
    // erased, the request is sent deferred, and a "-ecf/..." line naming the
    // connection is written to the log stream. If none is recorded, nothing
    // happens beyond the debug message.
    // Without HAVE_OMNIORB4: only a debug message is emitted.
73 #ifdef HAVE_OMNIORB4
74  DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
75  string oidstr =currentObjectId();
76  Connections_t::iterator pos =_connections.find(oidstr);
77 
78  if(pos!=_connections.end())
79  {
    // Build the request while the connection (and its _target) is still held.
80  CORBA::Request_var req =
81  pos->second->_target->_request("disconnect_push_supplier");
82  pos->second->_remove_ref();
83  _connections.erase(pos);
84  // The following line could result in a reentrant callback, if this call was
85  // not made through the POA => must erase the connection BEFORE this point.
86  req->send_deferred();
    // No callback is registered here; deferredRequest() adopts the request.
87  Orb::inst().deferredRequest(req._retn());
    // NOTE(review): listing line 88 is absent from this extract. Presumably it
    // is a guard such as omniEventsLog::exists() (used before log writes
    // elsewhere in this file) - confirm against the original file.
89  {
90  // Erase this connection from the log file.
91  WriteLock log;
92  log.os<<"-ecf/"<<_channelName.in();
93  log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
94  }
95  }
96 #else /* Silently ignore disconnects with omniORB3 */
97  DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
98 #endif
99 }
100 
101 
// Accepts one event and stores a heap-allocated copy of it.
//
// When OMNIEVENTS_REAL_TIME_PUSH is defined and _useLocalQueue is false, the
// copy is passed directly to _consumerAdmin.send() and _useLocalQueue is then
// set to true, so later events are appended to _queue until the flag is
// cleared somewhere outside this function. When the macro is not defined,
// every event is appended to _queue.
//
// NOTE(review): the copies are raw pointers; presumably the code that drains
// _queue / implements send() takes ownership of them - confirm.
102 void ProxyPushConsumer_i::push(const CORBA::Any& event)
103 {
104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
105  if(!_useLocalQueue)
106  {
107  _consumerAdmin.send(new CORBA::Any(event));
108  _useLocalQueue=true;
109  }
110  else
111 #endif
112  _queue.push_back(new CORBA::Any(event));
113 }
114 
115 
117  PortableServer::POA_ptr p,
118  list<CORBA::Any*>& q,
119  ConsumerAdmin_i& consumerAdmin
120 )
121 : Servant(PortableServer::POA::_nil()),
122  _connections(),
123  _channelName(p->the_name()),
124  _consumerAdmin(consumerAdmin),
125  _queue(q),
126  _useLocalQueue(false)
127 {
     // Constructor body; the line carrying the constructor's name is absent
     // from this listing.
     // Takes a reference on consumerAdmin (released again in the destructor),
     // creates a child POA of p named "<channel name>.ProxyPushConsumer" with
     // the seven policies below, and installs this object as that POA's
     // default servant. The channel name is taken from p->the_name().
128  _consumerAdmin._add_ref();
129 
130  using namespace PortableServer;
131 
132  // POLICIES:
133  // Lifespan =PERSISTENT // we can persist
134  // Assignment =USER_ID // write our own oid
135  // Uniqueness =MULTIPLE_ID // only one servant
136  // ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
137  // RequestProcessing =USE_DEFAULT_SERVANT // only one servant
138  // ServantRetention =NON_RETAIN // stateless POA
139  // Thread =SINGLE_THREAD_MODEL // keep it simple
140 
141  CORBA::PolicyList policies;
142  policies.length(7);
143  policies[0]=p->create_lifespan_policy(PERSISTENT);
144  policies[1]=p->create_id_assignment_policy(USER_ID);
145  policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146  policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147  policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148  policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149  policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
150 
     // The new POA shares the parent's POAManager.
151  try
152  {
153  // Create a POA for this proxy type in this channel.
154  string poaName =string(_channelName.in())+".ProxyPushConsumer";
155  POAManager_var parentManager =p->the_POAManager();
156  _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
157  }
158  catch(POA::AdapterAlreadyExists&) // create_POA
159  {
160  DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161  "POA::AdapterAlreadyExists")
162  }
163  catch(POA::InvalidPolicy& ex) // create_POA
164  {
165  DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166  "POA::InvalidPolicy: "<<ex.index)
167  }
168 
169  // Destroy the policy objects (Not strictly necessary in omniORB)
170  for(CORBA::ULong i=0; i<policies.length(); ++i)
171  policies[i]->destroy();
172 
     // NOTE(review): both handlers above only log. If create_POA() threw, _poa
     // was never assigned in this constructor (the base class was initialised
     // with POA::_nil()), so the call below would presumably be made through a
     // nil reference - confirm, and handle the failure explicitly if so.
173  // This object is the POA's default servant.
174  _poa->set_servant(this);
175 }
176 
177 
179 {
     // Destructor body; the line carrying the destructor's name is absent from
     // this listing.
     // Drops one reference on every Connection still held in _connections,
     // empties the container, and releases the reference on _consumerAdmin
     // that the constructor took with _add_ref().
180  DB(20,"~ProxyPushConsumer_i()")
181  for(Connections_t::iterator i =_connections.begin();
182  i!=_connections.end();
183  ++i)
184  {
185  i->second->_remove_ref();
186  }
187  _connections.clear();
188 
189  _consumerAdmin._remove_ref();
190 }
191 
192 
193 CosEventChannelAdmin::ProxyPushConsumer_ptr
195 {
     // Body of createObject(); the line carrying the function name is absent
     // from this listing.
     // Returns the result of createNarrowedReference() for this servant's POA
     // and the repository id of CosEventChannelAdmin::ProxyPushConsumer.
196  return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
197  _poa.in(),
198  CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
199  );
200 }
201 
202 
204 {
     // Body of disconnect(); its signature line is absent from this listing.
     // Sends a deferred "disconnect_push_supplier" request to the target of
     // every recorded connection and removes each connection as it goes, so
     // _connections is empty on return.
205  // Note. We are (probably) in the EventChannel's thread.
206  Connections_t::iterator curr,next=_connections.begin();
207  while(next!=_connections.end())
208  {
     // Step 'next' forward before erasing 'curr', so that the loop iterator is
     // never the erased one (presumably Connections_t is a std::map - confirm).
209  curr=next++;
210  CORBA::Request_var req =
211  curr->second->_target->_request("disconnect_push_supplier");
212  curr->second->_remove_ref();
213  _connections.erase(curr);
214  // The following line could result in a reentrant callback
215  // => must erase the connection BEFORE this point.
216  req->send_deferred();
217  Orb::inst().deferredRequest(req._retn());
218  }
219 }
220 
221 
223 {
     // Body of reincarnate(const PersistNode& node); its signature line is
     // absent from this listing.
     // For every child of 'node' (child key = object id string) this reads the
     // "IOR" and "proxy" attributes, recreates the Connection in _connections
     // and, when "proxy" is non-zero, asks the remote ProxyPushSupplier to
     // connect to a reference built for the same object id in this POA.
     // All exceptions listed in the handlers below are caught per child, so one
     // failing child does not stop the loop.
224  // Reincarnate all connections from node's children.
225  for(map<string,PersistNode*>::const_iterator i=node._child.begin();
226  i!=node._child.end();
227  ++i)
228  {
229  const char* oidstr =i->first.c_str();
230  string ior( i->second->attrString("IOR") );
231  bool isProxy( i->second->attrLong("proxy") );
     // Debug-only check: assert() is compiled out when NDEBUG is defined.
232  assert(_connections.find(oidstr)==_connections.end());
233  try
234  {
235  using namespace CosEventComm;
236  using namespace CosEventChannelAdmin;
237 
     // NOTE(review): the entry is inserted before the reconnect attempt below
     // and none of the handlers removes it, so it stays recorded even when the
     // reconnect fails - confirm that this is intended.
     // supp._retn() passes the supplier reference on to the new Connection.
238  PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
239  _connections.insert(Connections_t::value_type(
240  oidstr,
241  new Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
242  ));
243  DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
244 
245  // If supp is a ProxyPushSupplier, then try to reconnect.
246  if(isProxy)
247  {
248  DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
249  // This will only work if the proxy is implemented in the same way as
250  // omniEvents, so connect_() automatically creates a proxy.
251  ProxyPushSupplier_var proxySupp =
252  string_to_<ProxyPushSupplier>(ior.c_str());
     // Build a reference to this proxy (same object id) to hand to the remote
     // supplier as its consumer.
253  PortableServer::ObjectId_var objectId =
254  PortableServer::string_to_ObjectId(oidstr);
255  CORBA::Object_var obj =
256  _poa->create_reference_with_id(
257  objectId.in(),
258  CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
259  );
260  PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261  proxySupp->connect_push_consumer(thisCons.in());
262  DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
263  }
264  }
265  catch(CORBA::BAD_PARAM&) {
266  // This will happen when IOR fails to narrow.
267  DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
268  }
269  catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
270  // The supplier doesn't need to be reconnected.
271  DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
272  }
273  catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
274  // Don't know what to make of this...
275  DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
276  }
277  catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
278  catch(CORBA::TRANSIENT& ) {} // object 'supp' not responding.
279  catch(CORBA::COMM_FAILURE& ) {} // object 'supp' not responding.
280  } // end loop for(i)
281 }
282 
283 
// Saves this object's state to 'os' by calling output(os) on every
// Connection held in _connections, in the container's iteration order.
// Writes nothing when there are no connections.
284 void ProxyPushConsumer_i::output(ostream& os) const
285 {
286  for(Connections_t::const_iterator i=_connections.begin();
287  i!=_connections.end();
288  ++i)
289  {
290  i->second->output(os);
291  }
292 }
293 
294 
296 {
     // Function body whose signature line is absent from this listing;
     // presumably currentObjectId(), which the request handlers in this file
     // call to obtain their map key - confirm against the original file.
     // With HAVE_OMNIORB4: returns the object id of the current POA request as
     // a string. If there is no request context, a message is logged and the
     // literal "ERROR" is returned instead.
     // Without HAVE_OMNIORB4: always throws CORBA::NO_IMPLEMENT.
     // NOTE(review): callers use the returned string directly as a key into
     // _connections, so "ERROR" would be treated as an ordinary object id.
297 #ifdef HAVE_OMNIORB4
298  try
299  {
300  using namespace PortableServer;
301  ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
302  CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303  return string(oidStr.in());
304  }
305  catch(PortableServer::Current::NoContext&) // get_object_id()
306  {
307  DB(0,"No context!!")
308  }
309  catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
310  {
311  // Should never get here in omniORB, because ObjectID is a char*.
     // With NDEBUG defined the assert is compiled out and control falls
     // through to the "ERROR" return below.
312  assert(0);
313  }
314  return "ERROR";
315 #else
316  throw CORBA::NO_IMPLEMENT();
317 #endif
318 }
319 
320 
321 //
322 // ProxyPushConsumer_i::Connection
323 //
324 
325 #if OMNIEVENTS__DEBUG_SERVANT
326 int ProxyPushConsumer_i::Connection::_objectCount =0;
327 #endif
328 
330  const char* channelName,
331  const string& oidstr,
332  CosEventComm::PushSupplier_ptr pushSupplier,
333  bool isProxy
334 ):Callback(),
335  _channelName(channelName),
336  _oidstr(oidstr),
337  _target(pushSupplier),
338  _targetIsProxy(isProxy)
339 {
     // Connection constructor body; the line carrying the constructor's name
     // is absent from this listing.
     // Stores the channel name, object id string, supplier reference and proxy
     // flag. pushSupplier is stored as given, without _duplicate(): both call
     // sites in this file pass a reference they have already duplicated or
     // released with _retn(). Presumably _target is a _var that then owns it -
     // confirm against the class declaration.
     // The instance counter is maintained only when OMNIEVENTS__DEBUG_SERVANT
     // is non-zero.
340 #if OMNIEVENTS__DEBUG_SERVANT
341  ++_objectCount;
342  DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
343 #endif
344 }
345 
347 {
     // Connection destructor body; the line carrying the destructor's name is
     // absent from this listing.
     // Only emits a debug message; when OMNIEVENTS__DEBUG_SERVANT is non-zero
     // it also decrements the instance counter and includes it in the message.
348 #if OMNIEVENTS__DEBUG_SERVANT
349  --_objectCount;
350  DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
351 #else
352  DB(20,"ProxyPushConsumer_i::Connection::~Connection()")
353 #endif
354 }
355 
357 
// Receives the completed deferred request for which this Connection was
// registered as callback (in this file: the "_is_a" query that asks whether
// the supplier is a ProxyPushSupplier).
//
// req - the completed request; its return value is expected to hold a boolean.
//
// If a boolean can be extracted it becomes the new _targetIsProxy. When it is
// true and a log exists, this connection's record is written to the log
// again, so that the record carries the proxy flag. If no boolean can be
// extracted, a message is logged and _targetIsProxy keeps its previous value.
358 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
359 {
     // Remember the old flag: the extraction below writes into _targetIsProxy.
360  bool save =_targetIsProxy;
361  if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362  {
363  if(_targetIsProxy && omniEventsLog::exists())
364  {
365  WriteLock log;
366  output(log.os);
367  DB(15,"ProxyPushConsumer is federated.");
368  }
369  }
370  else
371  {
372  DB(2,"ProxyPushConsumer got unexpected callback.");
373  _targetIsProxy=save; // Reset it just to be sure.
374  }
375 }
376 
378 {
     // Body of Connection::output(ostream& os); its signature line is absent
     // from this listing.
     // Writes one record for this connection to 'os':
     //   ecf/<channel name>/SupplierAdmin/ProxyPushConsumer/<object id>
     // followed by " IOR=<stringified target>" when the target is not nil, then
     // " proxy=1" when the target is flagged as a proxy, and finally " ;;\n".
     // reincarnate() in this file reads the "IOR" and "proxy" attributes back.
379  os<<"ecf/"<<_channelName;
380  os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
381 
382  if(!CORBA::is_nil(_target.in()))
383  {
384  CORBA::String_var iorstr;
385  iorstr = Orb::inst()._orb->object_to_string(_target.in());
386  os<<" IOR="<<iorstr.in();
387  if(_targetIsProxy)
388  os<<" proxy=1";
389  }
390  os<<" ;;\n";
391 }
392 
393 
394 }; // end namespace OmniEvents
#define DB(l, x)
Definition: Orb.h:49
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
Interface for classes that wish to receive callbacks from deferred requests.
Definition: Callback.h:46
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
CORBA::ORB_var _orb
Definition: Orb.h:88
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
static Orb & inst()
Definition: Orb.h:81
map< string, PersistNode * > _child
Definition: PersistNode.h:71
Default servant for ProxyPushConsumer objects.
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q, ConsumerAdmin_i &consumerAdmin)
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
bool _useLocalQueue
Switch between RT/chunked modes.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.
void output(ostream &os) const
Save this object's state to a stream.
Base class for servants.
Definition: Servant.h:114
PortableServer::POA_var _poa
Definition: Servant.h:131