OmniEvents
EventChannel.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // EventChannel.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "EventChannel.h"
25 #include "ConsumerAdmin.h"
26 #include "SupplierAdmin.h"
27 #include "omniEventsLog.h"
28 #include "Orb.h"
29 
30 #include <list>
31 
32 namespace OmniEvents {
33 
34 // CORBA interface methods
35 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
36 {
38  throw CORBA::OBJECT_NOT_EXIST();
39  return _consumerAdmin->_this();
40 }
41 
42 
43 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
44 {
46  throw CORBA::OBJECT_NOT_EXIST();
47  return _supplierAdmin->_this();
48 }
49 
50 
52 {
54  throw CORBA::OBJECT_NOT_EXIST();
55 
56  // Prevent further incoming connections.
57  _shutdownRequested=true;
58 
59  DB(5,"EventChannel_i::destroy()")
60 
61  // Send disconnect messages to connected clients.
62  if(_consumerAdmin)
64  if(_supplierAdmin)
66 }
67 
68 
70 : Servant(PortableServer::POA::_nil()),
71  _eventChannelStore(store),
72  _consumerAdmin(NULL),
73  _supplierAdmin(NULL),
74  _poaManager(),
75  _shutdownRequested(false),
76  _properties(),
77  _mapper(NULL),
78  _lock(),
79  _refCount(1)
80 {}
81 
82 
84  const char* channelName,
85  const PersistNode* node
86 )
87 {
// NOTE(review): the definition line that opens this parameter list is absent
// from this listing. 'channelName' is passed to createPoa(); 'node' may be
// NULL, and when present supplies saved attributes and child nodes.
88  // The order of these various initialization methods is very important.
89  // I've documented dependencies as 'REQUIRES' comments.
90 
91  createPoa(channelName);
92 
// Adopt the saved attributes, if any.
93  if(node)
94  _properties._attr=node->_attr;
95 
// NOTE(review): the statement annotated by the next REQUIRES comment is
// absent from this listing -- presumably it creates _consumerAdmin; confirm.
96  // REQUIRES: _properties
98 
// NOTE(review): likewise absent -- presumably it creates _supplierAdmin;
// confirm. Both pointers are dereferenced below.
99  // REQUIRES: _consumerAdmin, _properties
101 
// Restore each admin from its saved child node, when one exists.
102  if(node)
103  {
104  PersistNode* saNode =node->child("SupplierAdmin");
105  if(saNode)
106  _supplierAdmin->reincarnate(*saNode);
107 
108  PersistNode* caNode =node->child("ConsumerAdmin");
109  if(caNode)
110  _consumerAdmin->reincarnate(*caNode);
111  }
112 
113  activateObjectWithId("EventChannel");
114 
115  // Remove the constructor's reference. This object will now be destroyed when
116  // the POA releases it.
117  _remove_ref();
118 
119  // REQUIRES: activate() ...since it uses _this().
120  setInsName(_properties.attrString("InsName"));
121 
122  // Start the channel's thread running.
123  start_undetached();
124 }
125 
126 
128 {
129  DB(20,"~EventChannel_i()")
130  // Destroy the mapper object, even when the EventChannel is being shut down
131  // without a call to destroy(). This can happen if the channel is
132  // implemented through libomniEvents - the channel could be shut down and
133  // later reincarnated in the same process. The Mapper's lifecycle should
134  // match that of the EventChannel.
135  if(_mapper)
136  {
137  _mapper->destroy();
138  _mapper=NULL;
139  }
140  if(_consumerAdmin)
141  {
142  _consumerAdmin->_remove_ref();
143  _consumerAdmin=NULL;
144  }
145  if(_supplierAdmin)
146  {
147  _supplierAdmin->_remove_ref();
148  _supplierAdmin=NULL;
149  }
150 }
151 
152 
154 {
155  // Ensure that activate() is called before start()/run().
156  assert(!CORBA::is_nil(_poa));
157 
158  const char* action="";
159  try
160  {
162  {
163  action="add this object to the store";
164  _eventChannelStore->insert(this);
165  }
166 
168  {
169  action="create this object in the persistency database";
170  WriteLock log;
171  output(log.os);
172  }
173 
174  // Process events until the channel is destroyed.
175  action="run main loop";
176  mainLoop();
177 
179  {
180  action="remove this object from the store";
181  _eventChannelStore->erase(this);
182  }
183 
185  {
187  {
188  action="remove record from persistency database";
189  CORBA::String_var poaName =_poa->the_name();
190  WriteLock log;
191  log.os<<"-ecf/"<<poaName.in()<<'\n';
192  }
193  action="destroy POA";
194  _poa->destroy(
195  CORBA::Boolean(1) /* etherealize_objects */,
196  CORBA::Boolean(0) /* wait_for_completion */
197  );
198  _poa=PortableServer::POA::_nil();
199 
200  } // end if(_shutdownRequested)
201 
202  }
203  catch(PortableServer::POAManager::AdapterInactive& ex) {
204  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
205  ", POA deactivated from the outside.")
206  }
207  catch (CORBA::SystemException& ex) {
208  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
209  ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")")
210  }
211  catch (CORBA::Exception& ex) {
212  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
213  ", CORBA exception: "<<ex._name())
214  }
215 
216  // Thread now exits, and this object is deleted.
217  return NULL;
218 }
219 
220 
222 {
223  _poaManager->activate();
224  unsigned long localCyclePeriod_ns=cyclePeriod_ns();
225  while(_refCount>0 && !_shutdownRequested)
226  {
227  //
228  // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin.
229  _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */);
230 
231  if(_shutdownRequested) break;
232 
233  list<CORBA::Any*> events;
234  _supplierAdmin->collect(events);
235  _consumerAdmin->send(events);
236  assert(events.empty());
237 
238  _poaManager->activate();
239 
240  //
241  // COMMUNICATION PHASE - talk with clients' suppliers & consumers.
242  // Note: On Linux the resolution of nanosleep is a huge 10ms.
243  omni_thread::sleep(0,localCyclePeriod_ns);
244  }
245 }
246 
247 
249 {
250 #if OMNIEVENTS__DEBUG_REF_COUNTS
251  DB(20,"EventChannel_i::_add_ref()")
252 #endif
253  omni_mutex_lock pause(_lock);
254  ++_refCount;
255 }
256 
257 
259 {
260 #if OMNIEVENTS__DEBUG_REF_COUNTS
261  DB(20,"EventChannel_i::_remove_ref()")
262 #endif
263  int myref;
264  {
265  omni_mutex_lock pause(_lock);
266  myref = --_refCount;
267  }
268 
269  if(myref<0)
270  {
271  DB(2,"EventChannel has negative ref count! "<<myref)
272  }
273  else if(myref==0)
274  {
275  DB(15,"EventChannel has zero ref count -- shutdown.")
276  join(NULL);
277  }
278 }
279 
280 
281 void EventChannel_i::output(ostream& os)
282 {
283  CORBA::String_var poaName =_poa->the_name();
284  string name =string("ecf/")+poaName.in();
285  _properties.output(os,name);
286  if(_supplierAdmin)
287  _supplierAdmin->output(os);
288  if(_consumerAdmin)
289  _consumerAdmin->output(os);
290 }
291 
292 
293 void EventChannel_i::setInsName(const string v)
294 {
295  Mapper* newMapper =NULL;
296  try
297  {
298 
299  // If _insName is set, then create a mapper object to allow clients to
300  // find this object with a `corbaloc' string.
301  if(!v.empty())
302  {
303  // !! Throws when there is already an object named 'v' in the INSPOA.
304  CORBA::Object_var obj( _this() );
305  newMapper=new Mapper(v.c_str(),obj.in());
306  }
307  // Deactivate the old _mapper object.
308  if(_mapper)
309  _mapper->destroy();
310  _mapper=newMapper;
311 
312  }
313  catch(...)
314  {
315  // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset()
316  delete newMapper;
317  throw;
318  }
319 }
320 
321 
322 void EventChannel_i::createPoa(const char* channelName)
323 {
324  using namespace PortableServer;
325  POA_ptr p=Orb::inst()._RootPOA.in();
326 
327  // POLICIES:
328  // Lifespan =PERSISTENT // we can persist
329  // Assignment =USER_ID // write our own oid
330  // Uniqueness =[default] UNIQUE_ID // one servant per object
331  // ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation
332  // RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY
333  // ServantRetention =[default] RETAIN // stateless POA
334  // Thread =SINGLE_THREAD_MODEL // keep it simple
335 
336  CORBA::PolicyList policies;
337  policies.length(3);
338  policies[0]=p->create_lifespan_policy(PERSISTENT);
339  policies[1]=p->create_id_assignment_policy(USER_ID);
340  policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
341 
342  try // finally
343  {
344  try
345  {
346  // Create a new POA (and new POAManager) for this channel.
347  // The POAManager will be used for all of this channel's POAs.
348  _poa=p->create_POA(channelName,POAManager::_nil(),policies);
349  _poaManager=_poa->the_POAManager();
350  }
351  catch(POA::AdapterAlreadyExists& ex) // create_POA
352  {
353  DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
354  throw;
355  }
356  catch(POA::InvalidPolicy& ex) // create_POA
357  {
358  DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
359  throw;
360  }
361  }
362  catch(...) // finally
363  {
364  // Destroy the policy objects (Not strictly necessary in omniORB)
365  for(CORBA::ULong i=0; i<policies.length(); ++i)
366  policies[i]->destroy();
367  throw;
368  }
369 
370  // Destroy the policy objects (Not strictly necessary in omniORB)
371  for(CORBA::ULong i=0; i<policies.length(); ++i)
372  policies[i]->destroy();
373 }
374 
375 
376 //
377 // class EventChannelStore
378 //
379 
380 
382 :_channels(),_lock()
383 {}
384 
386 {
// NOTE(review): the definition line for this empty body is absent from this
// listing; from its position after the constructor it is presumably
// ~EventChannelStore() -- confirm. Nothing is done here yet.
387  // ?? IMPLEMENT ME
388 }
389 
391 {
392  omni_mutex_lock l(_lock);
393  bool insertOK =_channels.insert(channel).second;
394  if(!insertOK)
395  DB(2,"Attempted to store an EventChannel, when it is already stored.");
396 }
397 
399 {
400  omni_mutex_lock l(_lock);
401  set<EventChannel_i*>::iterator pos =_channels.find(channel);
402  if(pos==_channels.end())
403  DB(2,"Failed to erase unknown EventChannel.")
404  else
405  _channels.erase(pos);
406 }
407 
408 void EventChannelStore::output(ostream &os)
409 {
410  omni_mutex_lock l(_lock);
411  for(set<EventChannel_i*>::iterator i=_channels.begin();
412  i!=_channels.end();
413  ++i)
414  {
415  (*i)->output(os);
416  }
417 }
418 
419 
420 }; // end namespace OmniEvents
421 
#define DB(l, x)
Definition: Orb.h:49
#define NP_MINORSTRING(systemException)
Definition: Orb.h:52
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
void reincarnate(const PersistNode &node)
Populate this servant from log information.
void disconnect()
Send disconnect_XXX_consumer() to all connected consumers.
void output(ostream &os)
Save this object's state to a stream.
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:115
void activate(const char *channelName, const PersistNode *node=NULL)
Creates the channel's POA, and any child objects.
Definition: EventChannel.cc:83
~EventChannel_i()
Cleans up the _poa, if this object is deleted before its thread starts.
void _remove_ref()
Shutdown the thread when refCount reaches zero.
void mainLoop()
The main loop for a channel.
void * run_undetached(void *)
Entry point for the channel's thread.
EventChannelStore * _eventChannelStore
Definition: EventChannel.h:196
ConsumerAdmin_i * _consumerAdmin
Definition: EventChannel.h:198
PortableServer::POAManager_var _poaManager
Definition: EventChannel.h:199
void createPoa(const char *channelName)
Constructs the main POA for this channel.
unsigned long cyclePeriod_ns() const
Definition: EventChannel.h:181
CosEventChannelAdmin::ConsumerAdmin_ptr for_consumers()
Definition: EventChannel.cc:35
CosEventChannelAdmin::SupplierAdmin_ptr for_suppliers()
Definition: EventChannel.cc:43
SupplierAdmin_i * _supplierAdmin
Definition: EventChannel.h:197
EventChannel_i(EventChannelStore *store=NULL)
Definition: EventChannel.cc:69
void output(ostream &os)
void setInsName(const string v)
Construct a new Mapper object, and registers it in the INSPOA.
Container for Event Channels.
Definition: EventChannel.h:210
void insert(EventChannel_i *channel)
set< EventChannel_i * > _channels
Definition: EventChannel.h:218
void erase(EventChannel_i *channel)
A dummy servant that installs itself into the INSPOA and redirects all calls to the real destination.
Definition: Mapper.h:36
void destroy()
Definition: Mapper.h:49
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
PortableServer::POA_var _RootPOA
Definition: Orb.h:89
static Orb & inst()
Definition: Orb.h:81
void output(ostream &os, string name) const
Definition: PersistNode.cc:44
map< string, string > _attr
Definition: PersistNode.h:72
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
PersistNode * child(const string &key) const
Definition: PersistNode.cc:171
Base class for servants.
Definition: Servant.h:114
PortableServer::POA_var _poa
Definition: Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
void disconnect()
Send disconnect_XXX_supplier() to all connected suppliers.
void output(ostream &os)
Save this object's state to a stream.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void collect(list< CORBA::Any * > &events)
Collects all events that have arrived since the last call.
void reincarnate(const PersistNode &node)
Populate this servant from log information.