OmniEvents
pullcons.cc
Go to the documentation of this file.
1 // -*- Mode: C++; -*-
2 // Package : omniEvents
3 // pullcons.cc Created on: 1/4/98
4 // Author : Paul Nader (pwn)
5 //
6 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
7 //
8 // This file is part of the omniEvents application.
9 //
10 // omniEvents is free software; you can redistribute it and/or
11 // modify it under the terms of the GNU Lesser General Public
12 // License as published by the Free Software Foundation; either
13 // version 2.1 of the License, or (at your option) any later version.
14 //
15 // omniEvents is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 // Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public
21 // License along with this library; if not, write to the Free Software
22 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 //
24 // Description:
25 // Pull Model consumer implementation.
26 //
27 
28 /*
29  $Log: pullcons.cc,v $
30  Revision 1.12 2004/10/08 09:06:08 alextingle
31  More robust exception minor code handling.
32 
33  Revision 1.11 2004/08/18 17:49:45 alextingle
34  Added check for SIGPIPE before trying to use it.
35 
36  Revision 1.10 2004/08/06 16:19:23 alextingle
37  -k & -K options removed.
38  Naming service names may now be as complex as you like.
39 
40  Revision 1.9 2004/04/20 16:52:02 alextingle
41  All examples updated for latest version on omniEvents. Server may now be
42  specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
43 
44  Revision 1.8 2004/03/08 18:00:13 alextingle
45  One too many newlines.
46 
47  Revision 1.7 2004/03/08 17:48:25 alextingle
48  Added newlines to cirrectly format output when exceptions occur.
49 
50  Revision 1.6 2004/02/21 19:07:45 alextingle
51  Corrected servants to use POA instead of BOA.
52 
53  Revision 1.5 2004/02/04 22:29:55 alextingle
54  Reworked all C++ examples.
55  Removed catch(...) as it tends to make it harder to see what's going on.
56  Now uses POA instead of BOA.
57  Uses omniORB4's Exception name probing.
58  No longer uses 'naming.h/cc' utility code.
59 
60  Revision 1.4 2004/01/11 16:59:28 alextingle
61  Added more helpful exception handling error messages to pullcons.cc.
62 
63  Revision 1.3 2003/11/03 22:20:54 alextingle
64  Removed all platform specific switches. Now uses autoconf, config.h.
65  Removed stub header in order to allow makefile dependency checking to work
66  correctly.
67  Changed int to bool where appropriate.
68 
69  Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13
70  Added ifdefs to enable omniEvents to compile
71  with both omniORB3 and omniORB4. If __OMNIORB4__
72  is defined during compilation, omniORB4 headers
73  and command line option syntax is used, otherwise
74  fall back to omniORB3 style.
75 
76  Revision 1.1.1.1 2002/09/25 19:00:25 shamus13
77  Import of OmniEvents source tree from release 2.1.1
78 
79  Revision 0.11 2000/10/11 01:16:21 naderp
80  *** empty log message ***
81 
82  Revision 0.10 2000/08/30 04:39:48 naderp
83  Port to omniORB 3.0.1.
84 
85  Revision 0.9 2000/03/16 05:37:27 naderp
86  Added stdlib.h for getopt.
87 
88  Revision 0.8 2000/03/06 13:25:44 naderp
89  Using util getRootNamingContext function.
90  Using stub headers.
91  Fixed error messages.
92 
93  Revision 0.7 2000/03/02 02:11:27 naderp
94  Added -r option to connect using nil reference.
95  Added retry resiliency for handling COMM_FAUILURE exceptions.
96 
97  Revision 0.6 1999/11/02 13:38:57 naderp
98  Added <signal.h>
99 
100  Revision 0.5 1999/11/01 15:55:11 naderp
101  omniEvents 2.0 Release.
102  Ignoring SIGPIPE for UNIX platforms.
103 
104 Revision 0.4 99/04/23 16:05:38 16:05:38 naderp (Paul Nader)
105 gcc port.
106 
107 Revision 0.3 99/04/23 09:33:40 09:33:40 naderp (Paul Nader)
108 Windows Port.
109 
110 Revision 0.2 99/04/21 18:06:25 18:06:25 naderp (Paul Nader)
111 *** empty log message ***
112 
113 Revision 0.1.1.1 98/11/27 16:59:07 16:59:07 naderp (Paul Nader)
114 Added -s option to sleep after disconnecting.
115 
116 Revision 0.1 98/11/25 14:08:04 14:08:04 naderp (Paul Nader)
117 Initial Revision
118 
119 */
120 
121 #ifdef HAVE_CONFIG_H
122 # include "config.h"
123 #endif
124 
125 #ifdef HAVE_GETOPT
126 # include <unistd.h>
127 extern char* optarg;
128 extern int optind;
129 #else
130 # include "getopt.h"
131 #endif
132 
133 #ifdef HAVE_IOSTREAM
134 # include <iostream>
135 #else
136 # include <iostream.h>
137 #endif
138 
139 #ifdef HAVE_STD_IOSTREAM
140 using namespace std;
141 #endif
142 
143 #ifdef HAVE_STDLIB_H
144 # include <stdlib.h>
145 #endif
146 
147 #ifdef HAVE_SIGNAL_H
148 # include <signal.h>
149 #endif
150 
151 #include <cstdio>
152 
153 #include "CosEventComm.hh"
154 #include "CosEventChannelAdmin.hh"
155 #include "naming.h"
156 
157 static void usage(int argc, char **argv);
158 
159 class Consumer_i : virtual public POA_CosEventComm::PullConsumer {
160 public:
161  Consumer_i () {};
162  void disconnect_pull_consumer ();
163 };
164 
166  cout << "Pull Consumer: disconnected." << endl;
167 }
168 
169 int
170 main(int argc, char **argv)
171 {
172  //
173  // Start orb.
174  CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
175 
176  // Process Options
177  bool trymode =false;
178  int discnum =0;
179  bool refnil =false;
180  int sleepInterval =0;
181  const char* channelName ="EventChannel";
182 
183  int c;
184  while ((c = getopt(argc,argv,"td:rs:n:h")) != EOF)
185  {
186  switch (c)
187  {
188  case 't': trymode = true;
189  break;
190 
191  case 'd': discnum = atoi(optarg);
192  break;
193 
194  case 'r': refnil = true;
195  break;
196 
197  case 's': sleepInterval = atoi(optarg);
198  break;
199 
200  case 'n': channelName = optarg;
201  break;
202 
203  case 'h':
204  default : usage(argc,argv);
205  exit(-1);
206  break;
207  }
208  }
209 
210 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
211  // Ignore broken pipes
212  signal(SIGPIPE, SIG_IGN);
213 #endif
214 
215  Consumer_i* consumer =NULL;
216  CosEventChannelAdmin::EventChannel_var channel;
217 
218  const char* action=""; // Use this variable to help report errors.
219  try {
220  CORBA::Object_var obj;
221 
222  // A Pull Consumer can be implemented as a pure client or as a mixed
223  // client-server process, depending on whether it requires and is
224  // prepared to service disconnect requests from the channel.
225  // If it is, then create the servant object and activate the POA.
226  if (! refnil)
227  {
228  consumer=new Consumer_i();
229 
230  action="resolve initial reference 'RootPOA'";
231  obj=orb->resolve_initial_references("RootPOA");
232  PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
233  if(CORBA::is_nil(rootPoa))
234  throw CORBA::OBJECT_NOT_EXIST();
235 
236  action="activate the RootPOA's POAManager";
237  PortableServer::POAManager_var pman =rootPoa->the_POAManager();
238  pman->activate();
239  }
240 
241  //
242  // Obtain object reference to EventChannel
243  // (from command-line argument or from the Naming Service).
244  if(optind<argc)
245  {
246  action="convert URI from command line into object reference";
247  obj=orb->string_to_object(argv[optind]);
248  }
249  else
250  {
251  action="resolve initial reference 'NameService'";
252  obj=orb->resolve_initial_references("NameService");
253  CosNaming::NamingContext_var rootContext=
254  CosNaming::NamingContext::_narrow(obj);
255  if(CORBA::is_nil(rootContext))
256  throw CORBA::OBJECT_NOT_EXIST();
257 
258  action="find EventChannel in NameService";
259  cout << action << endl;
260  obj=rootContext->resolve(str2name(channelName));
261  }
262 
263  action="narrow object reference to event channel";
264  channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
265  if(CORBA::is_nil(channel))
266  {
267  cerr << "Failed to narrow Event Channel reference." << endl;
268  exit(1);
269  }
270 
271  }
272  catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
273  cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
274  exit(1);
275  }
276  catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
277  cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
278  exit(1);
279  }
280  catch(CosNaming::NamingContext::NotFound& ex) { // resolve
281  cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
282  exit(1);
283  }
284  catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
285  cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
286  exit(1);
287  }
288  catch(CORBA::TRANSIENT& ex) { // _narrow()
289  cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
290  exit(1);
291  }
292  catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
293  cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
294  exit(1);
295  }
296  catch(CORBA::SystemException& ex) {
297  cerr<<"Failed to "<<action<<".";
298 #if defined(HAVE_OMNIORB4)
299  cerr<<" "<<ex._name();
300  if(ex.NP_minorString())
301  cerr<<" ("<<ex.NP_minorString()<<")";
302 #endif
303  cerr<<endl;
304  exit(1);
305  }
306  catch(CORBA::Exception& ex) {
307  cerr<<"Failed to "<<action<<"."
308 #if defined(HAVE_OMNIORB4)
309  " "<<ex._name()
310 #endif
311  <<endl;
312  exit(1);
313  }
314 
315  //
316  // Get Consumer admin interface - retrying on Comms Failure.
317  CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
318  while (1)
319  {
320  try {
321  consumer_admin = channel->for_consumers ();
322  if (CORBA::is_nil (consumer_admin))
323  {
324  cerr << "Event Channel returned nil Consumer Admin!" << endl;
325  exit (1);
326  }
327  break;
328  }
329  catch (CORBA::COMM_FAILURE& ex) {
330  cerr << "Caught COMM_FAILURE exception "
331  << "obtaining Consumer Admin! Retrying..."
332  << endl;
333  continue;
334  }
335  }
336  cout << "Obtained Consumer Admin." << endl;
337 
338  while (1)
339  {
340  //
341  // Get proxy supplier - retrying on Comms Failure.
342  CosEventChannelAdmin::ProxyPullSupplier_var proxy_supplier;
343  while (1)
344  {
345  try {
346  proxy_supplier = consumer_admin->obtain_pull_supplier ();
347  if (CORBA::is_nil (proxy_supplier))
348  {
349  cerr << "Consumer Admin returned nil Proxy Supplier!" << endl;
350  exit (1);
351  }
352  break;
353  }
354  catch (CORBA::COMM_FAILURE& ex) {
355  cerr << "Caught COMM_FAILURE Exception "
356  << "obtaining Pull Supplier! Retrying..."
357  << endl;
358  continue;
359  }
360  }
361  cout << "Obtained ProxyPullSupplier." << endl;
362 
363  //
364  // Connect Pull Consumer - retrying on Comms Failure.
365  CosEventComm::PullConsumer_ptr cptr =CosEventComm::PullConsumer::_nil();
366  if (! refnil) {
367  cptr=consumer->_this();
368  }
369 
370  while (1)
371  {
372  try {
373  proxy_supplier->connect_pull_consumer(cptr);
374  break;
375  }
376  catch (CORBA::BAD_PARAM& ex) {
377  cerr << "Caught BAD_PARAM exception connecting Pull Consumer!"<<endl;
378  exit (1);
379  }
380  catch (CosEventChannelAdmin::AlreadyConnected& ex) {
381  cerr << "Proxy Pull Supplier already connected!"
382  << endl;
383  break;
384  }
385  catch (CORBA::COMM_FAILURE& ex) {
386  cerr << "Caught COMM_FAILURE Exception "
387  << "connecting Pull Consumer! Retrying..."
388  << endl;
389  continue;
390  }
391  }
392  cout << "Connected Pull Consumer." << endl;
393 
394  // Pull data.
395  CORBA::Any *data;
396  CORBA::ULong l = 0;
397  for (int i=0; (discnum == 0) || (i < discnum); i++)
398  {
399  if(trymode)
400  {
401  try {
402  CORBA::Boolean has_event;
403  data = proxy_supplier->try_pull(has_event);
404  cout << "Consumer: try_pull() called. Data : " << flush;
405  if (has_event)
406  {
407  l = 0;
408  *data >>= l;
409  cout << l << endl;
410  delete data;
411  }
412  else
413  {
414  cout << "None" << endl;
415  }
416  }
417  catch (CosEventComm::Disconnected& ex) {
418  cout << endl;
419  cerr << "Failed. Caught Disconnected Exception !" << endl;
420  }
421  catch (CORBA::COMM_FAILURE& ex) {
422  cout << endl;
423  cerr << "Failed. Caught COMM_FAILURE Exception !" << endl;
424  }
425  catch (CORBA::Exception& ex) {
426  cout << endl;
427  cerr<<"CORBA exception, unable to try_pull()"
428 #ifdef HAVE_OMNIORB4
429  <<": "<<ex._name()
430 #endif
431  << endl;
432  }
433  }
434  else
435  {
436  try {
437  cout << "Pull Consumer: pull() called. ";
438  cout.flush();
439  data = proxy_supplier->pull();
440  l = 0;
441  *data >>= l;
442  cout << "Data : " << l << endl;
443  delete data;
444  }
445  catch(CORBA::TRANSIENT&) {
446  cout << "caught TRANSIENT." << endl;
447  omni_thread::sleep(1);
448  }
449  catch (CosEventComm::Disconnected& ex) {
450  cout << endl;
451  cerr << "Failed. Caught Disconnected exception!" << endl;
452  exit(1);
453  }
454  catch (CORBA::COMM_FAILURE& ex) {
455  cout << endl;
456  cerr << "Failed. Caught COMM_FAILURE exception!" << endl;
457  exit(1);
458  }
459  catch (CORBA::SystemException& ex) {
460  cout << endl;
461  cerr<<"System exception, unable to pull()";
462 #ifdef HAVE_OMNIORB4
463  cerr<<": "<<ex._name();
464  if(ex.NP_minorString())
465  cerr<<" ("<<ex.NP_minorString()<<")";
466 #endif
467  cerr<< endl;
468  exit(1);
469  }
470  catch (CORBA::Exception& ex) {
471  cout << endl;
472  cerr<<"CORBA exception, unable to pull()"
473 #ifdef HAVE_OMNIORB4
474  <<": "<<ex._name()
475 #endif
476  << endl;
477  exit(1);
478  }
479  }
480  }
481 
482  // Disconnect - retrying on Comms Failure.
483  while (1)
484  {
485  try {
486  proxy_supplier->disconnect_pull_supplier();
487  break;
488  }
489  catch (CORBA::COMM_FAILURE& ex) {
490  cerr << "Caught COMM_FAILURE exception "
491  << "disconnecting Pull Consumer! Retrying..."
492  << endl;
493  continue;
494  }
495  }
496  cout << "Disconnected Pull Consumer." << endl;
497 
498  // Yawn
499  cout << "Sleeping " << sleepInterval << " seconds." << endl;
500  omni_thread::sleep(sleepInterval);
501  }
502 
503  // Not Reached
504  return 0;
505 }
506 
507 static void
508 usage(int argc, char **argv)
509 {
510  cerr<<
511 "\nCreate a PullConsumer to receive events from a channel.\n"
512 "syntax: "<<(argc?argv[0]:"pullcons")<<" OPTIONS [CHANNEL_URI]\n"
513 "\n"
514 "CHANNEL_URI: The event channel may be specified as a URI.\n"
515 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
516 "\n"
517 "OPTIONS: DEFAULT:\n"
518 " -t enable try_pull mode\n"
519 " -r connect using a nil reference\n"
520 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
521 " -s SECS sleep SECS seconds after disconnecting [0]\n"
522 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
523 " -h display this help text\n" << endl;
524 }
int optind
Definition: getopt.cc:82
char * optarg
Definition: getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition: getopt.cc:88
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition: naming.cc:117
CORBA::ORB_ptr orb
Definition: eventf.cc:60
int main(int argc, char **argv)
The main process entry point.
Definition: pullcons.cc:170
static void usage(int argc, char **argv)
Definition: pullcons.cc:508
void disconnect_pull_consumer()
Definition: pullcons.cc:165