OmniEvents
events.cc
Go to the documentation of this file.
1 // -*- Mode: C++; -*-
2 // Package : omniEvents
3 // events.cc Created : 2004/05/02
4 // Author : Alex Tingle
5 //
6 // Copyright (C) 2004 Alex Tingle
7 //
8 // This file is part of the omniEvents application.
9 //
10 // omniEvents is free software; you can redistribute it and/or
11 // modify it under the terms of the GNU Lesser General Public
12 // License as published by the Free Software Foundation; either
13 // version 2.1 of the License, or (at your option) any later version.
14 //
15 // omniEvents is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 // Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public
21 // License along with this library; if not, write to the Free Software
22 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 //
24 // Description:
25 // Push Model streamer.
26 //
27 
28 #ifdef HAVE_CONFIG_H
29 # include "config.h"
30 #endif
31 
32 #ifdef HAVE_GETOPT
33 # include <unistd.h>
34 extern char* optarg;
35 extern int optind;
36 #else
37 # include "getopt.h"
38 #endif
39 
40 #ifdef HAVE_IOSTREAM
41 # include <iostream>
42 #else
43 # include <iostream.h>
44 #endif
45 
46 #ifdef HAVE_STD_IOSTREAM
47 using namespace std;
48 #endif
49 
50 #ifdef HAVE_STDLIB_H
51 # include <stdlib.h>
52 #endif
53 
54 #include <stdio.h>
55 
56 #if defined HAVE_UNISTD_H
57 # include <unistd.h> // read(), write()
58 #elif defined __WIN32__
59 # include <io.h>
60 # define write(fd,buf,count) _write(fd,buf,count)
61 # define read(fd,buf,count) _read(fd,buf,count)
62 # define ssize_t int
63 #endif
64 
65 #ifdef HAVE_SIGNAL_H
66 # include <signal.h>
67 #endif
68 
69 #include "CosEventComm.hh"
70 #include "CosEventChannelAdmin.hh"
71 #include "naming.h"
72 
73 #ifndef STDIN_FILENO
74 # define STDIN_FILENO 0
75 # define STDOUT_FILENO 1
76 #endif
77 
// Process-wide ORB reference. It is assigned by main() from CORBA::ORB_init()
// and used by the push consumer's disconnect handler to shut the ORB down.
78 CORBA::ORB_ptr orb;
79 
// Forward declaration: usage() is defined after main(), which calls it.
80 static void usage(int argc, char **argv);
81 
82 //
83 // Time
84 //
85 
// Number of nanoseconds in one second. Class Time uses it to carry and borrow
// between its seconds and nanoseconds fields.
86 #define BILLION 1000000000
87 
88 class Time;
89 class Time
90 {
91 private:
92  CORBA::ULong _sec;
93  CORBA::ULong _nano;
94 public:
95  static Time current()
96  {
97  Time result;
98  unsigned long sec,nano;
99  omni_thread::get_time(&sec,&nano);
100  result._sec=sec;
101  result._nano=nano;
102  return result;
103  }
104  static void sleepUntil(const Time& futureTime)
105  {
106  Time now =current();
107  if(now<futureTime)
108  {
109  Time offset=futureTime-now;
110  omni_thread::sleep(offset._sec,offset._nano);
111  }
112  }
113  //
114  Time():_sec(0),_nano(0){}
115  Time(CORBA::ULong sec,CORBA::ULong nano):_sec(sec),_nano(nano){}
116  Time(const Time& right):_sec(right._sec),_nano(right._nano){}
117  Time& operator=(const Time& right)
118  {
119  if(this!=&right)
120  {
121  _sec =right._sec;
122  _nano=right._nano;
123  }
124  return *this;
125  }
126  bool operator<(const Time& right) const
127  {
128  if(_sec==right._sec)
129  return _nano<right._nano;
130  else
131  return _sec<right._sec;
132  }
133  Time& operator+=(const Time& right)
134  {
135  _sec +=right._sec;
136  _nano+=right._nano;
137  if(_nano>BILLION)
138  {
139  _nano=_nano%BILLION;
140  ++_sec;
141  }
142  return *this;
143  }
144  Time operator+(const Time& right) const
145  {
146  Time result(*this);
147  result+=right;
148  return result;
149  }
150  Time& operator-=(const Time& right)
151  {
152  if(operator<(right))
153  {
154  cerr<<"Negative time!"<<endl;
155  throw CORBA::BAD_PARAM();
156  }
157  _sec-=right._sec;
158  if(_nano<right._nano)
159  {
160  _nano+=BILLION;
161  --_sec;
162  }
163  _nano-=right._nano;
164  return *this;
165  }
166  Time operator-(const Time& right) const
167  {
168  Time result(*this);
169  result-=right;
170  return result;
171  }
172  void operator>>=(cdrMemoryStream& s) const
173  {
174  _sec>>=s;
175  _nano>>=s;
176  }
177  void operator<<=(cdrMemoryStream& s)
178  {
179  _sec<<=s;
180  _nano<<=s;
181  }
182  bool is_nil() const { return(_sec==0 && _nano==0); }
183 }; // end class Time
184 
185 
186 //
187 // Consumer_i
188 //
189 
190 class Consumer_i : virtual public POA_CosEventComm::PushConsumer
191 {
192 public:
193  Consumer_i(long disconnect=0): _memstream() {}
194  void push(const CORBA::Any& data)
195  {
196  // Record the event timestamp.
197  Time now=Time::current();
198  now>>=_memstream;
199  // stream event data.
200  data>>=_memstream;
201  // Write to file.
202  write(STDOUT_FILENO,_memstream.bufPtr(),_memstream.bufSize());
203  // Reset.
204  _memstream.rewindPtrs();
205  }
207  {
208  cout<<"disconnected"<<endl;
209  orb->shutdown(0);
210  }
211  void consume(
212  CosEventChannelAdmin::EventChannel_ptr channel,
213  const char*& action)
214  {
215  action="get ConsumerAdmin";
216  CosEventChannelAdmin::ConsumerAdmin_var consumer_admin =
217  channel->for_consumers();
218 
219  action="get ProxyPushSupplier";
220  CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier =
221  consumer_admin->obtain_push_supplier();
222 
223  action="connect to ProxyPushSupplier";
224  proxy_supplier->connect_push_consumer(_this());
225  }
226 private:
227  cdrMemoryStream _memstream;
228 };
229 
230 
231 //
232 // Supplier_i
233 //
234 
235 class Supplier_i : virtual public POA_CosEventComm::PushSupplier
236 {
237 public:
238  Supplier_i(): _connected(true) {}
240  {
241  cout<<"disconnected"<<endl;
242  _connected=false;
243  }
244  void supply(
245  CosEventChannelAdmin::EventChannel_ptr channel,
246  const char*& action)
247  {
248  action="get SupplierAdmin";
249  CosEventChannelAdmin::SupplierAdmin_var supplier_admin =
250  channel->for_suppliers();
251 
252  action="get ProxyPushConsumer";
253  CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer =
254  supplier_admin->obtain_push_consumer();
255 
256  action="connect to ProxyPushConsumer";
257  proxy_consumer->connect_push_supplier(_this());
258 
259  char buf[1024];
260  ssize_t len;
261  action="read standard input";
262  // Stream start time (seconds,nanoseconds)
263  Time offsetTime;
264  while(_connected && (len=read(STDIN_FILENO,buf,1024)))
265  {
266  CORBA::Any any;
267  cdrMemoryStream memstr;
268  action="put_octet_array";
269  memstr.put_octet_array( (_CORBA_Octet*)buf, (int)len );
270  while(_connected && memstr.currentInputPtr()<memstr.bufSize())
271  {
272  action="unmarshal";
273  Time eventTime;
274  eventTime<<=memstr;
275  any<<=memstr;
276 
277  if(offsetTime.is_nil()) // first time special.
278  offsetTime=Time::current()-eventTime;
279  Time::sleepUntil(eventTime+offsetTime);
280 
281  action="push";
282  proxy_consumer->push(any);
283  }
284  }
285  }
286 private:
288 };
289 
290 
291 //
292 // main()
293 //
294 
295 int main(int argc, char **argv)
296 {
297  //
298  // Start orb.
299 #if defined(HAVE_OMNIORB4)
300  orb=CORBA::ORB_init(argc,argv,"omniORB4");
301 #else
302  orb=CORBA::ORB_init(argc,argv,"omniORB3");
303 #endif
304 
305  // Process Options
306  bool supplierMode =false;
307  const char* channelName ="EventChannel";
308 
309  int c;
310  while ((c = getopt(argc,argv,"shn:")) != EOF)
311  {
312  switch (c)
313  {
314  case 's': supplierMode=true;
315  break;
316 
317  case 'n': channelName = optarg;
318  break;
319 
320  case 'h': usage(argc,argv);
321  exit(0);
322  default : usage(argc,argv);
323  exit(-1);
324  }
325  }
326 
327 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
328  // Ignore broken pipes
329  signal(SIGPIPE, SIG_IGN);
330 #endif
331 
332  const char* action=""; // Use this variable to help report errors.
333  try {
334  CORBA::Object_var obj;
335 
336  action="resolve initial reference 'RootPOA'";
337  obj=orb->resolve_initial_references("RootPOA");
338  PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
339  if(CORBA::is_nil(rootPoa))
340  throw CORBA::OBJECT_NOT_EXIST();
341 
342  action="activate the RootPOA's POAManager";
343  PortableServer::POAManager_var pman =rootPoa->the_POAManager();
344  pman->activate();
345 
346  //
347  // Obtain object reference to EventChannel
348  // (from command-line argument or from the Naming Service).
349  if(optind<argc)
350  {
351  action="convert URI from command line into object reference";
352  obj=orb->string_to_object(argv[optind]);
353  }
354  else
355  {
356  action="resolve initial reference 'NameService'";
357  obj=orb->resolve_initial_references("NameService");
358  CosNaming::NamingContext_var rootContext=
359  CosNaming::NamingContext::_narrow(obj);
360  if(CORBA::is_nil(rootContext))
361  throw CORBA::OBJECT_NOT_EXIST();
362 
363  action="find EventChannel in NameService";
364  cout << action << endl;
365  obj=rootContext->resolve(str2name(channelName));
366  }
367 
368  action="narrow object reference to event channel";
369  CosEventChannelAdmin::EventChannel_var channel =
370  CosEventChannelAdmin::EventChannel::_narrow(obj);
371  if(CORBA::is_nil(channel))
372  {
373  cerr << "Failed to narrow Event Channel reference." << endl;
374  exit(1);
375  }
376 
377  if(supplierMode)
378  {
379  action="construct PushSupplier";
380  Supplier_i* supplier =new Supplier_i();
381  supplier->supply(channel,action);
382  }
383  else
384  {
385  action="construct PushConsumer";
386  Consumer_i* consumer =new Consumer_i();
387  consumer->consume(channel,action);
388 
389  action="run ORB";
390  orb->run();
391  }
392 
393  return 0;
394 
395  }
396  catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
397  cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
398  }
399  catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
400  cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
401  }
402  catch(CosNaming::NamingContext::NotFound& ex) { // resolve
403  cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
404  }
405  catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
406  cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
407  }
408  catch(CORBA::TRANSIENT& ex) { // _narrow()
409  cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
410  }
411  catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
412  cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
413  }
414  catch(CORBA::SystemException& ex) {
415  cerr<<"Failed to "<<action<<"."
416 #if defined(HAVE_OMNIORB4)
417  " "<<ex._name()<<" ("<<ex.NP_minorString()<<")"
418 #endif
419  <<endl;
420  }
421  catch(CORBA::Exception& ex) {
422  cerr<<"Failed to "<<action<<"."
423 #if defined(HAVE_OMNIORB4)
424  " "<<ex._name()
425 #endif
426  <<endl;
427  }
428 
429  return 1;
430 }
431 
/**
 * Writes the command line help text to cerr.
 * The program name shown is argv[0], or "events" when argc is zero.
 */
