OmniEvents
SupplierAdmin.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // SupplierAdmin.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "SupplierAdmin.h"
25 
26 #include "EventChannel.h"
27 #include "ProxyPushConsumer.h"
28 #include "ProxyPullConsumer.h"
29 #include "Orb.h"
30 #include "PersistNode.h"
31 
32 #define MILLION 1000000
33 #define BILLION 1000000000
34 
35 namespace OmniEvents {
36 
37 CosEventChannelAdmin::ProxyPushConsumer_ptr
39 {
40  return _pushConsumer->createObject();
41 }
42 
43 
44 CosEventChannelAdmin::ProxyPullConsumer_ptr
46 {
47  if(!_pullConsumer)
49  return _pullConsumer->createObject();
50 }
51 
52 
54  const EventChannel_i& channel,
55  PortableServer::POA_ptr poa
56 )
57 : Servant(poa),
58  _channel(channel),
59  _pushConsumer(NULL),
60  _pullConsumer(NULL),
61  _queue(),
62  _nextPull(0,0)
63 {
64  // Initialise _nextPull. Only set it if the cycle period is LESS than the
65  // pull retry period - otherwise just pull every cycle.
67  {
68  omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
69  }
70 
71  // Always create the ProxyPushConsumer_i default servant. This allows
72  // lazy clients to connect suppliers without having to go through the
73  // proper procedure - they can make up an appropriate ObjectId, call push()
74  // and it will just work (TM).
75  // Note: A SupplierAdmin_i is always created by the EventChannel to allow this
76  // behaviour.
78 
79  activateObjectWithId("SupplierAdmin");
80 }
81 
82 
84 {
85  DB(20,"~SupplierAdmin_i()")
86  if(_pullConsumer)
87  {
88  _pullConsumer->_remove_ref();
89  _pullConsumer=NULL;
90  }
91  if(_pushConsumer)
92  {
93  delete _pushConsumer;
94  _pushConsumer=NULL;
95  }
96  for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
97  delete *i;
98 }
99 
100 
102 
103 
104 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
105 {
106  if(_pullConsumer)
107  {
108  _pullConsumer->collect();
109  if(0==_nextPull.first)
110  { // No delay between pulls.
111  _pullConsumer->triggerRequest();
112  }
113  else
114  { // Only trigger new pull() calls if `pullRetry' ms have passed.
115  pair<unsigned long,unsigned long> now;
116  omni_thread::get_time(&(now.first),&(now.second));
117  if(now>=_nextPull)
118  {
119  _pullConsumer->triggerRequest();
120 
121  CORBA::ULong p =_channel.pullRetryPeriod_ms();
122  do{
123  _nextPull.second += (p%1000)*MILLION; // nsec
124  _nextPull.first += p/1000 + _nextPull.second/BILLION; // sec
125  _nextPull.second %= BILLION; // nsec
126  } while(now>=_nextPull);
127  }
128  }
129  }
130  _pushConsumer->trigger();
131  // Pick up events from both pull & push consumers.
132  events=_queue;
133  _queue.clear();
134 }
135 
136 
138 {
139  if(_pushConsumer)
141  if(_pullConsumer)
143 }
144 
145 
147 {
148  // Build Push Consumer proxies
149  PersistNode* pushcNode =node.child("ProxyPushConsumer");
150  if(pushcNode && !pushcNode->_child.empty())
151  {
152  assert(_pushConsumer!=NULL);
153  _pushConsumer->reincarnate(*pushcNode);
154  }
155 
156  // Build Pull Consumer proxies
157  PersistNode* pullcNode =node.child("ProxyPullConsumer");
158  if(pullcNode && !pullcNode->_child.empty())
159  {
160  if(!_pullConsumer)
162  _pullConsumer->reincarnate(*pullcNode);
163  }
164 }
165 
166 
167 void SupplierAdmin_i::output(ostream& os)
168 {
169  if(_pushConsumer)
170  _pushConsumer->output(os);
171  if(_pullConsumer)
172  _pullConsumer->output(os);
173 }
174 
175 
176 }; // end namespace OmniEvents
#define DB(l, x)
Definition: Orb.h:49
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
#define BILLION
#define MILLION
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:115
ConsumerAdmin_i & consumerAdmin() const
Definition: EventChannel.h:168
CORBA::ULong pullRetryPeriod_ms() const
Definition: EventChannel.h:175
unsigned long cyclePeriod_ns() const
Definition: EventChannel.h:181
map< string, PersistNode * > _child
Definition: PersistNode.h:71
PersistNode * child(const string &key) const
Definition: PersistNode.cc:171
void reincarnate(const PersistNode &node)
Re-create servants from information saved in the log file.
Definition: ProxyManager.cc:60
void output(ostream &os)
Save this object's state to a stream.
Definition: ProxyManager.cc:87
void disconnect()
Send disconnect_pull_supplier() to all connected PullSuppliers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullConsumer_ptr createObject()
Default servant for ProxyPushConsumer objects.
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
Base class for servants.
Definition: Servant.h:114
PortableServer::POA_var _poa
Definition: Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
list< CORBA::Any * > _queue
Incoming queue for the PushConsumer.
Definition: SupplierAdmin.h:83
pair< unsigned long, unsigned long > _nextPull
Next time to retry pull (sec,nsec).
Definition: SupplierAdmin.h:87
CosEventChannelAdmin::ProxyPushConsumer_ptr obtain_push_consumer()
SupplierAdmin_i(const EventChannel_i &channel, PortableServer::POA_ptr poa)
const EventChannel_i & _channel
Definition: SupplierAdmin.h:80
void disconnect()
Send disconnect_XXX_supplier() to all connected consumers.
void output(ostream &os)
Save this object's state to a stream.
ProxyPushConsumer_i * _pushConsumer
Definition: SupplierAdmin.h:81
ProxyPullConsumerManager * _pullConsumer
Definition: SupplierAdmin.h:82
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void collect(list< CORBA::Any * > &events)
Collects all events that have arrived since the last call.
void reincarnate(const PersistNode &node)
Populate this servant from log information.
CosEventChannelAdmin::ProxyPullConsumer_ptr obtain_pull_consumer()