51 PortableServer::Servant
53 const PortableServer::ObjectId& oid,
54 PortableServer::POA_ptr poa
65 const PortableServer::ObjectId& oid,
66 PortableServer::POA_ptr adapter,
67 PortableServer::Servant serv,
68 CORBA::Boolean cleanup_in_progress,
69 CORBA::Boolean remaining_activations
76 omni_mutex_lock pause(
_lock);
78 assert(narrowed!=NULL);
79 set<Proxy*>::iterator pos =
_servants.find(narrowed);
83 narrowed->_remove_ref();
87 DB(1,
"\t\teh? - POA attempted to etherealize unknown servant.");
92 PortableServer::POA_ptr parentPoa,
96 omni_thread(NULL,PRIORITY_HIGH),
98 _lock(),_condition(&_lock),
107 DB(20,
"~ProxyPushSupplierManager()")
110 CosEventChannelAdmin::ProxyPushSupplier_ptr
113 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
150 const unsigned long sleepTimeNanosec0 =0x8000;
151 const unsigned long maxSleepNanosec =0x800000;
152 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
154 omni_mutex_lock conditionLock(
_lock);
179 sleepTimeNanosec=sleepTimeNanosec0;
185 if(sleepTimeNanosec<maxSleepNanosec)
186 sleepTimeNanosec<<=1;
187 unsigned long sec,nsec;
188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
198 catch (CORBA::SystemException& ex) {
199 DB(2,
"ProxyPushSupplierManager ignoring CORBA system exception"
202 catch (CORBA::Exception& ex) {
203 DB(2,
"ProxyPushSupplierManager ignoring CORBA exception"
207 DB(2,
"ProxyPushSupplierManager thread killed by unknown exception.")
216 #if OMNIEVENTS__DEBUG_REF_COUNTS
217 DB(20,
"ProxyPushSupplierManager::_add_ref()")
219 omni_mutex_lock pause(
_lock);
225 #if OMNIEVENTS__DEBUG_REF_COUNTS
226 DB(20,
"ProxyPushSupplierManager::_remove_ref()")
235 DB(2,
"ProxyPushSupplierManager has negative ref count! "<<myref)
239 DB(15,
"ProxyPushSupplierManager has zero ref count -- shutdown.")
250 CosEventComm::PushConsumer_ptr pushConsumer)
252 if(CORBA::is_nil(pushConsumer))
253 throw CORBA::BAD_PARAM();
254 if(!CORBA::is_nil(
_target) || !CORBA::is_nil(
_req))
255 throw CosEventChannelAdmin::AlreadyConnected();
256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
260 CORBA::Request_var req =
_target->_request(
"_is_a");
261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262 req->set_return_type(CORBA::_tc_boolean);
263 req->send_deferred();
276 DB(5,
"ProxyPushSupplier_i::disconnect_push_supplier()");
277 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
281 throw CORBA::OBJECT_NOT_EXIST(
288 CORBA::Request_var req=
_target->_request(
"disconnect_push_consumer");
289 _target=CosEventComm::PushConsumer::_nil();
290 req->send_deferred();
297 PortableServer::POA_ptr poa,
302 _target(CosEventComm::PushConsumer::_nil()),
303 _targetIsProxy(false)
310 DB(20,
"~ProxyPushSupplier_i()")
317 if(!CORBA::is_nil(_req) && _req->poll_response())
319 CORBA::Environment_ptr env=_req->env();
320 if(!CORBA::is_nil(env) && env->exception())
323 CORBA::Exception* ex =env->exception();
324 DB(10,
"ProxyPushSupplier got exception" IF_OMNIORB4(
": "<<ex->_name()) );
326 _req=CORBA::Request::_nil();
329 CORBA::Request_var req=_target->_request(
"disconnect_push_consumer");
330 req->send_deferred();
333 _target=CosEventComm::PushConsumer::_nil();
334 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
338 _req=CORBA::Request::_nil();
341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
343 _req=_target->_request(
"push");
344 _req->add_in_arg() <<= *(nextEvent());
345 _req->send_deferred();
348 if(!CORBA::is_nil(_req))
359 DB(2,
"WARNING: Multiple connections to ProxyPushSupplier.");
361 else if(req->return_value()>>=CORBA::Any::to_boolean(
_targetIsProxy))
367 DB(15,
"ProxyPushSupplier is federated.");
372 DB(2,
"ProxyPushSupplier got unexpected callback.");
385 using namespace CosEventChannelAdmin;
388 CosEventComm::PushConsumer_var pushConsumer =
389 string_to_<CosEventComm::PushConsumer>(ior.c_str());
399 DB(15,
"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
402 ProxyPushConsumer_var proxyCons =
403 string_to_<ProxyPushConsumer>(ior.c_str());
404 CosEventComm::PushSupplier_var thisSupp =_this();
405 proxyCons->connect_push_supplier(thisSupp);
406 DB(7,
"Reconnected ProxyPushSupplier: "<<oid.c_str())
409 catch(CosEventChannelAdmin::AlreadyConnected&){
411 DB(7,
"Remote ProxyPushConsumer already connected: "<<oid.c_str())
413 catch(CosEventChannelAdmin::TypeError&){
415 DB(2,
"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
417 catch(CORBA::OBJECT_NOT_EXIST&) {}
418 catch(CORBA::TRANSIENT& ) {}
419 catch(CORBA::COMM_FAILURE& ) {}
426 os,
"ConsumerAdmin/ProxyPushSupplier",
#define HERE
Generates a string literal that describes the filename and line number.
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
#define IF_OMNIORB4(omniORB4_code)
#define NP_MINORSTRING(systemException)
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
The EventQueue is a circular buffer that contains _size-1 events.
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
string attrString(const string &key, const string &fallback="") const
long attrLong(const string &key, long fallback=0) const
Base class for ServantActivator classes that manage Proxy servants.
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Base class for three of the four Proxy servants.
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
void eraseKey(const char *name)
Helper method for constructing persistency output.
The opposite of omni_mutex_lock, unlocks the mutex upon construction and re-locks it upon destruction...
omni_mutex_kcol(omni_mutex &m)
omni_mutex_kcol & operator=(const omni_mutex_kcol &)
omni_mutex_kcol(const omni_mutex_kcol &)
ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa, EventQueue &q)
void * run_undetached(void *)
~ProxyPushSupplierManager()
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
omni_condition _condition
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
void etherealize(const PortableServer::ObjectId &oid, PortableServer::POA_ptr adapter, PortableServer::Servant serv, CORBA::Boolean cleanup_in_progress, CORBA::Boolean remaining_activations)
Pauses the thread, and then calls the parent's implementation.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
CosEventComm::PushConsumer_var _target
void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer)
ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void trigger(bool &busy, bool &waiting)
Sets 'busy' if some work was done.
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
bool _targetIsProxy
TRUE if _target is a ProxyPushConsumer.
void output(ostream &os)
Save this object's state to a stream.
void callback(CORBA::Request_ptr req)
Sets _targetIsProxy if _target is a ProxyPushConsumer.
void disconnect_push_supplier()
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.