35 CosEventComm::PushSupplier_ptr pushSupplier)
38 if(CORBA::is_nil(pushSupplier))
45 throw CosEventChannelAdmin::AlreadyConnected();
51 CosEventComm::PushSupplier::_duplicate(pushSupplier)
53 _connections.insert( Connections_t::value_type(oidstr,newConnection) );
57 CORBA::Request_var req =pushSupplier->_request(
"_is_a");
58 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59 req->set_return_type(CORBA::_tc_boolean);
74 DB(5,
"ProxyPushConsumer_i::disconnect_push_consumer()")
80 CORBA::Request_var req =
81 pos->second->_target->_request(
"disconnect_push_supplier");
82 pos->second->_remove_ref();
93 log.
os<<
"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<
'\n';
97 DB(5,
"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
112 _queue.push_back(
new CORBA::Any(event));
117 PortableServer::POA_ptr p,
118 list<CORBA::Any*>& q,
121 :
Servant(PortableServer::POA::_nil()),
123 _channelName(p->the_name()),
124 _consumerAdmin(consumerAdmin),
126 _useLocalQueue(false)
130 using namespace PortableServer;
141 CORBA::PolicyList policies;
143 policies[0]=p->create_lifespan_policy(PERSISTENT);
144 policies[1]=p->create_id_assignment_policy(USER_ID);
145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
154 string poaName =string(
_channelName.in())+
".ProxyPushConsumer";
155 POAManager_var parentManager =p->the_POAManager();
156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
158 catch(POA::AdapterAlreadyExists&)
160 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161 "POA::AdapterAlreadyExists")
163 catch(POA::InvalidPolicy& ex)
165 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166 "POA::InvalidPolicy: "<<ex.index)
170 for(CORBA::ULong i=0; i<policies.length(); ++i)
171 policies[i]->destroy();
174 _poa->set_servant(
this);
180 DB(20,
"~ProxyPushConsumer_i()")
185 i->second->_remove_ref();
193 CosEventChannelAdmin::ProxyPushConsumer_ptr
196 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
210 CORBA::Request_var req =
211 curr->second->_target->_request(
"disconnect_push_supplier");
212 curr->second->_remove_ref();
216 req->send_deferred();
225 for(map<string,PersistNode*>::const_iterator i=node.
_child.begin();
229 const char* oidstr =i->first.c_str();
230 string ior( i->second->attrString(
"IOR") );
231 bool isProxy( i->second->attrLong(
"proxy") );
235 using namespace CosEventComm;
236 using namespace CosEventChannelAdmin;
238 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
243 DB(5,
"Reincarnated ProxyPushConsumer: "<<oidstr)
248 DB(15,
"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
251 ProxyPushSupplier_var proxySupp =
252 string_to_<ProxyPushSupplier>(ior.c_str());
253 PortableServer::ObjectId_var objectId =
254 PortableServer::string_to_ObjectId(oidstr);
255 CORBA::Object_var obj =
256 _poa->create_reference_with_id(
258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261 proxySupp->connect_push_consumer(thisCons.in());
262 DB(7,
"Reconnected ProxyPushConsumer: "<<oidstr)
265 catch(CORBA::BAD_PARAM&) {
267 DB(5,
"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
269 catch(CosEventChannelAdmin::AlreadyConnected&){
271 DB(7,
"Remote ProxyPushSupplier already connected: "<<oidstr)
273 catch(CosEventChannelAdmin::TypeError&){
275 DB(2,
"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
277 catch(CORBA::OBJECT_NOT_EXIST&) {}
278 catch(CORBA::TRANSIENT& ) {}
279 catch(CORBA::COMM_FAILURE& ) {}
286 for(Connections_t::const_iterator i=
_connections.begin();
290 i->second->output(os);
300 using namespace PortableServer;
301 ObjectId_var oid =
Orb::inst()._POACurrent->get_object_id();
302 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303 return string(oidStr.in());
305 catch(PortableServer::Current::NoContext&)
309 catch(CORBA::BAD_PARAM&)
316 throw CORBA::NO_IMPLEMENT();
325 #if OMNIEVENTS__DEBUG_SERVANT
326 int ProxyPushConsumer_i::Connection::_objectCount =0;
330 const char* channelName,
331 const string& oidstr,
332 CosEventComm::PushSupplier_ptr pushSupplier,
337 _target(pushSupplier),
338 _targetIsProxy(isProxy)
340 #if OMNIEVENTS__DEBUG_SERVANT
342 DB(21,
"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
348 #if OMNIEVENTS__DEBUG_SERVANT
350 DB(20,
"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
352 DB(20,
"ProxyPushConsumer_i::Connection::~Connection()")
360 bool save =_targetIsProxy;
361 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
367 DB(15,
"ProxyPushConsumer is federated.");
372 DB(2,
"ProxyPushConsumer got unexpected callback.");
380 os<<
"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
382 if(!CORBA::is_nil(_target.in()))
384 CORBA::String_var iorstr;
386 os<<
" IOR="<<iorstr.in();
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Interface for classes that wish to receive callbacks from deferred requests.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
map< string, PersistNode * > _child
Default servant for ProxyPushConsumer objects.
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q, ConsumerAdmin_i &consumerAdmin)
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
list< CORBA::Any * > & _queue
ConsumerAdmin_i & _consumerAdmin
CORBA::String_var _channelName
Connections_t _connections
virtual ~ProxyPushConsumer_i()
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
string currentObjectId() const
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
bool _useLocalQueue
Switch between RT/chunked modes.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.
Connection()
NO IMPLEMENTATION.
void output(ostream &os) const
Save this object's state to a stream.
PortableServer::POA_var _poa