OmniEvents
ProxyPullSupplier.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPullSupplier.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPullSupplier.h"
25 #include "EventChannel.h"
26 #include "Orb.h"
27 #include "omniEventsLog.h"
28 #include "PersistNode.h"
29 #include <assert.h>
30 
31 namespace OmniEvents {
32 
33 //
34 // ProxyPullSupplierManager
35 //
36 
37 PortableServer::Servant ProxyPullSupplierManager::incarnate(
38  const PortableServer::ObjectId& oid,
39  PortableServer::POA_ptr poa
40 )
41 {
42  // Evict the oldest proxy servant, if we have reached the maximum number.
43  if(_servants.size()>=_channel.maxNumProxies())
44  {
45  ProxyPullSupplier_i* oldest =NULL;
46  unsigned long age =0;
47  for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
48  if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
49  {
50  oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
51  age=oldest->timestamp();
52  }
53  DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
54  try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
55  }
56  // Make a new servant.
58  _servants.insert(result);
59  return result;
60 }
61 
63  const EventChannel_i& channel,
64  PortableServer::POA_ptr parentPoa,
65  EventQueue& q
66 )
67 : ProxyManager(parentPoa),
68  _queue(q),
69  _channel(channel)
70 {
71  ProxyManager::activate("ProxyPullSupplier");
72 }
73 
75 {
76  DB(20,"~ProxyPullSupplierManager()")
77 }
78 
80 
81 CosEventChannelAdmin::ProxyPullSupplier_ptr
83 {
84  return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
85  _managedPoa.in(),
86  CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
87  );
88 }
89 
91 {
92  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
93  {
94  ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
95  // We are in the EventChannel's thread.
96  // Make sure all calls go though the ProxyPullSupplier POA.
97  CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this();
99 
100  }
101 }
102 
103 
104 //
105 // ProxyPullSupplier_i
106 //
107 
108 // CORBA interface methods
109 
111  CosEventComm::PullConsumer_ptr pullConsumer
112 )
113 {
114  if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
115  throw CosEventChannelAdmin::AlreadyConnected();
116  touch();
117  _connected=true;
118  if(!CORBA::is_nil(pullConsumer))
119  _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
120 
122  {
123  WriteLock log;
124  output(log.os);
125  }
126 }
127 
129 {
130  DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
131  touch();
132  eraseKey("ConsumerAdmin/ProxyPullSupplier");
134  if(!_connected)
135  {
136  throw CORBA::OBJECT_NOT_EXIST(
137  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
138  CORBA::COMPLETED_NO
139  );
140  }
141  else if(!CORBA::is_nil(_target))
142  {
143  CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
144  _target=CosEventComm::PullConsumer::_nil();
145  req->send_deferred();
146  Orb::inst().deferredRequest(req._retn());
147  }
148 }
149 
151 {
152  if(!_connected)
153  throw CosEventComm::Disconnected();
154  touch();
155  if(moreEvents())
156  return new CORBA::Any(*nextEvent());
157  else
158  throw CORBA::TRANSIENT(
159  IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
160  CORBA::COMPLETED_NO
161  );
162 }
163 
164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
165 {
166  if(!_connected)
167  throw CosEventComm::Disconnected();
168  touch();
169  if(moreEvents())
170  {
171  has_event=1;
172  return new CORBA::Any(*nextEvent());
173  }
174  else
175  {
176  has_event=0;
177  return new CORBA::Any();
178  }
179 }
180 
181 //
182 
184  PortableServer::POA_ptr poa,
185  EventQueue& q
186 )
187 : Proxy(poa),
188  EventQueue::Reader(q),
189  _target(CosEventComm::PullConsumer::_nil()),
190  _connected(false),
191  _timestamp(0)
192 {
193  touch();
194 }
195 
197 {
198  DB(20,"~ProxyPullSupplier_i()")
199 }
200 
202  const string& oid,
203  const PersistNode& node
204 )
205 {
206  CosEventComm::PullConsumer_var pullConsumer =
207  string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
208  // Do not activate until we know that we have read a valid target.
209  activateObjectWithId(oid.c_str());
210  connect_pull_consumer(pullConsumer.in());
211 }
212 
214 {
215  basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
216 }
217 
219 {
220  unsigned long nsec; // dummy
221  omni_thread::get_time(&_timestamp,&nsec);
222 }
223 
224 }; // end namespace OmniEvents
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
#define DB(l, x)
Definition: Orb.h:49
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:115
CORBA::ULong maxNumProxies() const
Definition: EventChannel.h:179
The EventQueue is a circular buffer, that contains _size-1 events.
Definition: EventQueue.h:57
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
static Orb & inst()
Definition: Orb.h:81
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:60
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
Definition: ProxyManager.h:90
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:107
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
Definition: ProxyManager.h:128
void eraseKey(const char *name)
Helper method for constructing persistency output.
EventQueue & _queue
Reference to queue shared with ProxyPushSuppliers.
void disconnect()
Send disconnect_pull_consumer() to all connected PullConsumers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullSupplier_ptr createObject()
ProxyPullSupplierManager(const EventChannel_i &channel, PortableServer::POA_ptr parentPoa, EventQueue &q)
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
Servant for ProxyPullSupplier interface.
void output(ostream &os)
Save this object's state to a stream.
ProxyPullSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
void touch()
Update the _timestamp to the current moment.
unsigned long timestamp() const
CORBA::Any * try_pull(CORBA::Boolean &has_event)
CosEventComm::PullConsumer_var _target
unsigned long _timestamp
Keep track of when this proxy was last contacted.
bool _connected
Can't use _target to keep track of whether this object is connected, because it is legal to connect w...
void connect_pull_consumer(CosEventComm::PullConsumer_ptr pullConsumer)
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160