OmniEvents
ConsumerAdmin.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ConsumerAdmin.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ConsumerAdmin.h"
25 
26 #include "EventChannel.h"
27 #include "ProxyPushSupplier.h"
28 #include "ProxyPullSupplier.h"
29 #include "Orb.h"
30 #include "PersistNode.h"
31 #include "Filter.h"
32 
33 namespace OmniEvents {
34 
35 
36 CosEventChannelAdmin::ProxyPushSupplier_ptr
38 {
39  if(!_pushSupplier)
41  return _pushSupplier->createObject();
42 }
43 
44 
45 CosEventChannelAdmin::ProxyPullSupplier_ptr
47 {
48  if(!_pullSupplier)
50  return _pullSupplier->createObject();
51 }
52 
53 
55  const EventChannel_i& channel,
56  PortableServer::POA_ptr poa
57 )
58 : Servant(poa),
59  _channel(channel),
60  _queue(channel.maxQueueLength()),
61  _pushSupplier(NULL),
62  _pullSupplier(NULL)
63 {
64  if(_channel.properties().hasAttr("FilterId"))
65  {
66  string rid =_channel.properties().attrString("FilterId");
67  _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
68  }
69  else if(_channel.properties().hasAttr("FilterKind"))
70  {
71  CORBA::TCKind kind =
72  CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
73  _queue.setFilter(new FilterByTCKind(kind));
74  }
75 
76  activateObjectWithId("ConsumerAdmin");
77 }
78 
79 
81 {
82  DB(20,"~ConsumerAdmin_i()")
83  if(_pushSupplier)
84  {
85  _pushSupplier->_remove_ref(); // terminates thread.
86  _pushSupplier=NULL;
87  }
88  if(_pullSupplier)
89  {
90  _pullSupplier->_remove_ref();
91  _pullSupplier=NULL;
92  }
93 }
94 
95 
97 
98 
99 void ConsumerAdmin_i::send(CORBA::Any* event)
100 {
102  _queue.append(event);
103 }
104 
105 
106 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
107 {
108  if(!events.empty())
109  {
111  for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
112  _queue.append( *i );
113  events.clear();
114  }
115 }
116 
117 
119 {
120  if(_pushSupplier)
122  if(_pullSupplier)
124 }
125 
126 
128 {
129  // Build Push Supplier proxies
130  PersistNode* pushsNode =node.child("ProxyPushSupplier");
131  if(pushsNode && !pushsNode->_child.empty())
132  {
134  _pushSupplier->reincarnate(*pushsNode);
135  }
136 
137  // Build Pull Supplier proxies
138  PersistNode* pullsNode =node.child("ProxyPullSupplier");
139  if(pullsNode && !pullsNode->_child.empty())
140  {
142  _pullSupplier->reincarnate(*pullsNode);
143  }
144 }
145 
146 
147 void ConsumerAdmin_i::output(ostream& os)
148 {
149  if(_pushSupplier)
150  {
151  omni_mutex_lock l(_pushSupplier->_lock);
152  _pushSupplier->output(os);
153  }
154  if(_pullSupplier)
155  {
156  _pullSupplier->output(os);
157  }
158 }
159 
160 
161 }; // end namespace OmniEvents
#define DB(l, x)
Definition: Orb.h:49
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
ConsumerAdmin_i(const EventChannel_i &channel, PortableServer::POA_ptr poa)
void reincarnate(const PersistNode &node)
Populate this servant from log information.
void disconnect()
Send disconnect_XXX_consumer() to all connected consumers.
ProxyPushSupplierManager * _pushSupplier
Definition: ConsumerAdmin.h:90
const EventChannel_i & _channel
Definition: ConsumerAdmin.h:88
CosEventChannelAdmin::ProxyPushSupplier_ptr obtain_push_supplier()
CosEventChannelAdmin::ProxyPullSupplier_ptr obtain_pull_supplier()
void output(ostream &os)
Save this object's state to a stream.
ProxyPullSupplierManager * _pullSupplier
Definition: ConsumerAdmin.h:91
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:115
const PersistNode & properties() const
Definition: EventChannel.h:170
void setFilter(Filter *filter)
Definition: EventQueue.h:72
void append(CORBA::Any *event)
Definition: EventQueue.h:78
The most basic event filter allows only events of a certain CORBA TCKind to pass.
Definition: Filter.h:67
Allows only events of a certain CORBA RepositoryId to pass.
Definition: Filter.h:85
map< string, PersistNode * > _child
Definition: PersistNode.h:71
bool hasAttr(const string &key) const
Definition: PersistNode.cc:151
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
long attrLong(const string &key, long fallback=0) const
Definition: PersistNode.cc:163
PersistNode * child(const string &key) const
Definition: PersistNode.cc:171
void reincarnate(const PersistNode &node)
Re-create servants from information saved in the log file.
Definition: ProxyManager.cc:60
void output(ostream &os)
Save this object's state to a stream.
Definition: ProxyManager.cc:87
void disconnect()
Send disconnect_pull_consumer() to all connected PullConsumers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullSupplier_ptr createObject()
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
Base class for servants.
Definition: Servant.h:114
PortableServer::POA_var _poa
Definition: Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125