OmniEvents
pullsupp.cc
Go to the documentation of this file.
1 // -*- Mode: C++; -*-
2 // Package : omniEvents
3 // pullsupp.cc Created : 1/4/98
4 // Author : Paul Nader (pwn)
5 //
6 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
7 //
8 // This file is part of the omniEvents application.
9 //
10 // omniEvents is free software; you can redistribute it and/or
11 // modify it under the terms of the GNU Lesser General Public
12 // License as published by the Free Software Foundation; either
13 // version 2.1 of the License, or (at your option) any later version.
14 //
15 // omniEvents is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 // Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public
21 // License along with this library; if not, write to the Free Software
22 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 //
24 // Description:
25 // Pull Model supplier implementation.
26 //
27 
28 /*
29  $Log: pullsupp.cc,v $
30  Revision 1.9 2004/10/08 09:06:08 alextingle
31  More robust exception minor code handling.
32 
33  Revision 1.8 2004/08/18 17:49:45 alextingle
34  Added check for SIGPIPE before trying to use it.
35 
36  Revision 1.7 2004/08/06 16:19:23 alextingle
37  -k & -K options removed.
38  Naming service names may now be as complex as you like.
39 
40  Revision 1.6 2004/04/20 16:52:05 alextingle
41  All examples updated for latest version on omniEvents. Server may now be
42  specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
43 
44  Revision 1.5 2004/02/04 22:29:55 alextingle
45  Reworked all C++ examples.
46  Removed catch(...) as it tends to make it harder to see what's going on.
47  Now uses POA instead of BOA.
48  Uses omniORB4's Exception name probing.
49  No longer uses 'naming.h/cc' utility code.
50 
51  Revision 1.4 2003/11/17 08:47:09 alextingle
52  Corrected typo.
53 
54  Revision 1.2 2003/11/16 23:24:22 alex
55  Corrected typo.
56 
57  Revision 1.1.1.1 2003/11/11 22:28:56 alex
58  Import of testing 2.3.0
59 
60  Revision 1.3 2003/11/03 22:20:26 alextingle
61  Removed all platform specific switches. Now uses autoconf, config.h.
62  Removed stub header in order to allow makefile dependency checking to work
63  correctly.
64 
65  Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13
66  Added ifdefs to enable omniEvents to compile
67  with both omniORB3 and omniORB4. If __OMNIORB4__
68  is defined during compilation, omniORB4 headers
69  and command line option syntax is used, otherwise
70  fall back to omniORB3 style.
71 
72  Revision 1.1.1.1 2002/09/25 19:00:26 shamus13
73  Import of OmniEvents source tree from release 2.1.1
74 
75  Revision 0.11 2000/08/30 04:39:48 naderp
76  Port to omniORB 3.0.1.
77 
78  Revision 0.10 2000/03/16 05:37:27 naderp
79  Added stdlib.h for getopt.
80 
81  Revision 0.9 2000/03/06 13:26:32 naderp
82  Using util getRootNamingContext function.
83  Using stub headers.
84  Fixed error messages.
85 
86  Revision 0.8 2000/03/02 03:16:26 naderp
87  Added retry resiliency for handling COMM_FAUILURE exceptions.
88  Replaced condition variable by counting semaphore.
89 
90  Revision 0.7 1999/11/02 13:39:13 naderp
91  Added <signal.h>
92 
93  Revision 0.6 1999/11/02 07:57:22 naderp
94  Updated usage.
95 
96 Revision 0.5 99/11/01 16:10:12 16:10:12 naderp (Paul Nader)
97 omniEvents 2.0 Release.
98 Ignoring SIGPIPE for UNIX platforms.
99 
100 Revision 0.4 99/04/23 16:05:44 16:05:44 naderp (Paul Nader)
101 gcc port.
102 
103 Revision 0.3 99/04/23 09:34:01 09:34:01 naderp (Paul Nader)
104 Windows Port.
105 
106 Revision 0.2 99/04/21 18:06:25 18:06:25 naderp (Paul Nader)
107 *** empty log message ***
108 
109 Revision 0.1.1.1 98/11/27 16:59:33 16:59:33 naderp (Paul Nader)
110 Added -s option to sleep after disconnecting.
111 
112 Revision 0.1 98/11/25 14:08:07 14:08:07 naderp (Paul Nader)
113 Initial Revision
114 
115 */
116 
117 #ifdef HAVE_CONFIG_H
118 # include "config.h"
119 #endif
120 
121 #ifdef HAVE_GETOPT
122 # include <unistd.h>
123 extern char* optarg;
124 extern int optind;
125 #else
126 # include "getopt.h"
127 #endif
128 
129 #ifdef HAVE_IOSTREAM
130 # include <iostream>
131 #else
132 # include <iostream.h>
133 #endif
134 
135 #ifdef HAVE_STD_IOSTREAM
136 using namespace std;
137 #endif
138 
139 #ifdef HAVE_STDLIB_H
140 # include <stdlib.h>
141 #endif
142 
143 #ifdef HAVE_SIGNAL_H
144 # include <signal.h>
145 #endif
146 
147 #include <cstdio>
148 
149 #include "CosEventComm.hh"
150 #include "CosEventChannelAdmin.hh"
151 #include "naming.h"
152 
153 static omni_semaphore connect_cond(0);
154 static void usage(int argc, char **argv);
155 
156 class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
157 public:
158  Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
159  CORBA::Any *pull();
160  CORBA::Any *try_pull(CORBA::Boolean &has_event);
161  void disconnect_pull_supplier ();
162 
163 private:
164  long i;
166  CORBA::ULong l;
167 };
168 
169 void
171  cout << "Pull Supplier: disconnected by channel." << endl;
172 }
173 
174 CORBA::Any *
176  cout << "Pull Supplier: pull() called. Data : ";
177  CORBA::Any *any = new CORBA::Any();
178  *any <<= l++;
179  cout << l-1 << endl;
180 
181  // Exercise Disconnect
182  if ((_disconnect > 0) && (i == _disconnect)) {
183  i = 0;
184  // Signal main thread to disconnect and re-connect.
185  connect_cond.post();
186  }
187  i++;
188  return (any);
189 }
190 
191 CORBA::Any *
192 Supplier_i::try_pull(CORBA::Boolean &has_event)
193 {
194  cout << "Pull Supplier: try_pull() called. Data : ";
195  CORBA::Any *any = new CORBA::Any();
196  *any <<= l++;
197  cout << l-1 << endl;
198  has_event = 1;
199 
200  // Exercise Disconnect
201  if ((_disconnect > 0) && (i == _disconnect)) {
202  i = 0;
203  // Signal main thread to disconnect and re-connect.
204  connect_cond.post();
205  }
206  i++;
207  return (any);
208 }
209 
210 
211 int
212 main (int argc, char** argv)
213 {
214 #if defined(HAVE_OMNIORB4)
215  CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
216 #else
217  CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
218 #endif
219 
220  // Process Options
221  int discnum =0;
222  int sleepInterval =0;
223  const char* channelName ="EventChannel";
224 
225  int c;
226  while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
227  {
228  switch (c)
229  {
230  case 'd': discnum = atoi(optarg);
231  break;
232 
233  case 's': sleepInterval = atoi(optarg);
234  break;
235 
236  case 'n': channelName = optarg;
237  break;
238 
239  case 'h':
240  default : usage(argc,argv);
241  exit(-1);
242  break;
243  }
244  }
245 
246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
247  // Ignore broken pipes
248  signal(SIGPIPE, SIG_IGN);
249 #endif
250 
251  Supplier_i* supplier = new Supplier_i (discnum);
252  CosEventChannelAdmin::EventChannel_var channel;
253 
254  const char* action=""; // Use this variable to help report errors.
255  try {
256  CORBA::Object_var obj;
257 
258  action="resolve initial reference 'RootPOA'";
259  obj=orb->resolve_initial_references("RootPOA");
260  PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
261  if(CORBA::is_nil(rootPoa))
262  throw CORBA::OBJECT_NOT_EXIST();
263 
264  action="activate the RootPOA's POAManager";
265  PortableServer::POAManager_var pman =rootPoa->the_POAManager();
266  pman->activate();
267 
268  //
269  // Obtain object reference to EventChannel
270  // (from command-line argument or from the Naming Service).
271  if(optind<argc)
272  {
273  action="convert URI from command line into object reference";
274  obj=orb->string_to_object(argv[optind]);
275  }
276  else
277  {
278  action="resolve initial reference 'NameService'";
279  obj=orb->resolve_initial_references("NameService");
280  CosNaming::NamingContext_var rootContext=
281  CosNaming::NamingContext::_narrow(obj);
282  if(CORBA::is_nil(rootContext))
283  throw CORBA::OBJECT_NOT_EXIST();
284 
285  action="find EventChannel in NameService";
286  cout << action << endl;
287  obj=rootContext->resolve(str2name(channelName));
288  }
289 
290  action="narrow object reference to event channel";
291  channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
292  if(CORBA::is_nil(channel))
293  {
294  cerr << "Failed to narrow Event Channel reference." << endl;
295  exit(1);
296  }
297 
298  }
299  catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
300  cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
301  exit(1);
302  }
303  catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
304  cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
305  exit(1);
306  }
307  catch(CosNaming::NamingContext::NotFound& ex) { // resolve
308  cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
309  exit(1);
310  }
311  catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
312  cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
313  exit(1);
314  }
315  catch(CORBA::TRANSIENT& ex) { // _narrow()
316  cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
317  exit(1);
318  }
319  catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
320  cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
321  exit(1);
322  }
323  catch(CORBA::SystemException& ex) {
324  cerr<<"Failed to "<<action<<".";
325 #if defined(HAVE_OMNIORB4)
326  cerr<<" "<<ex._name();
327  if(ex.NP_minorString())
328  cerr<<" ("<<ex.NP_minorString()<<")";
329 #endif
330  cerr<<endl;
331  exit(1);
332  }
333  catch(CORBA::Exception& ex) {
334  cerr<<"Failed to "<<action<<"."
335 #if defined(HAVE_OMNIORB4)
336  " "<<ex._name()
337 #endif
338  <<endl;
339  exit(1);
340  }
341 
342  //
343  // Get Supplier Admin interface - retrying on Comms Failure.
344  CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
345  while (1)
346  {
347  try {
348  supplier_admin = channel->for_suppliers ();
349  if (CORBA::is_nil(supplier_admin))
350  {
351  cerr << "Event Channel returned nil Supplier Admin!"
352  << endl;
353  exit(1);
354  }
355  break;
356  }
357  catch (CORBA::COMM_FAILURE& ex) {
358  cerr << "Caught COMM_FAILURE exception "
359  << "obtaining Supplier Admin! Retrying..."
360  << endl;
361  continue;
362  }
363  }
364  cout << "Obtained SupplierAdmin." << endl;
365 
366  while (1)
367  {
368  //
369  // Get proxy consumer - retrying on Comms Failure.
370  CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
371  while (1)
372  {
373  try {
374  proxy_consumer = supplier_admin->obtain_pull_consumer ();
375  if (CORBA::is_nil(proxy_consumer))
376  {
377  cerr << "Supplier Admin returned nil proxy_consumer!"
378  << endl;
379  exit(1);
380  }
381  break;
382  }
383  catch (CORBA::COMM_FAILURE& ex) {
384  cerr << "Caught COMM_FAILURE exception "
385  << "obtaining Proxy Pull Consumer! Retrying..."
386  << endl;
387  continue;
388  }
389  }
390  cout << "Obtained ProxyPullConsumer." << endl;
391 
392  // Connect Pull Supplier - retrying on Comms Failure.
393  CosEventComm::PullSupplier_var supplierRef =supplier->_this();
394  while (1)
395  {
396  try {
397  proxy_consumer->connect_pull_supplier(supplierRef.in());
398  break;
399  }
400  catch (CORBA::BAD_PARAM& ex) {
401  cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
402  exit(1);
403  }
404  catch (CosEventChannelAdmin::AlreadyConnected& ex) {
405  cerr << "Pull Supplier already connected!"
406  << endl;
407  break;
408  }
409  catch (CORBA::COMM_FAILURE& ex) {
410  cerr << "Caught COMM_FAILURE exception "
411  << "connecting Pull Supplier! Retrying..."
412  << endl;
413  continue;
414  }
415  }
416  cout << "Connected Pull Supplier." << endl;
417 
418  // Wait for indication to disconnect before re-connecting.
419  connect_cond.wait();
420 
421  // Disconnect - retrying on Comms Failure.
422  while (1)
423  {
424  try {
425  proxy_consumer->disconnect_pull_consumer();
426  break;
427  }
428  catch (CORBA::COMM_FAILURE& ex) {
429  cerr << "Caught COMM_FAILURE exception "
430  << "disconnecting Pull Supplier! Retrying..."
431  << endl;
432  continue;
433  }
434  }
435  cout << "Disconnected Pull Supplier." << endl;
436 
437  // Yawn.
438  cout << "Sleeping " << sleepInterval << " seconds." << endl;
439  omni_thread::sleep(sleepInterval);
440  }
441 
442  // Not Reached
443  return 0;
444 }
445 
446 static void
447 usage(int argc, char **argv)
448 {
449  cerr<<
450 "\nCreate a PullSupplier to send events to a channel.\n"
451 "syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n"
452 "\n"
453 "CHANNEL_URI: The event channel may be specified as a URI.\n"
454 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
455 "\n"
456 "OPTIONS: DEFAULT:\n"
457 " -d NUM disconnect after sending NUM events [0 - never disconnect]\n"
458 " -s SECS sleep SECS seconds after disconnecting [0]\n"
459 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
460 " -h display this help text\n" << endl;
461 }
int optind
Definition: getopt.cc:82
char * optarg
Definition: getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition: getopt.cc:88
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition: naming.cc:117
CORBA::ORB_ptr orb
Definition: eventf.cc:60
int main(int argc, char **argv)
The main process entry point.
Definition: pullsupp.cc:212
static void usage(int argc, char **argv)
Definition: pullsupp.cc:447
static omni_semaphore connect_cond(0)
long _disconnect
Definition: pullsupp.cc:165
Supplier_i(long disconnect=0)
Definition: pullsupp.cc:158
CORBA::Any * pull()
Definition: pullsupp.cc:175
CORBA::Any * try_pull(CORBA::Boolean &has_event)
Definition: pullsupp.cc:192
CORBA::ULong l
Definition: pullsupp.cc:166
void disconnect_pull_supplier()
Definition: pullsupp.cc:170