OmniEvents
ProxyPullConsumer.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPullConsumer.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPullConsumer.h"
25 #include "Orb.h"
26 #include "omniEventsLog.h"
27 #include "PersistNode.h"
28 #include <assert.h>
29 
30 namespace OmniEvents {
31 
32 //
33 // ProxyPullConsumerManager
34 //
35 
36 PortableServer::Servant
38  const PortableServer::ObjectId& oid,
39  PortableServer::POA_ptr poa
40 )
41 {
42  DB(20,"ProxyPullConsumerManager::incarnate()")
44  _servants.insert(result);
45  return result;
46 }
47 
49  PortableServer::POA_ptr parentPoa,
50  list<CORBA::Any*>& q
51 )
52 : ProxyManager(parentPoa),
53  _queue(q)
54 {
55  ProxyManager::activate("ProxyPullConsumer");
56 }
57 
59 {
60  DB(20,"~ProxyPullConsumerManager()")
61 }
62 
64 
65 CosEventChannelAdmin::ProxyPullConsumer_ptr
67 {
68  return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
69  _managedPoa.in(),
70  CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
71  );
72 }
73 
75 {
76  // Collect events from each servant in turn.
77  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
78  {
79  ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
80  proxy->collect();
81  }
82 }
83 
85 {
86  // Trigger each servant in turn.
87  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
88  {
89  ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
90  proxy->triggerRequest();
91  }
92 }
93 
95 {
96  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
97  {
98  Proxy* p =*i; // Sun's CC requires this temporary.
99  ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
100  // We are in the EventChannel's thread.
101  // Make sure all calls go though the ProxyPullConsumer POA.
102  CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this();
103  ppcv->disconnect_pull_consumer();
104  }
105 }
106 
107 
108 //
109 // ProxyPullConsumer_i
110 //
111 
112 // CORBA interface methods
113 
115  CosEventComm::PullSupplier_ptr pullSupplier
116 )
117 {
118  if(CORBA::is_nil(pullSupplier))
119  throw CORBA::BAD_PARAM();
120  if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
121  throw CosEventChannelAdmin::AlreadyConnected();
122  _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
123 
125  {
126  WriteLock log;
127  output(log.os);
128  }
129 }
130 
132 {
133  DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
134  eraseKey("SupplierAdmin/ProxyPullConsumer");
136  if(CORBA::is_nil(_target))
137  {
138  throw CORBA::OBJECT_NOT_EXIST(
139  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
140  CORBA::COMPLETED_NO
141  );
142  }
143  else
144  {
145  CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
146  _target=CosEventComm::PullSupplier::_nil();
147  req->send_deferred();
148  Orb::inst().deferredRequest(req._retn());
149  }
150 }
151 
152 //
153 
155  PortableServer::POA_ptr poa,
156  list<CORBA::Any*>& q
157 )
158 : Proxy(poa),
159  _target(CosEventComm::PullSupplier::_nil()),
160  _queue(q),
161  _mode(Pull), // Prefer 'pull' method calls.
162  _exceptionCount(0)
163 {}
164 
166 {
167  DB(20,"~ProxyPullConsumer_i()")
168 }
169 
171 {
172  if(!CORBA::is_nil(_req) && _req->poll_response())
173  {
174  const char* opname =_req->operation();
175  assert(opname);
176  CORBA::Environment_ptr env =_req->env(); // No need to release environment.
177 
178  if(!CORBA::is_nil(env) && env->exception())
179  {
180  CORBA::Exception* ex =env->exception(); // No need to free exception.
181  DB(10,"ProxyPullConsumer got exception"
182  IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
183  if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
184  {
185  ++_exceptionCount;
186  _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time.
187  }
188  else
189  DB(2,"Ignoring unrecognised response. operation:"<<opname);
190  if(_exceptionCount>=4)
191  {
193 
194  // Try to notify the Supplier that the connection is closing.
195  CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
196  req->send_deferred();
197  Orb::inst().deferredRequest(req._retn());
198 
199  _target=CosEventComm::PullSupplier::_nil(); // disconnected
200  eraseKey("SupplierAdmin/ProxyPullConsumer");
202  }
203  }
204  else
205  {
206  // Do we have an event?
207  bool hasEvent=false;
208  if(0==strcmp("pull",opname))
209  {
210  hasEvent=true;
211  }
212  else if(0==strcmp("try_pull",opname))
213  {
214  CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
215  if(args->count()==1)
216  {
217  CORBA::NamedValue_var hasEventArg=args->item(0);
218  if(0==strcmp(hasEventArg->name(),"has_event"))
219  {
220  CORBA::Any* a =hasEventArg->value();
221  CORBA::Boolean b;
222  CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
223  hasEvent=(((*a)>>=tb) && b);
224  }
225  }
226  }
227  // Pick up an event, if we have one.
228  if(hasEvent)
229  {
230  CORBA::Any* event =new CORBA::Any();
231  _req->return_value() >>= (*event);
232  _queue.push_back(event);
233  }
234  // Reset the exception count.
235  _exceptionCount=0;
236  }
237  _req=CORBA::Request::_nil();
238  }
239 } // ProxyPullConsumer_i::end collect()
240 
242 {
243  if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
244  {
245  switch(_mode)
246  {
247  case Pull:
248  _req=_target->_request("pull");
249  break;
250  case TryPull:
251  _req=_target->_request("try_pull");
252  _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
253  break;
254  default:
255  assert(0);
256  }
257  _req->set_return_type(CORBA::_tc_any);
258  _req->send_deferred();
259  }
260 }
261 
263  const string& oid,
264  const PersistNode& node
265 )
266 {
267  CosEventComm::PullSupplier_var pullSupplier =
268  string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
269  // Do not activate until we know that we have read a valid target.
270  activateObjectWithId(oid.c_str());
271  connect_pull_supplier(pullSupplier.in());
272 }
273 
275 {
276  basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
277 }
278 
279 }; // end namespace OmniEvents
#define HERE
Generates a string literal that describes the filename and line number.
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
#define DB(l, x)
Definition: Orb.h:49
#define IF_OMNIORB4(omniORB4_code)
Definition: Orb.h:46
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition: Orb.cc:204
static Orb & inst()
Definition: Orb.h:81
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:60
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
Definition: ProxyManager.h:90
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:107
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
Definition: ProxyManager.h:128
void eraseKey(const char *name)
Helper method for constructing persistency output.
void disconnect()
Send disconnect_pull_supplier() to all connected PullSuppliers.
void triggerRequest()
For each connected proxy, if there is no request in progress, send a new request to the current opera...
void collect()
Collects events that have arrived at connected proxies.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullConsumer_ptr createObject()
ProxyPullConsumerManager(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q)
Implementation of the ProxyPullConsumer interface.
void output(ostream &os)
Save this object's state to a stream.
CosEventComm::PullSupplier_var _target
int _exceptionCount
Only when two consecutive exceptions have been received from each mode, do we consider the connection...
void connect_pull_supplier(CosEventComm::PullSupplier_ptr pullSupplier)
ProxyPullConsumer_i(PortableServer::POA_ptr poa, list< CORBA::Any * > &q)
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
void triggerRequest()
When _req is NIL, sends out a new pull() or try_pull() call.
void collect()
Collects responses since the last trigger.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160