static void usage(int argc, char **argv)
{
  const char* programName="events";
  if(argc)
      programName=argv[0];

  cerr<<"\nStream events from a channel to stdout, or (-s) from stdin to a channel.\n";
  cerr<<"syntax: "<<programName<<" OPTIONS [CHANNEL_URI]\n";
  cerr<<"\n";
  cerr<<"CHANNEL_URI: The event channel may be specified as a URI.\n";
  cerr<<" This may be an IOR, or a corbaloc::: or corbaname::: URI.\n";
  cerr<<"\n";
  cerr<<"OPTIONS: DEFAULT:\n";
  cerr<<" -s supply mode. Read events from stdin.\n";
  cerr<<" -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n";
  cerr<<" -h display this help text\n";
  // Final newline and flush, as one endl.
  cerr<<endl;
}
int optind
Definition: getopt.cc:82
char * optarg
Definition: getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition: getopt.cc:88
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition: naming.cc:117
#define BILLION
Definition: events.cc:86
int main(int argc, char **argv)
The main process entry point.
Definition: events.cc:295
CORBA::ORB_ptr orb
Definition: events.cc:78
#define STDOUT_FILENO
Definition: events.cc:75
static void usage(int argc, char **argv)
Definition: events.cc:432
#define STDIN_FILENO
Definition: events.cc:74
Definition: events.cc:90
bool is_nil() const
Definition: events.cc:182
bool operator<(const Time &right) const
Definition: events.cc:126
static Time current()
Definition: events.cc:95
Time(const Time &right)
Definition: events.cc:116
Time operator+(const Time &right) const
Definition: events.cc:144
Time()
Definition: events.cc:114
CORBA::ULong _nano
Definition: events.cc:93
Time(CORBA::ULong sec, CORBA::ULong nano)
Definition: events.cc:115
static void sleepUntil(const Time &futureTime)
Definition: events.cc:104
Time & operator-=(const Time &right)
Definition: events.cc:150
Time operator-(const Time &right) const
Definition: events.cc:166
Time & operator=(const Time &right)
Definition: events.cc:117
void operator>>=(cdrMemoryStream &s) const
Definition: events.cc:172
Time & operator+=(const Time &right)
Definition: events.cc:133
void operator<<=(cdrMemoryStream &s)
Definition: events.cc:177
CORBA::ULong _sec
Definition: events.cc:92
void consume(CosEventChannelAdmin::EventChannel_ptr channel, const char *&action)
Definition: events.cc:211
void push(const CORBA::Any &data)
Definition: events.cc:194
cdrMemoryStream _memstream
Definition: events.cc:227
void disconnect_push_consumer()
Definition: events.cc:206
Consumer_i(long disconnect=0)
Definition: events.cc:193
void disconnect_push_supplier()
Definition: events.cc:239
Supplier_i()
Definition: events.cc:238
void supply(CosEventChannelAdmin::EventChannel_ptr channel, const char *&action)
Definition: events.cc:244
bool _connected
Definition: events.cc:287