From 141e7bcfdb374bd881b3c685964107e8738da20d Mon Sep 17 00:00:00 2001 From: zhangshuai Date: Thu, 24 Oct 2019 20:52:15 +0800 Subject: [PATCH] 1. MPI_Allgatherv, version 0.6.3 2. try non-blocking MPI communication --- Gazebo_Distributed_MPI/gazebo/CMakeLists.txt | 10 +- .../gazebo/physics/CMakeLists.txt | 5 +- .../gazebo/physics/Entity.cc | 2 +- .../gazebo/physics/World.cc | 216 ++++++++++++++---- .../gazebo/physics/World.hh | 30 ++- 5 files changed, 211 insertions(+), 52 deletions(-) diff --git a/Gazebo_Distributed_MPI/gazebo/CMakeLists.txt b/Gazebo_Distributed_MPI/gazebo/CMakeLists.txt index 9c38e8f..49cc9c7 100644 --- a/Gazebo_Distributed_MPI/gazebo/CMakeLists.txt +++ b/Gazebo_Distributed_MPI/gazebo/CMakeLists.txt @@ -1,9 +1,11 @@ #find_package(MPI REQUIRED) # Added by zhangshuai 2019.04.22 for MPI - -SET(MPI_INCLUDE_PATH /usr/local) #SET(MPI_INCLUDE_PATH /usr/local/lib/openmpi;/usr/local/include/openmpi) # Added by zhangshuai 2019.10.23 for MPI -message(STATUS "++++++++++ Using MPI_INCLUDE_PATH: ${MPI_INCLUDE_PATH} ++++++++++") # Added by zhangshuai 2019.04.22 for MPI +SET(MPI_INCLUDE_PATH /usr/local/include;/usr/local) # Added by zhangshuai 2019.10.22 for MPI +SET(MPI_LIBRARY_DIR /usr/local/lib;/usr/local/lib/openmpi) # Added by zhangshuai 2019.10.24 for MPI + +#message(STATUS "++++++++++ Using MPI_INCLUDE_PATH: ${MPI_INCLUDE_PATH} ++++++++++") # Added by zhangshuai 2019.10.22 for MPI +#message(STATUS "++++++++++ Using MPI_LIBRARY_DIR: ${MPI_LIBRARY_DIR} ++++++++++") # Added by zhangshuai 2019.10.24 for MPI include_directories(SYSTEM ${OPENGL_INCLUDE_DIR} @@ -15,7 +17,6 @@ include_directories(SYSTEM ${TBB_INCLUDEDIR} ${tinyxml_INCLUDE_DIRS} ${MPI_INCLUDE_PATH} # Added by zhangshuai 2019.04.22 for MPI - ) link_directories( @@ -25,6 +26,7 @@ link_directories( ${PROJECT_BINARY_DIR}/test ${TBB_LIBRARY_DIR} ${CURL_LIBDIR} + ${MPI_LIBRARY_DIR} # Added by zhangshuai 2019.10.24 for MPI ) if (WIN32) diff --git a/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt b/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt index 69e6915..62daf1a 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt +++ b/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt @@ -1,12 +1,15 @@ #find_package(MPI REQUIRED) # Added by zhangshuai 2019.04.22 for MPI #SET(MPI_INCLUDE_PATH /usr/local/lib/openmpi;/usr/local/include/openmpi) # Added by zhangshuai 2019.10.23 for MPI -SET(MPI_INCLUDE_PATH /usr/local) + +SET(MPI_INCLUDE_PATH /usr/local/include;/usr/local) # Added by zhangshuai 2019.10.22 for MPI +SET(MPI_LIBRARY_DIR /usr/local/lib;/usr/local/lib/openmpi) # Added by zhangshuai 2019.10.24 for MPI include (${gazebo_cmake_dir}/GazeboUtils.cmake) link_directories( ${CCD_LIBRARY_DIRS} ${Boost_LIBRARY_DIRS} + ${MPI_LIBRARY_DIR} # Added by zhangshuai 2019.10.24 for MPI ) # Build in ODE by default diff --git a/Gazebo_Distributed_MPI/gazebo/physics/Entity.cc b/Gazebo_Distributed_MPI/gazebo/physics/Entity.cc index 6623773..1489fae 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/Entity.cc +++ b/Gazebo_Distributed_MPI/gazebo/physics/Entity.cc @@ -489,7 +489,7 @@ void Entity::SetWorldPoseDefault(const math::Pose &_pose, bool _notify, void Entity::SetWorldPose(const math::Pose &_pose, bool _notify, bool _publish) { { - boost::mutex::scoped_lock lock(*this->GetWorld()->GetSetWorldPoseMutex()); + boost::mutex::scoped_lock lock(*this->GetWorld()->GetSetWorldPoseMutex()); // try closing by zhangshuai 2019.10.23 (*this.*setWorldPoseFunc)(_pose, _notify, _publish); } if (_publish) diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.cc b/Gazebo_Distributed_MPI/gazebo/physics/World.cc index 0c9f923..238c2b9 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.cc +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.cc @@ -126,9 +126,13 @@ double wholeUpdataTime2 = 0; double simtime = 0; double realtime = 0; uint64_t start_iterations = 0; + #endif // Added by zhangshuai 2019.04.03 for count time ----End +MPI_Request request; // Added by zhangshuai 2019.10.24 for MPI_Iallgather +MPI_Status status; + class ModelUpdate_TBB { public: @@ -336,11 +340,37 @@ void World::Load(sdf::ElementPtr _sdf) for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) { - this->modelCounts = this->modelCounts + this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + this->modelCounts = this->modelCounts + tmp_model_count; + + this->worldModelIDs_V.push_back(this->worldModelIDs); + + for (unsigned int i = 0; i < tmp_local_model_count; i++) + { + // sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose(); + worldModelIDs_V[tmp_gazebo_id].push_back(0); + } } // this->receiveBufferLen = sizeof(CommunicationData) * this->modelCounts; this->bufferLen = new int[tmp_gazebo_count]; // array that its value is the number of data received from each process this->displs = new int[tmp_gazebo_count]; // array that its value is the offset number of data received from each process + + for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) + { + this->bufferLen[tmp_gazebo_id] = sizeof(CommunicationData) * this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + + this->displs[tmp_gazebo_id] = 0; + for (int k = 0; k < tmp_gazebo_id; k++) + this->displs[tmp_gazebo_id] = this->displs[tmp_gazebo_id] + this->bufferLen[k]; + } + + this->sendBufferData = new CommunicationData[tmp_local_model_count]; + this->sendBuffer = (char *)sendBufferData; + + this->receiveBufferData = new CommunicationData[this->modelCounts]; + this->receiveBuffer = (char *)receiveBufferData; + + this->mpi_flag = false; } } //Added by zhangshuai based on zenglei 2019.03.19 ----End @@ -821,6 +851,74 @@ void World::Update() DIAG_TIMER_LAP("World::Update", "Events::worldUpdateBegin"); + // // Added by zhangshuai 2019.10.24 for non-block MPI communication----Begin + // if (1 == this->flag && this->mpi_flag == true) + // { + + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; + // #endif + + // // MPI_Barrier(MPI_COMM_WORLD); + + // // MPI_Wait(&request, &status); + + // int test_flag; + // MPI_Test(&request, &test_flag, &status); + // while (test_flag == false) + // { + // std::cout << "MPI_Test flag : false" << std::endl; + // MPI_Test(&request, &test_flag, &status); + // } + + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + // #endif + + // // MPI_Barrier(MPI_COMM_WORLD); + + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; + // #endif + // // Added by zhangshuai 2019.04.03 for count time ----End + + // //receive and get information from other processes + // int tmp_gazebo_count = this->distribution->GetGazeboCount(); + // for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) + // { + // if (this->gazeboLocalID != tmp_gazebo_id) + // { + // unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + // CommunicationData *tmpBufferData = new CommunicationData[tmp_model_count]; + + // memcpy(tmpBufferData, (receiveBuffer + this->displs[tmp_gazebo_id]), this->bufferLen[tmp_gazebo_id]); + + // for (unsigned int i = 0; i < tmp_model_count; i++) + // { + // // this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); // reserve + + // // this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose); + // this->GetModelById(this->worldModelIDs_V[tmp_gazebo_id][i])->SetWorldPose(tmpBufferData[i].model_pose); + // } + + // delete[] tmpBufferData; + // } + // } + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // after_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + // #endif + // // Added by zhangshuai 2019.04.03 for count time ----End + // } + // // Added by zhangshuai 2019.10.24 for non-block MPI communication ----End + // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); @@ -960,31 +1058,16 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - // for MPI_Allgatherv - int tmp_gazebo_count = this->distribution->GetGazeboCount(); - for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) - { - this->bufferLen[tmp_gazebo_id] = sizeof(CommunicationData) * this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); - - this->displs[tmp_gazebo_id] = 0; - for (int k = 0; k < tmp_gazebo_id; k++) - this->displs[tmp_gazebo_id] = this->displs[tmp_gazebo_id] + this->bufferLen[k]; - } - /// make the pose data ready unsigned int tmp_local_model_count = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount(); - CommunicationData *sendBufferData = new CommunicationData[tmp_local_model_count]; for (unsigned int i = 0; i < tmp_local_model_count; i++) { - // sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose(); - sendBufferData[i].model_pose = this->GetModelById(this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelID(i))->GetWorldPose(); + // sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose(); // reserve + + // sendBufferData[i].model_pose = this->GetModelById(this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelID(i))->GetWorldPose(); + sendBufferData[i].model_pose = this->GetModelById(this->worldModelIDs_V[this->gazeboLocalID][i])->GetWorldPose(); } - this->sendBuffer = (char *)sendBufferData; - - CommunicationData *receiveBufferData = new CommunicationData[this->modelCounts]; - this->receiveBuffer = (char *)receiveBufferData; - // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); @@ -992,19 +1075,19 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - // Added by zhangshuai 2019.04.03 for count time ----Begin -#ifdef USE_COUNT_TIME - gettimeofday(&tv, NULL); - cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; -#endif + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; + // #endif - MPI_Barrier(MPI_COMM_WORLD); + // // MPI_Barrier(MPI_COMM_WORLD); - // Added by zhangshuai 2019.04.03 for count time ----Begin -#ifdef USE_COUNT_TIME - gettimeofday(&tv, NULL); - barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; -#endif + // // Added by zhangshuai 2019.04.03 for count time ----Begin + // #ifdef USE_COUNT_TIME + // gettimeofday(&tv, NULL); + // barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + // #endif // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME @@ -1013,8 +1096,8 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - // MPI_Iallgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD, this->request); - MPI_Allgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD); + MPI_Iallgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD, &request); + // MPI_Allgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD); // MPI_Barrier(MPI_COMM_WORLD); @@ -1025,7 +1108,37 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End -// Added by zhangshuai 2019.04.03 for count time ----Begin + // this->mpi_flag = true; + + gazebo::common::Time::NSleep(200000); + + // Added by zhangshuai 2019.04.03 for count time ----Begin +#ifdef USE_COUNT_TIME + gettimeofday(&tv, NULL); + cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; +#endif + + // MPI_Barrier(MPI_COMM_WORLD); + + MPI_Wait(&request, &status); + + // int test_flag; + // MPI_Test(&request, &test_flag, &status); + // while (test_flag == false) + // { + // std::cout << "MPI_Test flag : false" << std::endl; + // MPI_Test(&request, &test_flag, &status); + // } + + // Added by zhangshuai 2019.04.03 for count time ----Begin +#ifdef USE_COUNT_TIME + gettimeofday(&tv, NULL); + barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; +#endif + + // MPI_Barrier(MPI_COMM_WORLD); + + // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6; @@ -1033,6 +1146,7 @@ void World::Update() // Added by zhangshuai 2019.04.03 for count time ----End //receive and get information from other processes + int tmp_gazebo_count = this->distribution->GetGazeboCount(); for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) { if (this->gazeboLocalID != tmp_gazebo_id) @@ -1044,16 +1158,15 @@ void World::Update() for (unsigned int i = 0; i < tmp_model_count; i++) { - // this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); - this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose); + // this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); // reserve + + // this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose); + this->GetModelById(this->worldModelIDs_V[tmp_gazebo_id][i])->SetWorldPose(tmpBufferData[i].model_pose); } + delete[] tmpBufferData; } } - - delete[] sendBufferData; - delete[] receiveBufferData; - // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); @@ -1130,7 +1243,7 @@ void World::Update() double perce_before_mpiTime = (ave_before_mpiTime / ave_wholeMpiTime) * 100; double perce_mpiTime = (ave_mpiTime / ave_wholeMpiTime) * 100; double perce_after_mpiTime = (ave_after_mpiTime / ave_wholeMpiTime) * 100; - + if (this->gazeboLocalID != 0) gazebo::common::Time::MSleep((unsigned int)this->gazeboLocalID * 100); @@ -1168,6 +1281,21 @@ void World::Update() // std::cout << "===== average wholeAddTime:\t" << wholeAddTime * 1000.0 / (this->dataPtr->iterations - start_iterations) << "\tms =====\t" << this->gazeboLocalID << std::endl; // } + + // // test for worldModelIDs_V Begin 2019.10.24 + // if (this->gazeboLocalID == 0) + // { + // int tmp_gazebo_count = this->distribution->GetGazeboCount(); + // for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) + // { + // unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + // for (unsigned int j = 0; j < tmp_model_count; j++) + // { + // std::cout << GetModelById(this->worldModelIDs_V[tmp_gazebo_id][j])->GetName() << " - Gazebo ID : " << tmp_gazebo_id << " - num: " << j << std::endl; + // } + // } + // } + // // test for worldModelIDs_V End 2019.10.24 } #endif } @@ -1292,6 +1420,10 @@ void World::Fini() delete[] this->bufferLen; delete[] this->displs; //Added by zhangshuai 2019.06.10 ----End + //Added by zhangshuai 2019.10.23 ----Begin + delete[] this->sendBufferData; + delete[] this->receiveBufferData; + //Added by zhangshuai 2019.10.23 ----End } ////////////////////////////////////////////////// @@ -1463,6 +1595,7 @@ ModelPtr World::LoadModel(sdf::ElementPtr _sdf, BasePtr _parent) { this->dataPtr->ownModels.push_back(model); // add model to ownModels list this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->SetModelID(j, model->GetId()); // to maintain model ID list in GazeboID + this->worldModelIDs_V[tmp_gazebo_id][j] = model->GetId(); // add model ID vector in world 2019.10.24 } } } @@ -1475,6 +1608,7 @@ ModelPtr World::LoadModel(sdf::ElementPtr _sdf, BasePtr _parent) { model->SetStatic(true); // model that not simulated in local gazebo is set to be static to reduce computation in UpdateCollision and UpdatePhysics this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->SetModelID(j, model->GetId()); // to maintain model ID list in GazeboID + this->worldModelIDs_V[tmp_gazebo_id][j] = model->GetId(); // add model ID vector in world 2019.10.24 } } } diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.hh b/Gazebo_Distributed_MPI/gazebo/physics/World.hh index 17d8611..8e3cd5b 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.hh +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.hh @@ -23,6 +23,7 @@ #include #endif +// #include // Added by zhangshuai 2019.10.22 for MPI #include #include #include @@ -56,8 +57,6 @@ #include "gazebo/physics/Distribution.hh" // Added by zhangshuai 2019.04.01 ----End -// #include // Added by zhangshuai 2019.10.22 for MPI - namespace gazebo { namespace physics @@ -763,17 +762,38 @@ private: /// \brief the . private: char *sendBuffer; // Added by zhangshuai 2019.10.22 for MPI_Iallgather + /// \brief the . private: char *receiveBuffer; // Added by zhangshuai 2019.10.22 for MPI_Iallgather /// \brief the . +private: + CommunicationData *sendBufferData; // Added by zhangshuai 2019.10.22 for MPI_Iallgather + + /// \brief the . +private: + CommunicationData *receiveBufferData; // Added by zhangshuai 2019.10.22 for MPI_Iallgather + + /// \brief the Models ID in every gazebo. +private: + std::vector worldModelIDs; // Added by zhangshuai 2019.10.23 for MPI_Iallgather + + /// \brief every Models ID list. +private: + std::vector> worldModelIDs_V; // Added by zhangshuai 2019.10.23 for MPI_Iallgather + + /// \brief non-block MPI communication flag. +private: + bool mpi_flag; // Added by zhangshuai 2019.10.24 for MPI_Iallgather + + /// \brief the whole number of models in parallel simulation. private: int modelCounts; -// /// \brief the . -// private: -// MPI_Request *request; // Added by zhangshuai 2019.10.22 for MPI_Iallgather + // /// \brief the . + // private: + // MPI_Request *request; // Added by zhangshuai 2019.10.22 for MPI_Iallgather // /// \brief the . // private: