OmniEvents
pushcons.cc
Go to the documentation of this file.
1 // -*- Mode: C++; -*-
2 // Package : omniEvents
3 // pushcons.cc Created : 1/4/98
4 // Author : Paul Nader (pwn)
5 //
6 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
7 //
8 // This file is part of the omniEvents application.
9 //
10 // omniEvents is free software; you can redistribute it and/or
11 // modify it under the terms of the GNU Lesser General Public
12 // License as published by the Free Software Foundation; either
13 // version 2.1 of the License, or (at your option) any later version.
14 //
15 // omniEvents is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 // Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public
21 // License along with this library; if not, write to the Free Software
22 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 //
24 // Description:
25 // Push Model consumer implementation
26 //
27 
28 /*
29  $Log: pushcons.cc,v $
30  Revision 1.12.2.1 2005/06/16 09:39:49 alextingle
31  Fixed theoretical race caused by sloppy use of condition variable.
32 
33  Revision 1.12 2004/10/08 09:06:08 alextingle
34  More robust exception minor code handling.
35 
36  Revision 1.11 2004/08/18 17:49:45 alextingle
37  Added check for SIGPIPE before trying to use it.
38 
39  Revision 1.10 2004/08/06 16:19:23 alextingle
40  -k & -K options removed.
41  Naming service names may now be as complex as you like.
42 
43  Revision 1.9 2004/04/30 17:54:47 alextingle
44  Corrected handling of CORBA::Any.
45 
46  Revision 1.8 2004/04/20 16:52:17 alextingle
47  All examples updated for latest version on omniEvents. Server may now be
48  specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
49 
50  Revision 1.7 2004/04/01 22:28:36 alextingle
51  Corrected usage message.
52 
53  Revision 1.6 2004/03/23 19:09:26 alextingle
54  Fixed typos.
55 
56  Revision 1.5 2004/02/21 19:07:45 alextingle
57  Corrected servants to use POA instead of BOA.
58 
59  Revision 1.4 2004/02/04 22:29:55 alextingle
60  Reworked all C++ examples.
61  Removed catch(...) as it tends to make it harder to see what's going on.
62  Now uses POA instead of BOA.
63  Uses omniORB4's Exception name probing.
64  No longer uses 'naming.h/cc' utility code.
65 
66  Revision 1.3 2003/11/03 22:19:56 alextingle
67  Removed all platform specific switches. Now uses autoconf, config.h.
68  Removed stub header in order to allow makefile dependency checking to work
69  correctly.
70  Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by
71  the same thread that locked them.
72 
73  Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13
74  Added ifdefs to enable omniEvents to compile
75  with both omniORB3 and omniORB4. If __OMNIORB4__
76  is defined during compilation, omniORB4 headers
77  and command line option syntax is used, otherwise
78  fall back to omniORB3 style.
79 
80  Revision 1.1.1.1 2002/09/25 19:00:26 shamus13
81  Import of OmniEvents source tree from release 2.1.1
82 
83  Revision 0.13 2000/08/30 04:39:48 naderp
84  Port to omniORB 3.0.1.
85 
86  Revision 0.12 2000/03/16 05:37:27 naderp
87  Added stdlib.h for getopt.
88 
89  Revision 0.11 2000/03/06 13:27:02 naderp
90  Using util getRootNamingContext function.
91  Using stub headers.
92  Fixed error messages.
93 
94  Revision 0.10 2000/03/02 03:20:24 naderp
95  Added retry resiliency for handling COMM_FAUILURE exceptions.
96 
97  Revision 0.9 1999/11/02 13:39:15 naderp
98  Added <signal.h>
99 
100  Revision 0.8 1999/11/02 07:57:04 naderp
101  Updated usage.
102 
103 Revision 0.7 99/11/01 18:10:29 18:10:29 naderp (Paul Nader)
104 Added ahndling of COMM_FAILURE exception for connect_push_consumer.
105 
106 Revision 0.6 99/11/01 16:11:03 16:11:03 naderp (Paul Nader)
107 omniEvents 2.0 Release.
108 
109 Revision 0.5 99/10/27 19:46:01 19:46:01 naderp (Paul Nader)
110 Ignoring Unix SIGPIPE signal.
111 Catching COMM_FAILURE exception for obtain_push_supplier.
112 Continuing if it fails to obtain Proxy Supplier.
113 Try/Catch block for disconnect_push_supplier.
114 
115 Revision 0.4 99/04/23 16:05:46 16:05:46 naderp (Paul Nader)
116 gcc port.
117 
118 Revision 0.3 99/04/23 09:34:03 09:34:03 naderp (Paul Nader)
119 Windows Port.
120 
121 Revision 0.2 99/04/21 18:06:26 18:06:26 naderp (Paul Nader)
122 *** empty log message ***
123 
124 Revision 0.1.1.1 98/11/27 16:59:37 16:59:37 naderp (Paul Nader)
125 Added -s option to sleep after disconnecting.
126 
127 Revision 0.1 98/11/25 14:08:21 14:08:21 naderp (Paul Nader)
128 Initial Revision
129 
130 */
131 
132 #ifdef HAVE_CONFIG_H
133 # include "config.h"
134 #endif
135 
136 #ifdef HAVE_GETOPT
137 # include <unistd.h>
138 extern char* optarg;
139 extern int optind;
140 #else
141 # include "getopt.h"
142 #endif
143 
144 #ifdef HAVE_IOSTREAM
145 # include <iostream>
146 #else
147 # include <iostream.h>
148 #endif
149 
150 #ifdef HAVE_STD_IOSTREAM
151 using namespace std;
152 #endif
153 
154 #ifdef HAVE_STDLIB_H
155 # include <stdlib.h>
156 #endif
157 
158 #ifdef HAVE_SIGNAL_H
159 # include <signal.h>
160 #endif
161 
162 #include <cstdio>
163 
164 #include "CosEventComm.hh"
165 #include "CosEventChannelAdmin.hh"
166 #include "naming.h"
167 
168 static omni_mutex mutex;
169 static omni_condition connect_cond(&mutex);
170 static void usage(int argc, char **argv);
171 
172 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
173 public:
174  Consumer_i(long disconnect=0): _disconnect(disconnect) {}
175 
176  void push(const CORBA::Any& data);
178 
179 private:
181 };
182 
183 void Consumer_i::push(const CORBA::Any& data) {
184  CORBA::ULong l;
185  static int i = 0;
186 
187  i++;
188  if( data>>=l )
189  {
190  cout<<"Push Consumer: push() called. Data : "<< l <<endl;
191 
192  // Exercise Disconnect
193  if (i == _disconnect)
194  {
195  i = 0;
196  // NOTE : The proxy_supplier object is disposed at the server
197  // during the disconnect_push_supplier call. Do NOT
198  // use the proxy_supplier reference after disconnecting.
199 
200  // Signal main thread to disconnect and re-connect.
201  omni_mutex_lock condition_lock(mutex); // ensure main thread in wait()
202  connect_cond.signal();
203  }
204  }
205  else
206  {
207  cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
208  }
209 }
210 
212  cout << "Push Consumer: disconnected." << endl;
213 }
214 
215 int
216 main(int argc, char **argv)
217 {
218  //
219  // Start orb.
220  CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
221 
222  // Process Options
223  int discnum =0;
224  int sleepInterval =0;
225  const char* channelName ="EventChannel";
226 
227  int c;
228  while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
229  {
230  switch (c)
231  {
232  case 'd': discnum = atoi(optarg);
233  break;
234 
235  case 's': sleepInterval = atoi(optarg);
236  break;
237 
238  case 'n': channelName = optarg;
239  break;
240 
241  case 'h': usage(argc,argv);
242  exit(0);
243  default : usage(argc,argv);
244  exit(-1);
245  }
246  }
247 
248 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
249  // Ignore broken pipes
250  signal(SIGPIPE, SIG_IGN);
251 #endif
252 
253  Consumer_i* consumer = new Consumer_i (discnum);
254  CosEventChannelAdmin::EventChannel_var channel;
255 
256  const char* action=""; // Use this variable to help report errors.
257  try {
258  CORBA::Object_var obj;
259 
260  action="resolve initial reference 'RootPOA'";
261  obj=orb->resolve_initial_references("RootPOA");
262  PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
263  if(CORBA::is_nil(rootPoa))
264  throw CORBA::OBJECT_NOT_EXIST();
265 
266  action="activate the RootPOA's POAManager";
267  PortableServer::POAManager_var pman =rootPoa->the_POAManager();
268  pman->activate();
269 
270  //
271  // Obtain object reference to EventChannel
272  // (from command-line argument or from the Naming Service).
273  if(optind<argc)
274  {
275  action="convert URI from command line into object reference";
276  obj=orb->string_to_object(argv[optind]);
277  }
278  else
279  {
280  action="resolve initial reference 'NameService'";
281  obj=orb->resolve_initial_references("NameService");
282  CosNaming::NamingContext_var rootContext=
283  CosNaming::NamingContext::_narrow(obj);
284  if(CORBA::is_nil(rootContext))
285  throw CORBA::OBJECT_NOT_EXIST();
286 
287  action="find EventChannel in NameService";
288  cout << action << endl;
289  obj=rootContext->resolve(str2name(channelName));
290  }
291 
292  action="narrow object reference to event channel";
293  channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
294  if(CORBA::is_nil(channel))
295  {
296  cerr << "Failed to narrow Event Channel reference." << endl;
297  exit(1);
298  }
299 
300  }
301  catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
302  cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
303  exit(1);
304  }
305  catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
306  cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
307  exit(1);
308  }
309  catch(CosNaming::NamingContext::NotFound& ex) { // resolve
310  cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
311  exit(1);
312  }
313  catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
314  cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
315  exit(1);
316  }
317  catch(CORBA::TRANSIENT& ex) { // _narrow()
318  cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
319  exit(1);
320  }
321  catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
322  cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
323  exit(1);
324  }
325  catch(CORBA::SystemException& ex) {
326  cerr<<"Failed to "<<action<<".";
327 #if defined(HAVE_OMNIORB4)
328  cerr<<" "<<ex._name();
329  if(ex.NP_minorString())
330  cerr<<" ("<<ex.NP_minorString()<<")";
331 #endif
332  cerr<<endl;
333  exit(1);
334  }
335  catch(CORBA::Exception& ex) {
336  cerr<<"Failed to "<<action<<"."
337 #if defined(HAVE_OMNIORB4)
338  " "<<ex._name()
339 #endif
340  <<endl;
341  exit(1);
342  }
343 
344  //
345  // Get Consumer admin interface - retrying on Comms Failure.
346  CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
347  while (1)
348  {
349  try {
350  consumer_admin = channel->for_consumers ();
351  if (CORBA::is_nil (consumer_admin))
352  {
353  cerr << "Event Channel returned nil Consumer Admin!" << endl;
354  exit(1);
355  }
356  break;
357  }
358  catch (CORBA::COMM_FAILURE& ex) {
359  cerr << "Caught COMM_FAILURE exception "
360  << "obtaining Consumer Admin! Retrying..."
361  << endl;
362  continue;
363  }
364  }
365  cout << "Obtained ConsumerAdmin." << endl;
366 
367  omni_mutex_lock condition_lock(mutex);
368  while (1) {
369  //
370  // Get proxy supplier - retrying on Comms Failure.
371  CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
372  while (1)
373  {
374  try {
375  proxy_supplier = consumer_admin->obtain_push_supplier ();
376  if (CORBA::is_nil (proxy_supplier))
377  {
378  cerr << "Consumer Admin returned nil proxy_supplier!"
379  << endl;
380  exit (1);
381  }
382  break;
383  }
384  catch (CORBA::COMM_FAILURE& ex) {
385  cerr << "Caught COMM_FAILURE Exception "
386  << "obtaining Push Supplier! Retrying..."
387  << endl;
388  continue;
389  }
390  }
391  cout << "Obtained ProxyPushSupplier." << endl;
392 
393  //
394  // Connect Push Consumer - retrying on Comms Failure.
395  while (1)
396  {
397  try {
398  proxy_supplier->connect_push_consumer(consumer->_this());
399  break;
400  }
401  catch (CORBA::BAD_PARAM& ex) {
402  cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
403  << endl;
404  exit (1);
405  }
406  catch (CosEventChannelAdmin::AlreadyConnected& ex) {
407  cerr << "Proxy Push Supplier already connected!"
408  << endl;
409  break;
410  }
411  catch (CORBA::COMM_FAILURE& ex) {
412  cerr << "Caught COMM_FAILURE exception "
413  << "connecting Push Consumer! Retrying..."
414  << endl;
415  continue;
416  }
417  }
418  cout << "Connected Push Consumer." << endl;
419 
420  // Wait for indication to disconnect before re-connecting.
421  connect_cond.wait();
422 
423  // Disconnect - retrying on Comms Failure.
424  while (1)
425  {
426  try {
427  proxy_supplier->disconnect_push_supplier();
428  break;
429  }
430  catch (CORBA::COMM_FAILURE& ex) {
431  cerr << "Caught COMM_FAILURE Exception "
432  << "disconnecting Push Consumer! Retrying..."
433  << endl;
434  continue;
435  }
436  }
437  cout << "Disconnected Push Consumer." << endl;
438 
439  // Yawn
440  cout << "Sleeping " << sleepInterval << " seconds." << endl;
441  omni_thread::sleep(sleepInterval);
442  }
443 
444  // NEVER GET HERE
445  return 0;
446 }
447 
448 static void
449 usage(int argc, char **argv)
450 {
451  cerr<<
452 "\nCreate a PushConsumer to receive events from a channel.\n"
453 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
454 "\n"
455 "CHANNEL_URI: The event channel may be specified as a URI.\n"
456 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
457 "\n"
458 "OPTIONS: DEFAULT:\n"
459 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
460 " -s SECS sleep SECS seconds after disconnecting [0]\n"
461 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
462 " -h display this help text\n" << endl;
463 }
int optind
Definition: getopt.cc:82
char * optarg
Definition: getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition: getopt.cc:88
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition: naming.cc:117
CORBA::ORB_ptr orb
Definition: eventf.cc:60
static omni_semaphore connect_cond(0)
int main(int argc, char **argv)
The main process entry point.
Definition: pushcons.cc:216
static omni_mutex mutex
Definition: pushcons.cc:168
static void usage(int argc, char **argv)
Definition: pushcons.cc:449
void push(const CORBA::Any &data)
void disconnect_push_consumer()
Consumer_i(long disconnect=0)
Definition: pushcons.cc:174
long _disconnect
Definition: pushcons.cc:180