My Project
p2pcommunicator.hh
1 /*
2  Copyright 2015 IRIS AS
3 
4  This file is part of the Open Porous Media project (OPM).
5 
6  OPM is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  OPM is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with OPM. If not, see <http://www.gnu.org/licenses/>.
18 */
19 #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20 #define DUNE_COMMUNICATOR_HEADER_INCLUDED
21 
22 #include <cassert>
23 #include <algorithm>
24 #include <vector>
25 #include <set>
26 #include <map>
27 
28 #include <dune/common/version.hh>
29 
30 #include <dune/common/parallel/mpihelper.hh>
31 #if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
32 #include <dune/common/parallel/communication.hh>
33 #else
34 #include <dune/common/parallel/collectivecommunication.hh>
35 #endif
36 
37 // the following implementation is only available in case MPI is available
38 #if HAVE_MPI
39 #if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
40 #include <dune/common/parallel/mpicommunication.hh>
41 #else
42 #include <dune/common/parallel/mpicollectivecommunication.hh>
43 #endif
44 #endif
45 
46 
47 namespace Dune
48 {
50  {
51  typedef std::vector< char > BufferType;
52 
53  mutable BufferType buffer_;
54  const double factor_;
55  mutable size_t pos_;
56 public:
59  SimpleMessageBuffer( const double factor = 1.1 )
60  : buffer_(), factor_( factor )
61  {
63  }
64 
66  void clear() { buffer_.clear(); resetReadPosition(); }
68  void resetReadPosition() { pos_ = 0 ; }
70  size_t size() const { return buffer_.size(); }
71 
73  void reserve( const size_t size )
74  {
75  buffer_.reserve( size );
76  }
77 
79  void resize( const size_t size )
80  {
81  buffer_.resize( size );
82  }
83 
85  template <class T>
86  void write( const T& value )
87  {
88  // union to access bytes in value
89  const size_t tsize = sizeof( T );
90  size_t pos = buffer_.size();
91  const size_t sizeNeeded = pos + tsize ;
92  // reserve with some 10% overestimation
93  if( buffer_.capacity() < sizeNeeded )
94  {
95  reserve( size_t(factor_ * sizeNeeded) ) ;
96  }
97  // resize to size need to store value
98  buffer_.resize( sizeNeeded );
99  // copy value to buffer
100  std::copy_n( reinterpret_cast<const char *> (&value), tsize, buffer_.data()+pos );
101  }
102 
103  void write( const std::string& str)
104  {
105  int size = str.size();
106  write(size);
107  for (int k = 0; k < size; ++k) {
108  write(str[k]);
109  }
110  }
111 
113  template <class T>
114  void read( T& value ) const
115  {
116  // read bytes from stream and store in value
117  const size_t tsize = sizeof( T );
118  assert( pos_ + tsize <= buffer_.size() );
119  std::copy_n( buffer_.data()+pos_, tsize, reinterpret_cast<char *> (&value) );
120  pos_ += tsize;
121  }
122 
123  void read( std::string& str) const
124  {
125  int size = 0;
126  read(size);
127  str.resize(size);
128  for (int k = 0; k < size; ++k) {
129  read(str[k]);
130  }
131  }
132 
134  std::pair< char* , int > buffer() const
135  {
136  return std::make_pair( buffer_.data(), int(buffer_.size()) );
137  }
138  };
139 
141  template < class MsgBuffer >
142  class Point2PointCommunicator : public CollectiveCommunication< MPIHelper::MPICommunicator >
143  {
144  public:
146  typedef MPIHelper::MPICommunicator MPICommunicator ;
147 
149  typedef MsgBuffer MessageBufferType ;
150 
151  protected:
152 #if DUNE_VERSION_NEWER(DUNE_GRID, 2, 7)
153  using BaseType = Dune::Communication<MPICommunicator>;
154 #else
155  using BaseType = CollectiveCommunication< MPICommunicator>;
156 #endif
158 
159  // starting message tag
160  static const int messagetag = 234;
161 
162  typedef std::map< int, int > linkage_t;
163  typedef std::vector< int > vector_t;
164 
165  linkage_t sendLinkage_ ;
166  linkage_t recvLinkage_ ;
167 
168  vector_t sendDest_ ;
169  vector_t recvSource_ ;
170 
171  mutable vector_t _recvBufferSizes;
172  mutable bool _recvBufferSizesComputed;
173 
174  public :
175  using BaseType :: rank;
176  using BaseType :: size;
177 
178  /* \brief data handle interface that needs to be implemented for use with some of
179  * the exchange methods */
181  {
182  protected:
183  DataHandleInterface () {}
184  public:
185  virtual ~DataHandleInterface () {}
186  virtual void pack( const int link, MessageBufferType& os ) = 0 ;
187  virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
188  // should contain work that could be done between send and receive
189  virtual void localComputation () {}
190  };
191 
192  public:
194  Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
195  : BaseType( mpiComm ) { removeLinkage(); }
196 
198  Point2PointCommunicator( const BaseType& comm ) : BaseType( comm ) { removeLinkage(); }
199 
200 
202  inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
203 
205  inline int sendLinks () const { return sendLinkage_.size(); }
206 
208  inline int recvLinks () const { return recvLinkage_.size(); }
209 
211  const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
212 
214  inline int sendLink (const int rank) const
215  {
216  assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
217  return (* sendLinkage_.find (rank)).second ;
218  }
219 
221  inline int recvLink (const int rank) const
222  {
223  assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
224  return (* recvLinkage_.find (rank)).second ;
225  }
226 
228  const std::vector< int > &sendDest () const { return sendDest_; }
230  const std::vector< int > &recvSource () const { return recvSource_; }
231 
233  inline void removeLinkage () ;
234 
236  virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
237 
239  virtual void exchange ( DataHandleInterface& ) const;
240 
244  virtual void exchangeCached ( DataHandleInterface& ) const;
245 
246  protected:
247  inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
248 
249  // return new tag number for the exchange messages
250  static int getMessageTag( const unsigned int increment )
251  {
252  static int tag = messagetag + 2 ;
253  // increase tag counter
254  const int retTag = tag;
255  tag += increment ;
256  // the MPI standard guaratees only up to 2^15-1
257  if( tag >= 32767 )
258  {
259  // reset tag to initial value
260  tag = messagetag + 2 ;
261  }
262  return retTag;
263  }
264 
265  // return new tag number for the exchange messages
266  static int getMessageTag()
267  {
268  return getMessageTag( 1 );
269  }
270  };
271 
272 } // namespace Dune
273 
274 // include inline implementation
275 #include "p2pcommunicator_impl.hh"
276 
277 #endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
Definition: p2pcommunicator.hh:181
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:143
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:149
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:208
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:221
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:146
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:214
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:619
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:205
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:599
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:194
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:230
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:228
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set os ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:211
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:198
Definition: p2pcommunicator.hh:50
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:68
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:70
void clear()
clear the buffer
Definition: p2pcommunicator.hh:66
void reserve(const size_t size)
reserve memory for 'size' entries
Definition: p2pcommunicator.hh:73
void resize(const size_t size)
resize buffer to 'size' entries
Definition: p2pcommunicator.hh:79
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e.
Definition: p2pcommunicator.hh:59
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:114
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:86
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:134
Copyright 2019 Equinor AS.
Definition: CartesianIndexMapper.hpp:10