dune-common 3.0-git
communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#include "remoteindices.hh"
7#include "interface.hh"
11
12#if HAVE_MPI
13// MPI header
14#include <mpi.h>
15
16namespace Dune
17{
// Flag for marking indexed data structures where the data at each index
// is of the same size. Empty tag type; it is compared against
// CommPolicy<Data>::IndexedTypeFlag to select the fixed-size code paths.
101 struct SizeOne
102 {};
103
110 {};
111
112
118 template<class V>
120 {
132 typedef V Type;
133
139 typedef typename V::value_type IndexedType;
140
146
155 static const void* getAddress(const V& v, int index);
156
162 static int getSize(const V&, int index);
163 };
164
165 template<class K, int n> class FieldVector;
166
167 template<class B, class A> class VariableBlockVector;
168
169 template<class K, class A, int n>
171 {
173
174 typedef typename Type::B IndexedType;
175
177
178 static const void* getAddress(const Type& v, int i);
179
180 static int getSize(const Type& v, int i);
181 };
182
187 {};
188
192 template<class T>
194 {
196
197 static const IndexedType& gather(const T& vec, std::size_t i);
198
199 static void scatter(T& vec, const IndexedType& v, std::size_t i);
200
201 };
202
// Communicator that describes the data to exchange with MPI datatypes and
// exchanges it through MPI requests created once in build() and started in
// forward()/backward(). Derives from InterfaceBuilder.
214 template<typename T>
215 class DatatypeCommunicator : public InterfaceBuilder
216 {
217 public:
218
// The template argument, re-exported under a descriptive name.
222 typedef T ParallelIndexSet;
223
228
// NOTE(review): `RemoteIndices` is used below, but its typedef is not visible
// in this listing (the line above is blank) - presumably dropped by the
// extraction; confirm against the real header.
232 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
233
237 typedef typename RemoteIndices::Attribute Attribute;
238
242 typedef typename RemoteIndices::LocalIndex LocalIndex;
243
// Sets remoteIndices_ to 0, created_ to false and both request arrays to 0.
247 DatatypeCommunicator();
248
// Calls free().
252 ~DatatypeCommunicator();
253
// Stores the remote indices, frees any previous setup, creates the MPI
// datatypes for receiveData and sendData and the requests for both
// directions, then marks the communicator as created.
280 template<class T1, class T2, class V>
281 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
282
// Starts and completes the requests stored in requests_[1].
286 void forward();
287
// Starts and completes the requests stored in requests_[0].
291 void backward();
292
// Releases the request arrays and the MPI datatypes; no-op unless created_.
296 void free();
297 private:
298 enum {
// Tag passed to MPI_Recv_init / MPI_Ssend_init in createRequests().
302 commTag_ = 234
303 };
304
// Set by build(); provides the communicator and the process list.
308 const RemoteIndices* remoteIndices_;
309
// Per process rank: a pair of MPI datatypes. createDataTypes() stores the
// send type in .first and the receive type in .second.
310 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
311 MessageTypeMap;
312
316 MessageTypeMap messageTypes;
317
// NOTE(review): not referenced by any member function visible in this listing.
321 void* data_;
322
// Request arrays: index 1 is used by forward(), index 0 by backward().
// Each array holds 2*messageTypes.size() entries (receives first, then sends).
323 MPI_Request* requests_[2];
324
// True between a completed build() and the next free().
328 bool created_;
329
// Allocates requests_[FORWARD ? 1 : 0] and fills it with receive and
// synchronous-send requests.
333 template<class V, bool FORWARD>
334 void createRequests(V& sendData, V& receiveData);
335
// Creates and commits one MPI datatype per process for `data`
// (send side if `send` is true, receive side otherwise).
339 template<class T1, class T2, class V, bool send>
340 void createDataTypes(const T1& source, const T2& destination, V& data);
341
// Starts all requests in `req`, waits for them and reports errors.
345 void sendRecv(MPI_Request* req);
346
// Block lengths and displacements collected for one process before they
// are handed to MPI_Type_create_hindexed.
350 struct IndexedTypeInformation
351 {
// Allocates both arrays with i entries and records i as capacity.
// NOTE(review): `elements` is not set here; the code appears to rely on the
// object having been value-initialized by std::map::operator[] - confirm.
357 void build(int i)
358 {
359 length = new int[i];
360 displ = new MPI_Aint[i];
361 size = i;
362 }
363
// Releases both arrays (pointers are not reset).
367 void free()
368 {
369 delete[] length;
370 delete[] displ;
371 }
// Number of primitive entries per block.
373 int* length;
// Address/displacement per block.
375 MPI_Aint* displ;
// Number of blocks filled so far (incremented by MPIDatatypeInformation::add).
381 int elements;
// Capacity of the two arrays.
385 int size;
386 };
387
// Collector passed to buildInterface() by createDataTypes(): gathers, per
// process, the addresses and sizes of the entries of `data` to communicate.
393 template<class V>
394 struct MPIDatatypeInformation
395 {
// Keeps a reference to `data`; the caller must keep it alive.
400 MPIDatatypeInformation(const V& data) : data_(data)
401 {}
402
// Allocates room for `size` entries for process `proc`.
408 void reserve(int proc, int size)
409 {
410 information_[proc].build(size);
411 }
// Appends the entry at local index `local` for process `proc`: stores its
// absolute address (made relative later in createDataTypes) and its size.
418 void add(int proc, int local)
419 {
420 IndexedTypeInformation& info=information_[proc];
421 assert((info.elements)<info.size);
422 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
423 info.displ+info.elements);
424 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
425 info.elements++;
426 }
427
// Collected information, keyed by process rank.
432 std::map<int,IndexedTypeInformation> information_;
// The container whose entries are described.
436 const V& data_;
437
438 };
439
440 };
441
452 {
453
454 public:
459
466 template<class Data, class Interface>
467 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
468 build(const Interface& interface);
469
477 template<class Data, class Interface>
478 void build(const Data& source, const Data& target, const Interface& interface);
479
508 template<class GatherScatter, class Data>
509 void forward(const Data& source, Data& dest);
510
539 template<class GatherScatter, class Data>
540 void backward(Data& source, const Data& dest);
541
567 template<class GatherScatter, class Data>
568 void forward(Data& data);
569
595 template<class GatherScatter, class Data>
596 void backward(Data& data);
597
601 void free();
602
607
608 private:
609
613 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
614 InterfaceMap;
615
616
620 template<class Data, typename IndexedTypeFlag>
621 struct MessageSizeCalculator
622 {};
623
628 template<class Data>
629 struct MessageSizeCalculator<Data,SizeOne>
630 {
637 inline int operator()(const InterfaceInformation& info) const;
646 inline int operator()(const Data& data, const InterfaceInformation& info) const;
647 };
648
653 template<class Data>
654 struct MessageSizeCalculator<Data,VariableSize>
655 {
664 inline int operator()(const Data& data, const InterfaceInformation& info) const;
665 };
666
670 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
671 struct MessageGatherer
672 {};
673
678 template<class Data, class GatherScatter, bool send>
679 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
680 {
682 typedef typename CommPolicy<Data>::IndexedType Type;
683
688 typedef GatherScatter Gatherer;
689
690 enum {
696 forward=send
697 };
698
706 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
707 };
708
713 template<class Data, class GatherScatter, bool send>
714 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
715 {
717 typedef typename CommPolicy<Data>::IndexedType Type;
718
723 typedef GatherScatter Gatherer;
724
725 enum {
731 forward=send
732 };
733
741 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
742 };
743
747 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
748 struct MessageScatterer
749 {};
750
755 template<class Data, class GatherScatter, bool send>
756 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
757 {
759 typedef typename CommPolicy<Data>::IndexedType Type;
760
765 typedef GatherScatter Scatterer;
766
767 enum {
773 forward=send
774 };
775
783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784 };
789 template<class Data, class GatherScatter, bool send>
790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791 {
793 typedef typename CommPolicy<Data>::IndexedType Type;
794
799 typedef GatherScatter Scatterer;
800
801 enum {
807 forward=send
808 };
809
817 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
818 };
819
// Position and size of one message inside a communication buffer.
823 struct MessageInformation
824 {
// Empty message at offset 0.
826 MessageInformation()
827 : start_(0), size_(0)
828 {}
829
// Message of `size` starting at `start`.
837 MessageInformation(size_t start, size_t size)
838 : start_(start), size_(size)
839 {}
// Offset of the message. build() stores the running entry count here and
// sendRecv() adds it to a typed buffer pointer, i.e. it counts entries.
843 size_t start_;
// Size of the message. build() stores count*sizeof(IndexedType) here and
// sendRecv() passes it as the MPI_BYTE count, i.e. it counts bytes.
847 size_t size_;
848 };
849
856 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
857 InformationMap;
861 InformationMap messageInformation_;
865 char* buffers_[2];
869 size_t bufferSize_[2];
870
871 enum {
875 commTag_
876 };
877
881 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
882
883 MPI_Comm communicator_;
884
888 template<class GatherScatter, bool FORWARD, class Data>
889 void sendRecv(const Data& source, Data& target);
890
891 };
892
893#ifndef DOXYGEN
894
895 template<class V>
896 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
897 {
898 return &(v[index]);
899 }
900
901 template<class V>
902 inline int CommPolicy<V>::getSize(const V& v, int index)
903 {
906 return 1;
907 }
908
909 template<class K, class A, int n>
910 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
911 {
912 return &(v[index][0]);
913 }
914
915 template<class K, class A, int n>
916 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
917 {
918 return v[index].getsize();
919 }
920
921 template<class T>
922 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
923 {
924 return vec[i];
925 }
926
927 template<class T>
928 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
929 {
930 vec[i]=v;
931 }
932
933 template<typename T>
934 DatatypeCommunicator<T>::DatatypeCommunicator()
935 : remoteIndices_(0), created_(false)
936 {
937 requests_[0]=0;
938 requests_[1]=0;
939 }
940
941
942
943 template<typename T>
944 DatatypeCommunicator<T>::~DatatypeCommunicator()
945 {
946 free();
947 }
948
949 template<typename T>
950 template<class T1, class T2, class V>
951 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
952 const T1& source, V& sendData,
953 const T2& destination, V& receiveData)
954 {
955 remoteIndices_ = &remoteIndices;
956 free();
957 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
958 createDataTypes<T1,T2,V,true>(source,destination, sendData);
959 createRequests<V,true>(sendData, receiveData);
960 createRequests<V,false>(receiveData, sendData);
961 created_=true;
962 }
963
964 template<typename T>
965 void DatatypeCommunicator<T>::free()
966 {
967 if(created_) {
968 delete[] requests_[0];
969 delete[] requests_[1];
970 typedef MessageTypeMap::iterator iterator;
971 typedef MessageTypeMap::const_iterator const_iterator;
972
973 const const_iterator end=messageTypes.end();
974
975 for(iterator process = messageTypes.begin(); process != end; ++process) {
976 MPI_Datatype *type = &(process->second.first);
977 int finalized=0;
978 MPI_Finalized(&finalized);
979 if(*type!=MPI_DATATYPE_NULL && !finalized)
980 MPI_Type_free(type);
981 type = &(process->second.second);
982 if(*type!=MPI_DATATYPE_NULL && !finalized)
983 MPI_Type_free(type);
984 }
985 messageTypes.clear();
986 created_=false;
987 }
988
989 }
990
991 template<typename T>
992 template<class T1, class T2, class V, bool send>
993 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
994 {
995
996 MPIDatatypeInformation<V> dataInfo(data);
997 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
998
999 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1000 const const_iterator end=this->remoteIndices_->end();
1001
1002 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1003 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1004 IndexedTypeInformation& info=dataInfo.information_[process->first];
1005 // Shift the displacement
1006 MPI_Aint base;
1007 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1008
1009 for(int i=0; i< info.elements; i++) {
1010 info.displ[i]-=base;
1011 }
1012
1013 // Create data type
1014 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1015 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1016 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1017 MPI_Type_commit(type);
1018 // Deallocate memory
1019 info.free();
1020 }
1021 }
1022
1023 template<typename T>
1024 template<class V, bool createForward>
1025 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1026 {
1027 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1028 int rank;
1029 static int index = createForward ? 1 : 0;
1030 int noMessages = messageTypes.size();
1031 // allocate request handles
1032 requests_[index] = new MPI_Request[2*noMessages];
1033 const MapIterator end = messageTypes.end();
1034 int request=0;
1035 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1036
1037 // Set up the requests for receiving first
1038 for(MapIterator process = messageTypes.begin(); process != end;
1039 ++process, ++request) {
1040 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1041 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1042 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1043 }
1044
1045 // And now the send requests
1046
1047 for(MapIterator process = messageTypes.begin(); process != end;
1048 ++process, ++request) {
1049 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1050 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1051 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1052 }
1053 }
1054
1055 template<typename T>
1056 void DatatypeCommunicator<T>::forward()
1057 {
1058 sendRecv(requests_[1]);
1059 }
1060
1061 template<typename T>
1062 void DatatypeCommunicator<T>::backward()
1063 {
1064 sendRecv(requests_[0]);
1065 }
1066
1067 template<typename T>
1068 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1069 {
1070 int noMessages = messageTypes.size();
1071 // Start the receive calls first
1072 MPI_Startall(noMessages, requests);
1073 // Now the send calls
1074 MPI_Startall(noMessages, requests+noMessages);
1075
1076 // Wait for completion of the communication send first then receive
1077 MPI_Status* status=new MPI_Status[2*noMessages];
1078 for(int i=0; i<2*noMessages; i++)
1079 status[i].MPI_ERROR=MPI_SUCCESS;
1080
1081 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1082 int receive = MPI_Waitall(noMessages, requests, status);
1083
1084 // Error checks
1085 int success=1, globalSuccess=0;
1086 if(send==MPI_ERR_IN_STATUS) {
1087 int rank;
1088 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1089 std::cerr<<rank<<": Error in sending :"<<std::endl;
1090 // Search for the error
1091 for(int i=noMessages; i< 2*noMessages; i++)
1092 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1093 char message[300];
1094 int messageLength;
1095 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1096 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1097 for(int j = 0; j < messageLength; j++)
1098 std::cout << message[j];
1099 }
1100 std::cerr<<std::endl;
1101 success=0;
1102 }
1103
1104 if(receive==MPI_ERR_IN_STATUS) {
1105 int rank;
1106 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1107 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1108 // Search for the error
1109 for(int i=0; i< noMessages; i++)
1110 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1111 char message[300];
1112 int messageLength;
1113 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1114 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1115 for(int j = 0; j < messageLength; j++)
1116 std::cerr << message[j];
1117 }
1118 std::cerr<<std::endl;
1119 success=0;
1120 }
1121
1122 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1123
1124 delete[] status;
1125
1126 if(!globalSuccess)
1127 DUNE_THROW(CommunicationError, "A communication error occurred!");
1128
1129 }
1130
1132 {
1133 buffers_[0]=0;
1134 buffers_[1]=0;
1135 bufferSize_[0]=0;
1136 bufferSize_[1]=0;
1137 }
1138
1139 template<class Data, class Interface>
1140 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1141 BufferedCommunicator::build(const Interface& interface)
1142 {
1143 interfaces_=interface.interfaces();
1144 communicator_=interface.communicator();
1145 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1146 ::const_iterator const_iterator;
1147 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1148 const const_iterator end = interfaces_.end();
1149 int lrank;
1150 MPI_Comm_rank(communicator_, &lrank);
1151
1152 bufferSize_[0]=0;
1153 bufferSize_[1]=0;
1154
1155 for(const_iterator interfacePair = interfaces_.begin();
1156 interfacePair != end; ++interfacePair) {
1157 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1158 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1159 if (noSend + noRecv > 0)
1160 messageInformation_.insert(std::make_pair(interfacePair->first,
1161 std::make_pair(MessageInformation(bufferSize_[0],
1162 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1163 MessageInformation(bufferSize_[1],
1164 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1165 bufferSize_[0] += noSend;
1166 bufferSize_[1] += noRecv;
1167 }
1168
1169 // allocate the buffers
1170 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1171 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1172
1173 buffers_[0] = new char[bufferSize_[0]];
1174 buffers_[1] = new char[bufferSize_[1]];
1175 }
1176
1177 template<class Data, class Interface>
1178 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1179 {
1180
1181 interfaces_=interface.interfaces();
1182 communicator_=interface.communicator();
1183 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1184 ::const_iterator const_iterator;
1185 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1186 const const_iterator end = interfaces_.end();
1187
1188 bufferSize_[0]=0;
1189 bufferSize_[1]=0;
1190
1191 for(const_iterator interfacePair = interfaces_.begin();
1192 interfacePair != end; ++interfacePair) {
1193 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1194 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1195 if (noSend + noRecv > 0)
1196 messageInformation_.insert(std::make_pair(interfacePair->first,
1197 std::make_pair(MessageInformation(bufferSize_[0],
1198 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1199 MessageInformation(bufferSize_[1],
1200 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1201 bufferSize_[0] += noSend;
1202 bufferSize_[1] += noRecv;
1203 }
1204
1205 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1206 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1207 // allocate the buffers
1208 buffers_[0] = new char[bufferSize_[0]];
1209 buffers_[1] = new char[bufferSize_[1]];
1210 }
1211
1212 inline void BufferedCommunicator::free()
1213 {
1214 messageInformation_.clear();
1215 if(buffers_[0])
1216 delete[] buffers_[0];
1217
1218 if(buffers_[1])
1219 delete[] buffers_[1];
1220 buffers_[0]=buffers_[1]=0;
1221 }
1222
1224 {
1225 free();
1226 }
1227
1228 template<class Data>
1229 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1230 (const InterfaceInformation& info) const
1231 {
1232 return info.size();
1233 }
1234
1235
1236 template<class Data>
1237 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1238 (const Data&, const InterfaceInformation& info) const
1239 {
1240 return operator()(info);
1241 }
1242
1243
1244 template<class Data>
1245 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1246 (const Data& data, const InterfaceInformation& info) const
1247 {
1248 int entries=0;
1249
1250 for(size_t i=0; i < info.size(); i++)
1251 entries += CommPolicy<Data>::getSize(data,info[i]);
1252
1253 return entries;
1254 }
1255
1256
1257 template<class Data, class GatherScatter, bool FORWARD>
1258 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1259 {
1260 DUNE_UNUSED_PARAMETER(bufferSize);
1261 typedef typename InterfaceMap::const_iterator
1262 const_iterator;
1263
1264 int rank;
1265 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1266 const const_iterator end = interfaces.end();
1267 size_t index=0;
1268
1269 for(const_iterator interfacePair = interfaces.begin();
1270 interfacePair != end; ++interfacePair) {
1271 int size = forward ? interfacePair->second.first.size() :
1272 interfacePair->second.second.size();
1273
1274 for(int i=0; i < size; i++) {
1275 int local = forward ? interfacePair->second.first[i] :
1276 interfacePair->second.second[i];
1277 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1278
1279#ifdef DUNE_ISTL_WITH_CHECKING
1280 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1281#endif
1282 buffer[index]=GatherScatter::gather(data, local, j);
1283 }
1284
1285 }
1286 }
1287
1288 }
1289
1290
1291 template<class Data, class GatherScatter, bool FORWARD>
1292 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1293 {
1294 DUNE_UNUSED_PARAMETER(bufferSize);
1295 typedef typename InterfaceMap::const_iterator
1296 const_iterator;
1297 const const_iterator end = interfaces.end();
1298 size_t index = 0;
1299
1300 int rank;
1301 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1302
1303 for(const_iterator interfacePair = interfaces.begin();
1304 interfacePair != end; ++interfacePair) {
1305 size_t size = FORWARD ? interfacePair->second.first.size() :
1306 interfacePair->second.second.size();
1307
1308 for(size_t i=0; i < size; i++) {
1309
1310#ifdef DUNE_ISTL_WITH_CHECKING
1311 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1312#endif
1313
1314 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1315 interfacePair->second.second[i]);
1316 }
1317 }
1318
1319 }
1320
1321
1322 template<class Data, class GatherScatter, bool FORWARD>
1323 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1324 {
1325 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1326 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1327
1328 assert(infoPair!=interfaces.end());
1329
1330 const Information& info = FORWARD ? infoPair->second.second :
1331 infoPair->second.first;
1332
1333 for(size_t i=0, index=0; i < info.size(); i++) {
1334 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1335 GatherScatter::scatter(data, buffer[index++], info[i], j);
1336 }
1337 }
1338
1339
1340 template<class Data, class GatherScatter, bool FORWARD>
1341 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1342 {
1343 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1344 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1345
1346 assert(infoPair!=interfaces.end());
1347
1348 const Information& info = FORWARD ? infoPair->second.second :
1349 infoPair->second.first;
1350
1351 for(size_t i=0; i < info.size(); i++) {
1352 GatherScatter::scatter(data, buffer[i], info[i]);
1353 }
1354 }
1355
1356
1357 template<class GatherScatter,class Data>
1358 void BufferedCommunicator::forward(Data& data)
1359 {
1360 this->template sendRecv<GatherScatter,true>(data, data);
1361 }
1362
1363
1364 template<class GatherScatter, class Data>
1365 void BufferedCommunicator::backward(Data& data)
1366 {
1367 this->template sendRecv<GatherScatter,false>(data, data);
1368 }
1369
1370
1371 template<class GatherScatter, class Data>
1372 void BufferedCommunicator::forward(const Data& source, Data& dest)
1373 {
1374 this->template sendRecv<GatherScatter,true>(source, dest);
1375 }
1376
1377
1378 template<class GatherScatter, class Data>
1379 void BufferedCommunicator::backward(Data& source, const Data& dest)
1380 {
1381 this->template sendRecv<GatherScatter,false>(dest, source);
1382 }
1383
1384
1385 template<class GatherScatter, bool FORWARD, class Data>
1386 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1387 {
1388 int rank, lrank;
1389
1390 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1391 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1392
1393 typedef typename CommPolicy<Data>::IndexedType Type;
1394 Type *sendBuffer, *recvBuffer;
1395 size_t sendBufferSize;
1396#ifndef NDEBUG
1397 size_t recvBufferSize;
1398#endif
1399
1400 if(FORWARD) {
1401 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1402 sendBufferSize = bufferSize_[0];
1403 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1404#ifndef NDEBUG
1405 recvBufferSize = bufferSize_[1];
1406#endif
1407 }else{
1408 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1409 sendBufferSize = bufferSize_[1];
1410 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1411#ifndef NDEBUG
1412 recvBufferSize = bufferSize_[0];
1413#endif
1414 }
1415 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1416
1417 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1418
1419 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1420 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1421 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1422 size_t numberOfRealRecvRequests = 0;
1423
1424 // Setup receive first
1425 typedef typename InformationMap::const_iterator const_iterator;
1426
1427 const const_iterator end = messageInformation_.end();
1428 size_t i=0;
1429 int* processMap = new int[messageInformation_.size()];
1430
1431 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1432 processMap[i]=info->first;
1433 if(FORWARD) {
1434 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1435 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1436 if(info->second.second.size_) {
1437 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1438 MPI_BYTE, info->first, commTag_, communicator_,
1439 recvRequests+i);
1440 numberOfRealRecvRequests += 1;
1441 } else {
1442 // Nothing to receive -> set request to inactive
1443 recvRequests[i]=MPI_REQUEST_NULL;
1444 }
1445 }else{
1446 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1447 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1448 if(info->second.first.size_) {
1449 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1450 MPI_BYTE, info->first, commTag_, communicator_,
1451 recvRequests+i);
1452 numberOfRealRecvRequests += 1;
1453 } else {
1454 // Nothing to receive -> set request to inactive
1455 recvRequests[i]=MPI_REQUEST_NULL;
1456 }
1457 }
1458 }
1459
1460 // now the send requests
1461 i=0;
1462 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1463 if(FORWARD) {
1464 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1465 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1466 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1467 if(info->second.first.size_)
1468 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1469 MPI_BYTE, info->first, commTag_, communicator_,
1470 sendRequests+i);
1471 else
1472 // Nothing to send -> set request to inactive
1473 sendRequests[i]=MPI_REQUEST_NULL;
1474 }else{
1475 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1476 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1477 if(info->second.second.size_)
1478 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1479 MPI_BYTE, info->first, commTag_, communicator_,
1480 sendRequests+i);
1481 else
1482 // Nothing to send -> set request to inactive
1483 sendRequests[i]=MPI_REQUEST_NULL;
1484 }
1485
1486 // Wait for completion of receive and immediately start scatter
1487 i=0;
1488 //int success = 1;
1489 int finished = MPI_UNDEFINED;
1490 MPI_Status status; //[messageInformation_.size()];
1491 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1492
1493 for(i=0; i< numberOfRealRecvRequests; i++) {
1494 status.MPI_ERROR=MPI_SUCCESS;
1495 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1496 assert(finished != MPI_UNDEFINED);
1497
1498 if(status.MPI_ERROR==MPI_SUCCESS) {
1499 int& proc = processMap[finished];
1500 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1501 assert(infoIter != messageInformation_.end());
1502
1503 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1504 assert(info.start_+info.size_ <= recvBufferSize);
1505
1506 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1507 }else{
1508 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1509 //success=0;
1510 }
1511 }
1512
1513 MPI_Status recvStatus;
1514
1515 // Wait for completion of sends
1516 for(i=0; i< messageInformation_.size(); i++)
1517 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1518 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1519 //success=0;
1520 }
1521 /*
1522 int globalSuccess;
1523 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1524
1525 if(!globalSuccess)
1526 DUNE_THROW(CommunicationError, "A communication error occurred!");
1527 */
1528 delete[] processMap;
1529 delete[] sendRequests;
1530 delete[] recvRequests;
1531
1532 }
1533
1534#endif // DOXYGEN
1535
1537}
1538
1539#endif
1540
1541#endif
Provides classes for building the communication interface between remote indices.
Classes describing a distributed indexset.
Traits for type conversions and type information.
Standard Dune debug streams.
A few common exception classes.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition unused.hh:18
#define DUNE_THROW(E, m)
Definition exceptions.hh:216
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition stdstreams.hh:93
Dune namespace.
Definition alignment.hh:11
constexpr auto size(const Dune::FieldVector< T, i > *, const PriorityTag< 5 > &) -> decltype(std::integral_constant< std::size_t, i >())
Definition hybridutilities.hh:22
Default exception class for I/O errors.
Definition exceptions.hh:229
Flag for marking indexed data structures where data at each index is of the same size.
Definition communicator.hh:102
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition communicator.hh:110
Default policy used for communicating an indexed type.
Definition communicator.hh:120
V::value_type IndexedType
The type we get at each index with operator[].
Definition communicator.hh:139
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition communicator.hh:145
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition communicator.hh:132
Definition communicator.hh:165
Definition communicator.hh:167
VariableBlockVector< FieldVector< K, n >, A > Type
Definition communicator.hh:172
Error thrown if there was a problem with the communication.
Definition communicator.hh:187
GatherScatter default implementation that just copies data.
Definition communicator.hh:194
static void scatter(T &vec, const IndexedType &v, std::size_t i)
CommPolicy< T >::IndexedType IndexedType
Definition communicator.hh:195
static const IndexedType & gather(const T &vec, std::size_t i)
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition communicator.hh:452
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Manager class for the mapping between local indices and globally unique indices.
Definition indexset.hh:217
Base class of all classes representing a communication interface.
Definition interface.hh:33
Information describing an interface.
Definition interface.hh:99
Communication interface between remote and local indices.
Definition interface.hh:207
An index present on the local process.
Definition localindex.hh:33
The indices present on remote processes.
Definition remoteindices.hh:181
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition remoteindices.hh:207
LocalIndex::Attribute Attribute
The type of the attribute.
Definition remoteindices.hh:218
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition remoteindices.hh:213