OmniEvents
ProxyPushSupplier.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPushSupplier.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003,2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPushSupplier.h"
25 #include "Orb.h"
26 #include "omniEventsLog.h"
27 #include "PersistNode.h"
28 #include <assert.h>
29 
30 namespace OmniEvents {
31 
36  omni_mutex& mutex;
37 public:
38  omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
39  ~omni_mutex_kcol(void) { mutex.lock(); }
40 private:
41  // dummy copy constructor and operator= to prevent copying
44 };
45 
46 
47 //
48 // ProxyPushSupplierManager
49 //
50 
51 PortableServer::Servant
53  const PortableServer::ObjectId& oid,
54  PortableServer::POA_ptr poa
55 )
56 {
58  PauseThenWake p(this);
59  _servants.insert(result);
60  return result;
61 }
62 
63 void
65  const PortableServer::ObjectId& oid,
66  PortableServer::POA_ptr adapter,
67  PortableServer::Servant serv,
68  CORBA::Boolean cleanup_in_progress,
69  CORBA::Boolean remaining_activations
70 )
71 {
72  // This etherealize method needs a special implementation because
73  // ProxyPushSupplier_i objects are freed with _remove_ref() rather than
74  // delete.
75  // Otherwise, this method strongly resembles ProxyManager::etherealize().
76  omni_mutex_lock pause(_lock);
77  ProxyPushSupplier_i* narrowed =dynamic_cast<ProxyPushSupplier_i*>(serv);
78  assert(narrowed!=NULL);
79  set<Proxy*>::iterator pos =_servants.find(narrowed);
80  if(pos!=_servants.end())
81  {
82  _servants.erase(pos);
83  narrowed->_remove_ref();
84  }
85  else
86  {
87  DB(1,"\t\teh? - POA attempted to etherealize unknown servant.");
88  }
89 }
90 
92  PortableServer::POA_ptr parentPoa,
93  EventQueue& q
94 )
95 : ProxyManager(parentPoa),
96  omni_thread(NULL,PRIORITY_HIGH),
97  _queue(q),
98  _lock(),_condition(&_lock),
99  _refCount(1)
100 {
101  ProxyManager::activate("ProxyPushSupplier");
102  start_undetached();
103 }
104 
106 {
107  DB(20,"~ProxyPushSupplierManager()")
108 }
109 
110 CosEventChannelAdmin::ProxyPushSupplier_ptr
112 {
113  return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
114  _managedPoa.in(),
115  CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
116  );
117 }
118 
120 {
121  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
122  {
123  Proxy* p =*i; // Sun's CC requires this temporary.
124  ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
125  // We are in the EventChannel's thread.
126  // Make sure all calls go though the ProxyPushSupplier POA.
127  CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
128  ppsv->disconnect_push_supplier();
129  }
130 }
131 
132 void*
134 {
135  // This loop repeatedly triggers all of the servants in turn. As long as
136  // something happens each time, then we loop as fast as we can.
137  // As soon as activity dries up, we start to wait longer and longer between
138  // loops (up to a maximum). When there is no work to do, just block until
139  // a new event arrives.
140  //
141  // Rationale: The faster we loop the more events we can deliver to each
142  // consumer per second. However, when nothing is happening, this busy loop
143  // just soaks up CPU and kills performance. The optimum sleep time varies
144  // wildly from platform to platform, and also depends upon the typical ping
145  // time to the consumers.
146  //
147  // This dynamic approach should deliver reasonable performance when things
148  // are hectic, but not soak up too much CPU when not much is happening.
149  //
150  const unsigned long sleepTimeNanosec0 =0x8000; // 33us (doubled before use)
151  const unsigned long maxSleepNanosec =0x800000; // 8.4ms
152  unsigned long sleepTimeNanosec =sleepTimeNanosec0;
153 
154  omni_mutex_lock conditionLock(_lock);
155  while(true)
156  {
157  try {
158  if(_refCount<1)
159  break;
160 
161  bool busy=false;
162  bool waiting=false;
163 
164  // Trigger each servant in turn.
165  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
166  {
167  Proxy* p =*i; // Sun's CC requires this temporary.
168  ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
169  pps->trigger(busy,waiting);
170  }
171 
172  if(busy)
173  {
174  // Something happened last time round. So we'll be optimistic and
175  // immediately go round for another go. Briefly unlock the mutex first,
176  // just to let the other kids get in if they need to.
177  omni_mutex_kcol l(_lock); // 'lock' reversed!
178  // Reset the sleep time.
179  sleepTimeNanosec=sleepTimeNanosec0;
180  }
181  else if(waiting)
182  {
183  // Nothing happened, so we'll wait for a bit and then give it another
184  // go. Each time we wait for twice as long, up to the maximum.
185  if(sleepTimeNanosec<maxSleepNanosec)
186  sleepTimeNanosec<<=1; // (multiply by 2)
187  unsigned long sec,nsec;
188  omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
189  _condition.timedwait(sec,nsec);
190  }
191  else
192  {
193  // There is nothing to do, so block until a new event arrives.
194  _condition.wait();
195  }
196 
197  }
198  catch (CORBA::SystemException& ex) {
199  DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
200  IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
201  }
202  catch (CORBA::Exception& ex) {
203  DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
204  IF_OMNIORB4(": "<<ex._name()<<) ".")
205  }
206  catch(...) {
207  DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
208  break;
209  }
210  }
211  return NULL;
212 }
213 
215 {
216 #if OMNIEVENTS__DEBUG_REF_COUNTS
217  DB(20,"ProxyPushSupplierManager::_add_ref()")
218 #endif
219  omni_mutex_lock pause(_lock);
220  ++_refCount;
221 }
222 
224 {
225 #if OMNIEVENTS__DEBUG_REF_COUNTS
226  DB(20,"ProxyPushSupplierManager::_remove_ref()")
227 #endif
228  int myref;
229  {
230  PauseThenWake p(this);
231  myref = --_refCount;
232  }
233  if(myref<0)
234  {
235  DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
236  }
237  else if(myref==0)
238  {
239  DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
240  join(NULL);
241  }
242 }
243 
244 
245 //
246 // ProxyPushSupplier_i
247 //
248 
250  CosEventComm::PushConsumer_ptr pushConsumer)
251 {
252  if(CORBA::is_nil(pushConsumer))
253  throw CORBA::BAD_PARAM();
254  if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
255  throw CosEventChannelAdmin::AlreadyConnected();
256  _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
257 
258  // Test to see whether pushSupplier is a ProxyPushSupplier.
259  // If so, then we will aggressively try to reconnect, when we are reincarnated
260  CORBA::Request_var req =_target->_request("_is_a");
261  req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262  req->set_return_type(CORBA::_tc_boolean);
263  req->send_deferred();
264  Orb::inst().deferredRequest(req._retn(),this); // Register for callback
265 
267  {
268  WriteLock log;
269  output(log.os);
270  }
271 }
272 
273 
275 {
276  DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
277  eraseKey("ConsumerAdmin/ProxyPushSupplier");
279  if(CORBA::is_nil(_target))
280  {
281  throw CORBA::OBJECT_NOT_EXIST(
282  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
283  CORBA::COMPLETED_NO
284  );
285  }
286  else
287  {
288  CORBA::Request_var req=_target->_request("disconnect_push_consumer");
289  _target=CosEventComm::PushConsumer::_nil();
290  req->send_deferred();
291  Orb::inst().deferredRequest(req._retn());
292  }
293 }
294 
295 
297  PortableServer::POA_ptr poa,
298  EventQueue& q
299 )
300 : Proxy(poa),
301  EventQueue::Reader(q),
302  _target(CosEventComm::PushConsumer::_nil()),
303  _targetIsProxy(false)
304 {
305  // pass
306 }
307 
309 {
310  DB(20,"~ProxyPushSupplier_i()")
311 }
312 
314 
315 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
316 {
317  if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
318  {
319  CORBA::Environment_ptr env=_req->env(); // No need to free environment.
320  if(!CORBA::is_nil(env) && env->exception())
321  {
322  // Shut down the connection
323  CORBA::Exception* ex =env->exception(); // No need to free exception.
324  DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
325  Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
326  _req=CORBA::Request::_nil();
327 
328  // Try to notify the Consumer that the connection is closing.
329  CORBA::Request_var req=_target->_request("disconnect_push_consumer");
330  req->send_deferred();
331  Orb::inst().deferredRequest(req._retn());
332 
333  _target=CosEventComm::PushConsumer::_nil(); // disconnected.
334  eraseKey("ConsumerAdmin/ProxyPushSupplier");
335  deactivateObject();
336  return; // No more work to do
337  }
338  _req=CORBA::Request::_nil();
339  busy=true;
340  }
341  if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
342  {
343  _req=_target->_request("push");
344  _req->add_in_arg() <<= *(nextEvent());
345  _req->send_deferred();
346  busy=true;
347  }
348  if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
349  waiting=true;
350 }
351 
352 
353 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
354 {
355  if(_targetIsProxy)
356  {
357  // There should only ever be one of these callbacks per proxy,
358  // because each proxy should only be connected once.
359  DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
360  }
361  else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362  {
364  {
365  WriteLock log;
366  output(log.os);
367  DB(15,"ProxyPushSupplier is federated.");
368  }
369  }
370  else
371  {
372  DB(2,"ProxyPushSupplier got unexpected callback.");
373  _targetIsProxy=false; // Reset it just to be sure.
374  }
375 }
376 
377 
379  const string& oid,
380  const PersistNode& node
381 )
382 {
383  try
384  {
385  using namespace CosEventChannelAdmin;
386 
387  string ior( node.attrString("IOR").c_str() );
388  CosEventComm::PushConsumer_var pushConsumer =
389  string_to_<CosEventComm::PushConsumer>(ior.c_str());
390  // Do not activate until we know that we have read a valid target.
391  activateObjectWithId(oid.c_str());
392  _remove_ref();
393  _target=pushConsumer._retn();
394  _targetIsProxy=bool(node.attrLong("proxy"));
395 
396  // If pushConsumer is a proxy, then try to reconnect.
397  if(_targetIsProxy)
398  {
399  DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
400  // This will only work if the proxy is implemented in the same way as
401  // omniEvents, so connect_() automatically creates a proxy.
402  ProxyPushConsumer_var proxyCons =
403  string_to_<ProxyPushConsumer>(ior.c_str());
404  CosEventComm::PushSupplier_var thisSupp =_this();
405  proxyCons->connect_push_supplier(thisSupp);
406  DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
407  }
408  }
409  catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
410  // The supplier doesn't need to be reconnected.
411  DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
412  }
413  catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
414  // Don't know what to make of this...
415  DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
416  }
417  catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
418  catch(CORBA::TRANSIENT& ) {} // object 'pushConsumer' not responding.
419  catch(CORBA::COMM_FAILURE& ) {} // object 'pushConsumer' not responding.
420 }
421 
422 
424 {
425  basicOutput(
426  os,"ConsumerAdmin/ProxyPushSupplier",
427  _target.in(),
428  _targetIsProxy? " proxy=1": NULL
429  );
430 }
431 
432 
433 }; // end namespace OmniEvents
#define HERE
Generates a string literal that describes the filename and line number.
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
#define DB(l, x)
Definition: Orb.h:49
#define IF_OMNIORB4(omniORB4_code)
Definition: Orb.h:46
#define NP_MINORSTRING(systemException)
Definition: Orb.h:52
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
The EventQueue is a circular buffer, that contains _size-1 events.
Definition: EventQueue.h:57
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition: Orb.cc:204
static Orb & inst()
Definition: Orb.h:81
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
long attrLong(const string &key, long fallback=0) const
Definition: PersistNode.cc:163
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:60
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
Definition: ProxyManager.h:90
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:107
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
Definition: ProxyManager.h:128
void eraseKey(const char *name)
Helper method for constructing persistency output.
The opposite of omni_mutex_lock, unlocks the mutex upon construction and re-locks it upon destruction...
omni_mutex_kcol & operator=(const omni_mutex_kcol &)
omni_mutex_kcol(const omni_mutex_kcol &)
ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa, EventQueue &q)
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
void etherealize(const PortableServer::ObjectId &oid, PortableServer::POA_ptr adapter, PortableServer::Servant serv, CORBA::Boolean cleanup_in_progress, CORBA::Boolean remaining_activations)
Pauses the thread, and then calls the parent's implementation.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
CosEventComm::PushConsumer_var _target
void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer)
ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void trigger(bool &busy, bool &waiting)
Sets 'busy' if some work was done.
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
bool _targetIsProxy
TRUE if _target is a ProxyPushConsumer.
void output(ostream &os)
Save this object's state to a stream.
void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if it is.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160