OmniEvents
EventChannelFactory.cc
Go to the documentation of this file.
1 // -*- Mode: C++; -*-
2 // Package : omniEvents
3 // EventChannelFactory_i.cc Created : 1/4/98
4 // Author : Paul Nader (pwn)
5 //
6 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle.
7 //
8 // This file is part of the omniEvents application.
9 //
10 // omniEvents is free software; you can redistribute it and/or
11 // modify it under the terms of the GNU Lesser General Public
12 // License as published by the Free Software Foundation; either
13 // version 2.1 of the License, or (at your option) any later version.
14 //
15 // omniEvents is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 // Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public
21 // License along with this library; if not, write to the Free Software
22 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 //
24 // Description:
25 // Implementation of the COSS Event Services Event Channel Factory
26 //
27 
28 #include "EventChannelFactory.h"
29 
30 #include "Orb.h"
31 #include "EventChannel.h"
32 #include "PersistNode.h"
33 
34 #include <memory>
35 
36 #ifdef HAVE_OMNIORB4
37 # define STR_MATCH(s1,s2) omni::strMatch((s1),(s2))
38 #else
39 # define STR_MATCH(s1,s2) (0==::strcmp((s1),(s2)))
40 #endif
41 
42 namespace OmniEvents {
43 
44 //------------------------------------------------------------------------
45 // Event Channel Factory Interface Implementation
46 //------------------------------------------------------------------------
48 : Servant(Orb::inst()._omniINSPOA.in()),
49  _port(node.attrLong("port",11169)),
50  _endPointNoListen(node.attrString("endPointNoListen")),
51  _channels()
52 {
53  // Create event channels
54  for(map<string,PersistNode*>::const_iterator i=node._child.begin();
55  i!=node._child.end();
56  ++i)
57  {
58  EventChannel_i* channel =new EventChannel_i(&_channels);
59  channel->activate(
60  i->first.c_str(), // channelName
61  i->second // node
62  );
63  }
64  activateObjectWithId("omniEvents");
65 }
66 
67 
69 {
70  DB(20, "EventChannelFactory_i::~EventChannelFactory_i()");
71 }
72 
73 
74 CORBA::Boolean
75 EventChannelFactory_i::supports(const CosLifeCycle::Key &k)
76 {
77  if((k.length() == 1) &&
78  (strcmp(k[0].id, "EventChannel") == 0) &&
79  (strcmp(k[0].kind, "object interface") == 0))
80  return 1;
81  else
82  return 0;
83 }
84 
85 
86 CORBA::Object_ptr
88  const CosLifeCycle::Key& k,
89  const CosLifeCycle::Criteria& criteria
90 )
91 {
92  // Check the key
93  if(!this->supports(k))
94  throw CosLifeCycle::NoFactory(k);
95 
96  // Process criteria !! MAY THROW !!
97  auto_ptr<PersistNode> criteriaNode( parseCriteria(criteria) );
98 
99  CORBA::String_var channelId;
100  if(criteriaNode->hasAttr("InsName"))
101  channelId=criteriaNode->attrString("InsName").c_str();
102  else
103  channelId=newUniqueId();
104 
105  // Create the channel.
106  // We place it into an auto_ptr - this will automatically clean up if anything
107  // goes wrong.
108  auto_ptr<EventChannel_i> channel( new EventChannel_i(&_channels) );
109  try
110  {
111  channel->activate(channelId.in(),criteriaNode.get()); // !! MAY THROW !!
112  }
113  catch(PortableServer::POA::ObjectAlreadyActive& ex)
114  {
115  throw CosLifeCycle::InvalidCriteria(criteria); //??
116  }
117  catch(PortableServer::POA::AdapterAlreadyExists& ex) // create_POA
118  {
119  throw CosLifeCycle::InvalidCriteria(criteria); //??
120  }
121 
122  // We release() the pointer, as the thread will delete it when it stops.
123  return channel.release()->_this();
124 }
125 
126 
127 CosEventChannelAdmin::EventChannel_ptr
128 EventChannelFactory_i::create_channel(const char* channel_name)
129 {
130  CosEventChannelAdmin::EventChannel_var result;
131 
132  CosLifeCycle::Key key;
133  key.length(1);
134  key[0].id ="EventChannel";
135  key[0].kind="object interface";
136 
137  CosLifeCycle::Criteria criteria;
138  criteria.length(1);
139  criteria[0].name = "InsName";
140  criteria[0].value <<= channel_name;
141 
142  try
143  {
144  CORBA::Object_var obj=create_object(key,criteria);
145  result=CosEventChannelAdmin::EventChannel::_narrow(obj.in());
146  }
147  catch(CosLifeCycle::InvalidCriteria& ex)
148  {
149  if(ex.invalid_criteria.length()>0 &&
150  STR_MATCH(ex.invalid_criteria[0].name,"InsName"))
151  {
152  throw event::NameAlreadyUsed();
153  }
154  else
155  {
156  DB(10,"Failed to create_channel."
157  " Converting InvalidCriteria exception into UNKNOWN.")
158  throw CORBA::UNKNOWN();
159  }
160  }
161  catch(CORBA::UserException& ex)
162  {
163  DB(2,"Failed to create_channel. Converting UserException"
164  IFELSE_OMNIORB4(" '"<<ex._name()<<"'",<<) " into UNKNOWN.")
165  throw CORBA::UNKNOWN();
166  }
167  return result._retn();
168 }
169 
170 
171 CosEventChannelAdmin::EventChannel_ptr
172 EventChannelFactory_i::join_channel(const char* channel_name)
173 {
174  using namespace PortableServer;
175  CosEventChannelAdmin::EventChannel_var result;
176  try
177  {
178  ObjectId_var oid =PortableServer::string_to_ObjectId(channel_name);
179  CORBA::Object_var obj =Orb::inst()._omniINSPOA->id_to_reference(oid.in());
180  result=CosEventChannelAdmin::EventChannel::_narrow(obj.in());
181  }
182  catch(POA::ObjectNotActive&)
183  {
184  DB(10,"Failed to join_channel. Object not active.")
185  throw event::EventChannelNotFound();
186  }
187  catch(CORBA::UserException& ex)
188  {
189  DB(2,"Failed to join_channel. Converting UserException"
190  IFELSE_OMNIORB4(" '"<<ex._name()<<"'",<<) " into UNKNOWN.")
191  throw CORBA::UNKNOWN();
192  }
193  return result._retn();
194 }
195 
196 
198  const CosLifeCycle::Criteria &criteria
199 ) const
200 {
201  using namespace CosLifeCycle;
202  auto_ptr<PersistNode> result( new PersistNode() );
203 
204  for(CORBA::ULong i=0; i<criteria.length(); i++)
205  {
206  if(strcmp(criteria[i].name, "PullRetryPeriod_ms") == 0)
207  {
208  CORBA::ULong pullRetryPeriod_ms;
209  if(! (criteria[i].value >>= pullRetryPeriod_ms))
210  throw InvalidCriteria(extract("PullRetryPeriod_ms",criteria));
211  if(pullRetryPeriod_ms <= 0)
212  throw CannotMeetCriteria(extract("PullRetryPeriod_ms",criteria));
213  result->addattr("PullRetryPeriod_ms",pullRetryPeriod_ms);
214  }
215  else if(strcmp(criteria[i].name, "PullRetryPeriod") == 0)
216  {
217  // This criterion has been deprecated in favour of PullRetryPeriod_ms.
218  // Don't overwrite any value provided by the latter.
219  if(!result->hasAttr("PullRetryPeriod_ms"))
220  {
221  CORBA::ULong pullRetryPeriod;
222  if(! (criteria[i].value >>= pullRetryPeriod))
223  throw InvalidCriteria(extract("PullRetryPeriod",criteria));
224  if(pullRetryPeriod <= 0)
225  throw CannotMeetCriteria(extract("PullRetryPeriod",criteria));
226  result->addattr("PullRetryPeriod_ms",pullRetryPeriod*1000);
227  }
228  }
229  else if(strcmp(criteria[i].name, "MaxQueueLength") == 0)
230  {
231  CORBA::ULong maxQueueLength;
232  if(! (criteria[i].value >>= maxQueueLength))
233  throw InvalidCriteria(extract("MaxQueueLength",criteria));
234  if(maxQueueLength > 0)
235  result->addattr("MaxQueueLength",maxQueueLength);
236  else
237  DB(10,"Ignoring CosLifeCycle criterion: MaxQueueLength=0");
238  }
239  else if(strcmp(criteria[i].name, "MaxNumProxies") == 0)
240  {
241  CORBA::ULong maxNumProxies;
242  if(! (criteria[i].value >>= maxNumProxies))
243  throw InvalidCriteria(extract("MaxNumProxies",criteria));
244  if(maxNumProxies > 0)
245  result->addattr("MaxNumProxies",maxNumProxies);
246  else
247  DB(10,"Ignoring CosLifeCycle criterion: MaxNumProxies=0");
248  }
249  else if(strcmp(criteria[i].name, "CyclePeriod_ns") == 0)
250  {
251  CORBA::ULong cyclePeriod_ns;
252  if(! (criteria[i].value >>= cyclePeriod_ns))
253  throw InvalidCriteria(extract("CyclePeriod_ns",criteria));
254  if(cyclePeriod_ns > 0)
255  result->addattr("CyclePeriod_ns",cyclePeriod_ns);
256  else
257  DB(10,"Ignoring CosLifeCycle criterion: CyclePeriod_ns=0");
258  }
259  else if(strcmp(criteria[i].name, "InsName") == 0)
260  {
261  const char* insName;
262  if(! (criteria[i].value >>= insName))
263  throw InvalidCriteria(extract("InsName",criteria));
264  if(insName && insName[0])
265  result->addattr(string("InsName=")+insName);
266  else
267  DB(10,"Ignoring empty CosLifeCycle criterion: InsName");
268  }
269  else if(strcmp(criteria[i].name, "FilterId") == 0)
270  {
271  const char* repositoryId;
272  if(! (criteria[i].value >>= repositoryId))
273  throw InvalidCriteria(extract("FilterId",criteria));
274  if(repositoryId && repositoryId[0])
275  result->addattr(string("FilterId=")+repositoryId);
276  else
277  DB(10,"Ignoring empty CosLifeCycle criterion: FilterId");
278  }
279  else if(strcmp(criteria[i].name, "MaxEventsPerConsumer") == 0)
280  {
281  DB(10,"Ignoring obsolete CosLifeCycle criterion: MaxEventsPerConsumer");
282  }
283  else
284  {
285  DB(10,"Ignoring unknown CosLifeCycle criterion: "<<criteria[i].name);
286  }
287  } // end loop for(i)
288 
289  return result.release();
290 }
291 
292 
293 CosLifeCycle::Criteria EventChannelFactory_i::extract(
294  const char* name,
295  const CosLifeCycle::Criteria& from
296 ) const
297 {
298  CosLifeCycle::Criteria result;
299  result.length(0);
300  for(CORBA::ULong i=0; i<from.length(); i++)
301  {
302  if(strcmp(from[i].name,name) == 0)
303  {
304  result.length(1);
305  result[0]=from[i];
306  break;
307  }
308  }
309  return result;
310 }
311 
312 
313 void
315 {
316  os<<"ecf port="<<_port;
317  if(!_endPointNoListen.empty())
318  os<<" endPointNoListen="<<_endPointNoListen;
319  os<<" ;;\n";
320  _channels.output(os);
321 }
322 
323 
324 }; // end namespace OmniEvents
#define STR_MATCH(s1, s2)
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
#define DB(l, x)
Definition: Orb.h:49
char * newUniqueId()
Generates a unique object ID string, based upon the current PID and time.
Definition: Servant.cc:71
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:115
void activate(const char *channelName, const PersistNode *node=NULL)
Creates the channel's POA, and any child objects.
Definition: EventChannel.cc:83
CosLifeCycle::Criteria extract(const char *name, const CosLifeCycle::Criteria &from) const
Utility function: constructs a Criteria that contains a single criterion.
CORBA::Object_ptr create_object(const CosLifeCycle::Key &k, const CosLifeCycle::Criteria &the_criteria)
string _endPointNoListen
Stores the value of the endPointNoListen ORB parameter.
CORBA::Boolean supports(const CosLifeCycle::Key &k)
Returns true if the key passed has the following contents:
EventChannelFactory_i(const PersistNode &node)
Builds an EventChannelFactory_i from the parsed logfile data.
CosEventChannelAdmin::EventChannel_ptr create_channel(const char *channel_name)
DO NOT USE.
PersistNode * parseCriteria(const CosLifeCycle::Criteria &criteria) const
Convert CosLifeCycle::Criteria into a PersistNode.
CosEventChannelAdmin::EventChannel_ptr join_channel(const char *channel_name)
DO NOT USE.
unsigned int _port
The EventChannelFactory listens on this TCP port.
Singleton class that owns the ORB and various initial references.
Definition: Orb.h:70
static Orb & inst()
Definition: Orb.h:81
PortableServer::POA_var _omniINSPOA
Definition: Orb.h:90
map< string, PersistNode * > _child
Definition: PersistNode.h:71
Base class for servants.
Definition: Servant.h:114
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125