My Project
VariableSizeCommunicator.hpp
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 
4 // This file is copied from file dune/common/variablesizecommunicator.hh
5 // of dune-common as of commit 30e78443884f7730229e6c59b14406051d71c032.
6 // It is needed as we need an important bugfix (commit 30e78443884)
7 // that is not included in DUNE
8 // versions below 2.8. We have moved the classes to the Opm namespace.
9 // The file's main author is Markus Blatt. Others have only contributed
10 // minor fixes proposed by compilers, and codespell to prevent unused warnings
11 // and variables or improved documentation.
12 /*
13 Copyright 2013-2018 Dr. Blatt - HPC-Simulation-Software & Services
14 Copyright 2013-2017 Statoil ASA
15 */
16 #ifndef OPM_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
17 #define OPM_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
18 
19 #if HAVE_MPI
20 
21 #include <algorithm>
22 #include <cassert>
23 #include <cstddef>
24 #include <functional>
25 #include <map>
26 #include <memory>
27 #include <utility>
28 #include <vector>
29 #include <algorithm>
30 
31 #include <mpi.h>
32 
33 #include <dune/common/parallel/interface.hh>
34 #include <dune/common/parallel/mpitraits.hh>
35 #include <dune/common/unused.hh>
36 
49 namespace Opm
50 {
51 
52 namespace
53 {
58 template<class T, class Allocator=std::allocator<T> >
59 class MessageBuffer
60 {
61 public:
66  explicit MessageBuffer(int size)
67  : buffer_(new T[size]), size_(size), position_(0)
68  {}
73  explicit MessageBuffer(const MessageBuffer& o)
74  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
75  {
76  }
78  ~MessageBuffer()
79  {
80  delete[] buffer_;
81  }
86  void write(const T& data)
87  {
88  assert(position_<size_);
89  buffer_[position_++]=data;
90  }
91 
96  void read(T& data)
97  {
98  assert(position_<size_);
99  data=buffer_[position_++];
100  }
101 
107  void reset()
108  {
109  position_=0;
110  }
111 
116  bool finished()
117  {
118  return position_==size_;
119  }
120 
126  bool hasSpaceForItems(int noItems)
127  {
128  return position_+noItems<=size_;
129  }
134  std::size_t size() const
135  {
136  return size_;
137  }
138  std::size_t position() const
139  {
140  return position_;
141  }
146  operator T*()
147  {
148  return buffer_;
149  }
150 
151 private:
155  T* buffer_;
159  std::size_t size_;
163  std::size_t position_;
164 };
165 
166 using Dune::InterfaceInformation;
167 using Dune::Interface;
168 
172 class InterfaceTracker
173 {
174 public:
180  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
181  bool allocateSizes=false)
182  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
183  {
184  if(allocateSizes)
185  {
186  sizes_.resize(info.size());
187  }
188  }
189 
193  void moveToNextIndex()
194  {
195  index_++;
196  assert(index_<=interface_.size());
197  skipZeroIndices();
198  }
203  void increment(std::size_t i)
204  {
205  index_+=i;
206  assert(index_<=interface_.size());
207  }
212  bool finished() const
213  {
214  return index_==interface_.size();
215  }
216 
217  void skipZeroIndices()
218  {
219  // skip indices with size zero!
220  while(sizes_.size() && index_!=interface_.size() &&!size())
221  ++index_;
222  }
223 
228  std::size_t index() const
229  {
230  return interface_[index_];
231  }
235  std::size_t size() const
236  {
237  assert(sizes_.size());
238  return sizes_[index_];
239  }
243  std::size_t* getSizesPointer()
244  {
245  return &sizes_[0];
246  }
251  bool empty() const
252  {
253  return !interface_.size();
254  }
255 
260  std::size_t indicesLeft() const
261  {
262  return interface_.size()-index_;
263  }
267  std::size_t fixedSize;
271  int rank() const
272  {
273  return rank_;
274  }
278  std::size_t offset() const
279  {
280  return index_;
281  }
282 private:
284  int rank_;
286  std::size_t index_;
288  InterfaceInformation interface_;
289  std::vector<std::size_t> sizes_;
290 };
291 
292 
293 } // end unnamed namespace
294 
332 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
333 class VariableSizeCommunicator
334 {
335 public:
340  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
341  std::less<int>,
342  typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
343 
344 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
351  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
352  : maxBufferSize_(32768), interface_(&inf)
353  {
354  MPI_Comm_dup(comm, &communicator_);
355  }
360  VariableSizeCommunicator(const Interface& inf)
361  : maxBufferSize_(32768), interface_(&inf.interfaces())
362  {
363  MPI_Comm_dup(inf.communicator(), &communicator_);
364  }
365 #else
372  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
373  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
374  interface_(&inf)
375  {
376  MPI_Comm_dup(comm, &communicator_);
377  }
382  VariableSizeCommunicator(const Interface& inf)
383  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
384  interface_(&inf.interfaces())
385  {
386  MPI_Comm_dup(inf.communicator(), &communicator_);
387  }
388 #endif
395  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
396  : maxBufferSize_(max_buffer_size), interface_(&inf)
397  {
398  MPI_Comm_dup(comm, &communicator_);
399  }
400 
406  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
407  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
408  {
409  MPI_Comm_dup(inf.communicator(), &communicator_);
410  }
411 
412  ~VariableSizeCommunicator()
413  {
414  MPI_Comm_free(&communicator_);
415  }
416 
417 
437  template<class DataHandle>
438  void forward(DataHandle& handle)
439  {
440  communicate<true>(handle);
441  }
442 
462  template<class DataHandle>
463  void backward(DataHandle& handle)
464  {
465  communicate<false>(handle);
466  }
467 
468 private:
469  template<bool FORWARD, class DataHandle>
470  void communicateSizes(DataHandle& handle,
471  std::vector<InterfaceTracker>& recv_trackers);
472 
479  template<bool forward,class DataHandle>
480  void communicate(DataHandle& handle);
490  template<bool FORWARD, class DataHandle>
491  void setupInterfaceTrackers(DataHandle& handle,
492  std::vector<InterfaceTracker>& send_trackers,
493  std::vector<InterfaceTracker>& recv_trackers);
501  template<bool FORWARD, class DataHandle>
502  [[maybe_unused]] void communicateFixedSize(DataHandle& handle);
510  template<bool FORWARD, class DataHandle>
511  void communicateVariableSize(DataHandle& handle);
518  std::size_t maxBufferSize_;
526  const InterfaceMap* interface_;
532  MPI_Comm communicator_;
533 };
534 
536 namespace
537 {
541 template<class DataHandle>
542 class SizeDataHandle
543 {
544 public:
545  typedef std::size_t DataType;
546 
547  SizeDataHandle(DataHandle& data,
548  std::vector<InterfaceTracker>& trackers)
549  : data_(data), trackers_(trackers), index_()
550  {}
551  bool fixedsize()
552  {
553  return true;
554  }
555  std::size_t size(std::size_t i)
556  {
557  DUNE_UNUSED_PARAMETER(i);
558  return 1;
559  }
560  template<class B>
561  void gather(B& buf, int i)
562  {
563  buf.write(data_.size(i));
564  }
565  void setReceivingIndex(std::size_t i)
566  {
567  index_=i;
568  }
569  std::size_t* getSizesPointer()
570  {
571  return trackers_[index_].getSizesPointer();
572  }
573 
574 private:
575  DataHandle& data_;
576  std::vector<InterfaceTracker>& trackers_;
577  int index_;
578 };
579 
580 template<class T>
581 void setReceivingIndex(T&, int)
582 {}
583 
584 template<class T>
585 void setReceivingIndex(SizeDataHandle<T>& t, int i)
586 {
587  t.setReceivingIndex(i);
588 }
589 
590 
596 template<bool FORWARD>
597 struct InterfaceInformationChooser
598 {
602  static const InterfaceInformation&
603  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
604  {
605  return info.first;
606  }
607 
611  static const InterfaceInformation&
612  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
613  {
614  return info.second;
615  }
616 };
617 
618 template<>
619 struct InterfaceInformationChooser<false>
620 {
621  static const InterfaceInformation&
622  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
623  {
624  return info.second;
625  }
626 
627  static const InterfaceInformation&
628  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
629  {
630  return info.first;
631  }
632 };
633 
639 template<class DataHandle>
640 struct PackEntries
641 {
642 
643  int operator()(DataHandle& handle, InterfaceTracker& tracker,
644  MessageBuffer<typename DataHandle::DataType>& buffer,
645  int i) const
646  {
647  DUNE_UNUSED_PARAMETER(i);
648  return operator()(handle,tracker,buffer);
649  }
650 
658  int operator()(DataHandle& handle, InterfaceTracker& tracker,
659  MessageBuffer<typename DataHandle::DataType>& buffer) const
660  {
661  if(tracker.fixedSize) // fixed size if variable is >0!
662  {
663 
664  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
665  for(std::size_t i=0; i< noIndices; ++i)
666  {
667  handle.gather(buffer, tracker.index());
668  tracker.moveToNextIndex();
669  }
670  return noIndices*tracker.fixedSize;
671  }
672  else
673  {
674  int packed=0;
675  tracker.skipZeroIndices();
676  while(!tracker.finished())
677  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
678  {
679  assert(std::size_t(packed) == buffer.position());
680  handle.gather(buffer, tracker.index());
681  packed+=handle.size(tracker.index());
682  assert(std::size_t(packed) <= buffer.size());
683  assert(std::size_t(packed) == buffer.position());
684  tracker.moveToNextIndex();
685  }
686  else
687  break;
688  return packed;
689  }
690  }
691 };
692 
698 template<class DataHandle>
699 struct UnpackEntries{
700 
708  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
709  MessageBuffer<typename DataHandle::DataType>& buffer,
710  int count=0)
711  {
712  if(tracker.fixedSize) // fixed size if variable is >0!
713  {
714  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
715 
716  for(std::size_t i=0; i< noIndices; ++i)
717  {
718  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
719  tracker.moveToNextIndex();
720  }
721  return tracker.finished();
722  }
723  else
724  {
725  assert(count);
726  for(int unpacked=0;unpacked<count;)
727  {
728  assert(!tracker.finished());
729  assert(buffer.hasSpaceForItems(tracker.size()));
730  handle.scatter(buffer, tracker.index(), tracker.size());
731  unpacked+=tracker.size();
732  tracker.moveToNextIndex();
733  }
734  return tracker.finished();
735  }
736  }
737 };
738 
739 
743 template<class DataHandle>
744 struct UnpackSizeEntries{
745 
753  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
754  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
755  {
756  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
757  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
758  handle.getSizesPointer()+tracker.offset());
759  tracker.increment(noIndices);
760  return noIndices;
761  }
762  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
763  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
764  {
765  return operator()(handle,tracker,buffer);
766  }
767 };
768 
776 [[maybe_unused]] void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
777  std::vector<MPI_Request>& send_requests,
778  std::vector<InterfaceTracker>& recv_trackers,
779  std::vector<MPI_Request>& recv_requests,
780  MPI_Comm communicator)
781 {
782  typedef std::vector<InterfaceTracker>::iterator TIter;
783  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
784 
785  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
786  ++iter, ++mIter)
787  {
788  MPI_Irecv(&(iter->fixedSize), 1, Dune::MPITraits<std::size_t>::getType(),
789  iter->rank(), 933881, communicator, &(*mIter));
790  }
791 
792  // Send our size to all neighbours using non-blocking synchronous communication.
793  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
794  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
795  iter!=end;
796  ++iter, ++mIter1)
797  {
798  MPI_Issend(&(iter->fixedSize), 1, Dune::MPITraits<std::size_t>::getType(),
799  iter->rank(), 933881, communicator, &(*mIter1));
800  }
801 }
802 
803 
808 template<class DataHandle>
809 struct SetupSendRequest{
810  void operator()(DataHandle& handle,
811  InterfaceTracker& tracker,
812  MessageBuffer<typename DataHandle::DataType>& buffer,
813  MPI_Request& request,
814  MPI_Comm comm) const
815  {
816  buffer.reset();
817  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
818  // Skip indices of zero size.
819  while(!tracker.finished() && !handle.size(tracker.index()))
820  tracker.moveToNextIndex();
821  if(size)
822  {
823  assert(std::size_t(size) <= buffer.size());
824  MPI_Issend(buffer, size, Dune::MPITraits<typename DataHandle::DataType>::getType(),
825  tracker.rank(), 933399, comm, &request);
826  }
827  }
828 };
829 
830 
835 template<class DataHandle>
836 struct SetupRecvRequest{
837  void operator()(DataHandle& /*handle*/,
838  InterfaceTracker& tracker,
839  MessageBuffer<typename DataHandle::DataType>& buffer,
840  MPI_Request& request,
841  MPI_Comm comm) const
842  {
843  buffer.reset();
844  if(tracker.indicesLeft())
845  MPI_Irecv(buffer, buffer.size(), Dune::MPITraits<typename DataHandle::DataType>::getType(),
846  tracker.rank(), 933399, comm, &request);
847  }
848 };
849 
853 template<class DataHandle>
854 struct NullPackUnpackFunctor
855 {
856  int operator()(DataHandle&, InterfaceTracker&,
857  MessageBuffer<typename DataHandle::DataType>&, int)
858  {
859  return 0;
860  }
861  int operator()(DataHandle&, InterfaceTracker&,
862  MessageBuffer<typename DataHandle::DataType>&)
863  {
864  return 0;
865  }
866 };
867 
882 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
883 std::size_t checkAndContinue(DataHandle& handle,
884  std::vector<InterfaceTracker>& trackers,
885  std::vector<MPI_Request>& requests,
886  std::vector<MPI_Request>& requests2,
887  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
888  MPI_Comm comm,
889  BufferFunctor buffer_func,
890  CommunicationFunctor comm_func,
891  bool valid=true,
892  bool getCount=false)
893 {
894  std::size_t size=requests.size();
895  std::vector<MPI_Status> statuses(size);
896  int no_completed;
897  std::vector<int> indices(size, -1); // the indices for which the communication finished.
898 
899  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
900  indices.resize(no_completed);
901  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
902  index!=end; ++index)
903  {
904  InterfaceTracker& tracker=trackers[*index];
905  setReceivingIndex(handle, *index);
906  if(getCount)
907  {
908  // Get the number of entries received
909  int count;
910  MPI_Get_count(&(statuses[index-indices.begin()]),
911  Dune::MPITraits<typename DataHandle::DataType>::getType(),
912  &count);
913  // Communication completed, we can reuse the buffers, e.g. unpack or repack
914  buffer_func(handle, tracker, buffers[*index], count);
915  }else
916  buffer_func(handle, tracker, buffers[*index]);
917  tracker.skipZeroIndices();
918  if(!tracker.finished()){
919  // Maybe start another communication.
920  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
921  tracker.skipZeroIndices();
922  if(valid)
923  --no_completed; // communication not finished, decrement counter for finished ones.
924  }
925  }
926  return no_completed;
927 
928 }
929 
939 template<class DataHandle>
940 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
941  std::vector<InterfaceTracker>& trackers,
942  std::vector<MPI_Request>& size_requests,
943  std::vector<MPI_Request>& data_requests,
944  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
945  MPI_Comm comm)
946 {
947  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
948  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
949 }
950 
959 template<class DataHandle>
960 std::size_t checkSendAndContinueSending(DataHandle& handle,
961  std::vector<InterfaceTracker>& trackers,
962  std::vector<MPI_Request>& requests,
963  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
964  MPI_Comm comm)
965 {
966  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
967  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
968 }
969 
978 template<class DataHandle>
979 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
980  std::vector<InterfaceTracker>& trackers,
981  std::vector<MPI_Request>& requests,
982  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
983  MPI_Comm comm)
984 {
985  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
986  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
987  true, !handle.fixedsize());
988 }
989 
990 
991 [[maybe_unused]] bool validRecvRequests(const std::vector<MPI_Request> reqs)
992 {
993  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
994  i!=end; ++i)
995  if(*i!=MPI_REQUEST_NULL)
996  return true;
997  return false;
998 }
999 
1010 template<class DataHandle, class Functor>
1011 std::size_t setupRequests(DataHandle& handle,
1012  std::vector<InterfaceTracker>& trackers,
1013  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1014  std::vector<MPI_Request>& requests,
1015  const Functor& setupFunctor,
1016  MPI_Comm communicator)
1017 {
1018  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1019  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1020  biter=buffers.begin();
1021  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1022  std::size_t complete=0;
1023  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1024  {
1025  setupFunctor(handle, *titer, *biter, *riter, communicator);
1026  complete+=titer->finished();
1027  }
1028  return complete;
1029 }
1030 } // end unnamed namespace
1031 
1032 template<class Allocator>
1033 template<bool FORWARD, class DataHandle>
1034 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1035  std::vector<InterfaceTracker>& send_trackers,
1036  std::vector<InterfaceTracker>& recv_trackers)
1037 {
1038  if(interface_->size()==0)
1039  return;
1040  send_trackers.reserve(interface_->size());
1041  recv_trackers.reserve(interface_->size());
1042 
1043  int fixedsize=0;
1044  if(handle.fixedsize())
1045  ++fixedsize;
1046 
1047 
1048  typedef typename InterfaceMap::const_iterator IIter;
1049  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1050  {
1051 
1052  if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1053  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1054  assert(!handle.fixedsize()||fixedsize>0);
1055  send_trackers.push_back(InterfaceTracker(inf->first,
1056  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1057  recv_trackers.push_back(InterfaceTracker(inf->first,
1058  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1059  }
1060 }
1061 
1062 template<class Allocator>
1063 template<bool FORWARD, class DataHandle>
1064 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1065 {
1066  std::vector<MPI_Request> size_send_req(interface_->size());
1067  std::vector<MPI_Request> size_recv_req(interface_->size());
1068 
1069  std::vector<InterfaceTracker> send_trackers;
1070  std::vector<InterfaceTracker> recv_trackers;
1071  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1072  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1073 
1074  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1075  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1076  typedef typename DataHandle::DataType DataType;
1077  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1078  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1079 
1080 
1081  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1082  SetupSendRequest<DataHandle>(), communicator_);
1083 
1084  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1085  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1086 
1087  // Skip empty interfaces.
1088  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1089  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1090  if(i->empty())
1091  --no_to_recv;
1092  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1093  if(i->empty())
1094  --no_to_send;
1095 
1096  while(no_size_to_recv+no_to_send+no_to_recv)
1097  {
1098  // Receive the fixedsize and setup receives accordingly
1099  if(no_size_to_recv)
1100  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1101  data_recv_req, recv_buffers,
1102  communicator_);
1103 
1104  // Check send completion and initiate other necessary sends
1105  if(no_to_send)
1106  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1107  send_buffers, communicator_);
1108  if(validRecvRequests(data_recv_req))
1109  // Receive data and setup new unblocking receives if necessary
1110  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1111  recv_buffers, communicator_);
1112  }
1113 
1114  // Wait for completion of sending the size.
1115  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1116  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1117 
1118 }
1119 
1120 template<class Allocator>
1121 template<bool FORWARD, class DataHandle>
1122 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1123  std::vector<InterfaceTracker>& data_recv_trackers)
1124 {
1125  std::vector<InterfaceTracker> send_trackers;
1126  std::vector<InterfaceTracker> recv_trackers;
1127  std::size_t size = interface_->size();
1128  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1129  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1130  std::vector<MessageBuffer<std::size_t> >
1131  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1132  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1133  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1134  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1135  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1136  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1137  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1138  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1139 
1140  // Count valid requests that we have to wait for.
1141  auto valid_req_func =
1142  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1143 
1144  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1145  valid_req_func);
1146  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1147  valid_req_func);
1148 
1149  while(size_to_send+size_to_recv)
1150  {
1151  if(size_to_send)
1152  size_to_send -=
1153  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1154  send_buffers, communicator_);
1155  if(size_to_recv)
1156  // Could have done this using checkSendAndContinueSending
1157  // But the call below is more efficient as UnpackSizeEntries
1158  // uses std::copy.
1159  size_to_recv -=
1160  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1161  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1162  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1163  }
1164 }
1165 
1166 template<class Allocator>
1167 template<bool FORWARD, class DataHandle>
1168 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1169 {
1170 
1171  std::vector<InterfaceTracker> send_trackers;
1172  std::vector<InterfaceTracker> recv_trackers;
1173  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1174 
1175  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1176  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1177  typedef typename DataHandle::DataType DataType;
1178  std::vector<MessageBuffer<DataType> >
1179  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1180  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1181 
1182  communicateSizes<FORWARD>(handle, recv_trackers);
1183  // Setup requests for sending and receiving.
1184  setupRequests(handle, send_trackers, send_buffers, send_requests,
1185  SetupSendRequest<DataHandle>(), communicator_);
1186  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1187  SetupRecvRequest<DataHandle>(), communicator_);
1188 
1189  // Determine number of valid requests.
1190  auto valid_req_func =
1191  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1192 
1193  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1194  valid_req_func);
1195  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1196  valid_req_func);
1197  while(no_to_send+no_to_recv)
1198  {
1199  // Check send completion and initiate other necessary sends
1200  if(no_to_send)
1201  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1202  send_buffers, communicator_);
1203  if(no_to_recv)
1204  // Receive data and setup new unblocking receives if necessary
1205  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1206  recv_buffers, communicator_);
1207  }
1208 }
1209 
1210 template<class Allocator>
1211 template<bool FORWARD, class DataHandle>
1212 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1213 {
1214  if( interface_->size() == 0)
1215  // Simply return as otherwise we will index an empty container
1216  // either for MPI_Wait_all or MPI_Test_some.
1217  return;
1218 
1219  if(handle.fixedsize())
1220  communicateFixedSize<FORWARD>(handle);
1221  else
1222  communicateVariableSize<FORWARD>(handle);
1223 }
1224 } // end namespace Dune
1225 
1226 #endif // HAVE_MPI
1227 
1228 #endif
Holds the implementation of the CpGrid as a pimple.
Definition: CellQuadrature.hpp:29