1. MPI_Allgatherv, version 0.6.3

2. try non-blocking MPI communication
This commit is contained in:
zhangshuai 2019-10-24 20:52:15 +08:00
parent 648752d2a6
commit 141e7bcfdb
5 changed files with 211 additions and 52 deletions

View File

@ -1,9 +1,11 @@
#find_package(MPI REQUIRED) # Added by zhangshuai 2019.04.22 for MPI
SET(MPI_INCLUDE_PATH /usr/local)
#SET(MPI_INCLUDE_PATH /usr/local/lib/openmpi;/usr/local/include/openmpi) # Added by zhangshuai 2019.10.23 for MPI
message(STATUS "++++++++++ Using MPI_INCLUDE_PATH: ${MPI_INCLUDE_PATH} ++++++++++") # Added by zhangshuai 2019.04.22 for MPI
SET(MPI_INCLUDE_PATH /usr/local/include;/usr/local) # Added by zhangshuai 2019.10.22 for MPI
SET(MPI_LIBRARY_DIR /usr/local/lib;/usr/local/lib/openmpi) # Added by zhangshuai 2019.10.24 for MPI
#message(STATUS "++++++++++ Using MPI_INCLUDE_PATH: ${MPI_INCLUDE_PATH} ++++++++++") # Added by zhangshuai 2019.10.22 for MPI
#message(STATUS "++++++++++ Using MPI_LIBRARY_DIR: ${MPI_LIBRARY_DIR} ++++++++++") # Added by zhangshuai 2019.10.24 for MPI
include_directories(SYSTEM
${OPENGL_INCLUDE_DIR}
@ -15,7 +17,6 @@ include_directories(SYSTEM
${TBB_INCLUDEDIR}
${tinyxml_INCLUDE_DIRS}
${MPI_INCLUDE_PATH} # Added by zhangshuai 2019.04.22 for MPI
)
link_directories(
@ -25,6 +26,7 @@ link_directories(
${PROJECT_BINARY_DIR}/test
${TBB_LIBRARY_DIR}
${CURL_LIBDIR}
${MPI_LIBRARY_DIR} # Added by zhangshuai 2019.10.24 for MPI
)
if (WIN32)

View File

@ -1,12 +1,15 @@
#find_package(MPI REQUIRED) # Added by zhangshuai 2019.04.22 for MPI
#SET(MPI_INCLUDE_PATH /usr/local/lib/openmpi;/usr/local/include/openmpi) # Added by zhangshuai 2019.10.23 for MPI
SET(MPI_INCLUDE_PATH /usr/local)
SET(MPI_INCLUDE_PATH /usr/local/include;/usr/local) # Added by zhangshuai 2019.10.22 for MPI
SET(MPI_LIBRARY_DIR /usr/local/lib;/usr/local/lib/openmpi) # Added by zhangshuai 2019.10.24 for MPI
include (${gazebo_cmake_dir}/GazeboUtils.cmake)
link_directories(
${CCD_LIBRARY_DIRS}
${Boost_LIBRARY_DIRS}
${MPI_LIBRARY_DIR} # Added by zhangshuai 2019.10.24 for MPI
)
# Build in ODE by default

View File

@ -489,7 +489,7 @@ void Entity::SetWorldPoseDefault(const math::Pose &_pose, bool _notify,
void Entity::SetWorldPose(const math::Pose &_pose, bool _notify, bool _publish)
{
{
boost::mutex::scoped_lock lock(*this->GetWorld()->GetSetWorldPoseMutex());
boost::mutex::scoped_lock lock(*this->GetWorld()->GetSetWorldPoseMutex()); // try closing by zhangshuai 2019.10.23
(*this.*setWorldPoseFunc)(_pose, _notify, _publish);
}
if (_publish)

View File

@ -126,9 +126,13 @@ double wholeUpdataTime2 = 0;
double simtime = 0;
double realtime = 0;
uint64_t start_iterations = 0;
#endif
// Added by zhangshuai 2019.04.03 for count time ----End
MPI_Request request; // Added by zhangshuai 2019.10.24 for MPI_Iallgather
MPI_Status status;
class ModelUpdate_TBB
{
public:
@ -336,11 +340,37 @@ void World::Load(sdf::ElementPtr _sdf)
for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
{
this->modelCounts = this->modelCounts + this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
this->modelCounts = this->modelCounts + tmp_model_count;
this->worldModelIDs_V.push_back(this->worldModelIDs);
for (unsigned int i = 0; i < tmp_local_model_count; i++)
{
// sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose();
worldModelIDs_V[tmp_gazebo_id].push_back(0);
}
}
// this->receiveBufferLen = sizeof(CommunicationData) * this->modelCounts;
this->bufferLen = new int[tmp_gazebo_count]; // array that its value is the number of data received from each process
this->displs = new int[tmp_gazebo_count]; // array that its value is the offset number of data received from each process
for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
{
this->bufferLen[tmp_gazebo_id] = sizeof(CommunicationData) * this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
this->displs[tmp_gazebo_id] = 0;
for (int k = 0; k < tmp_gazebo_id; k++)
this->displs[tmp_gazebo_id] = this->displs[tmp_gazebo_id] + this->bufferLen[k];
}
this->sendBufferData = new CommunicationData[tmp_local_model_count];
this->sendBuffer = (char *)sendBufferData;
this->receiveBufferData = new CommunicationData[this->modelCounts];
this->receiveBuffer = (char *)receiveBufferData;
this->mpi_flag = false;
}
}
//Added by zhangshuai based on zenglei 2019.03.19 ----End
@ -821,6 +851,74 @@ void World::Update()
DIAG_TIMER_LAP("World::Update", "Events::worldUpdateBegin");
// // Added by zhangshuai 2019.10.24 for non-block MPI communication----Begin
// if (1 == this->flag && this->mpi_flag == true)
// {
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
// #endif
// // MPI_Barrier(MPI_COMM_WORLD);
// // MPI_Wait(&request, &status);
// int test_flag;
// MPI_Test(&request, &test_flag, &status);
// while (test_flag == false)
// {
// std::cout << "MPI_Test flag : false" << std::endl;
// MPI_Test(&request, &test_flag, &status);
// }
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
// #endif
// // MPI_Barrier(MPI_COMM_WORLD);
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
// #endif
// // Added by zhangshuai 2019.04.03 for count time ----End
// //receive and get information from other processes
// int tmp_gazebo_count = this->distribution->GetGazeboCount();
// for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
// {
// if (this->gazeboLocalID != tmp_gazebo_id)
// {
// unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
// CommunicationData *tmpBufferData = new CommunicationData[tmp_model_count];
// memcpy(tmpBufferData, (receiveBuffer + this->displs[tmp_gazebo_id]), this->bufferLen[tmp_gazebo_id]);
// for (unsigned int i = 0; i < tmp_model_count; i++)
// {
// // this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); // reserve
// // this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose);
// this->GetModelById(this->worldModelIDs_V[tmp_gazebo_id][i])->SetWorldPose(tmpBufferData[i].model_pose);
// }
// delete[] tmpBufferData;
// }
// }
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// after_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
// #endif
// // Added by zhangshuai 2019.04.03 for count time ----End
// }
// // Added by zhangshuai 2019.10.24 for non-block MPI communication ----End
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
@ -960,31 +1058,16 @@ void World::Update()
#endif
// Added by zhangshuai 2019.04.03 for count time ----End
// for MPI_Allgatherv
int tmp_gazebo_count = this->distribution->GetGazeboCount();
for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
{
this->bufferLen[tmp_gazebo_id] = sizeof(CommunicationData) * this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
this->displs[tmp_gazebo_id] = 0;
for (int k = 0; k < tmp_gazebo_id; k++)
this->displs[tmp_gazebo_id] = this->displs[tmp_gazebo_id] + this->bufferLen[k];
}
/// make the pose data ready
unsigned int tmp_local_model_count = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount();
CommunicationData *sendBufferData = new CommunicationData[tmp_local_model_count];
for (unsigned int i = 0; i < tmp_local_model_count; i++)
{
// sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose();
sendBufferData[i].model_pose = this->GetModelById(this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelID(i))->GetWorldPose();
// sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose(); // reserve
// sendBufferData[i].model_pose = this->GetModelById(this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelID(i))->GetWorldPose();
sendBufferData[i].model_pose = this->GetModelById(this->worldModelIDs_V[this->gazeboLocalID][i])->GetWorldPose();
}
this->sendBuffer = (char *)sendBufferData;
CommunicationData *receiveBufferData = new CommunicationData[this->modelCounts];
this->receiveBuffer = (char *)receiveBufferData;
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
@ -992,19 +1075,19 @@ void World::Update()
#endif
// Added by zhangshuai 2019.04.03 for count time ----End
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
#endif
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
// #endif
MPI_Barrier(MPI_COMM_WORLD);
// // MPI_Barrier(MPI_COMM_WORLD);
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
#endif
// // Added by zhangshuai 2019.04.03 for count time ----Begin
// #ifdef USE_COUNT_TIME
// gettimeofday(&tv, NULL);
// barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
// #endif
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
@ -1013,8 +1096,8 @@ void World::Update()
#endif
// Added by zhangshuai 2019.04.03 for count time ----End
// MPI_Iallgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD, this->request);
MPI_Allgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD);
MPI_Iallgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD, &request);
// MPI_Allgatherv(this->sendBuffer, this->sendBufferLen, MPI_CHAR, this->receiveBuffer, this->bufferLen, this->displs, MPI_CHAR, MPI_COMM_WORLD);
// MPI_Barrier(MPI_COMM_WORLD);
@ -1025,7 +1108,37 @@ void World::Update()
#endif
// Added by zhangshuai 2019.04.03 for count time ----End
// Added by zhangshuai 2019.04.03 for count time ----Begin
// this->mpi_flag = true;
gazebo::common::Time::NSleep(200000);
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
#endif
// MPI_Barrier(MPI_COMM_WORLD);
MPI_Wait(&request, &status);
// int test_flag;
// MPI_Test(&request, &test_flag, &status);
// while (test_flag == false)
// {
// std::cout << "MPI_Test flag : false" << std::endl;
// MPI_Test(&request, &test_flag, &status);
// }
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
barrierTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
#endif
// MPI_Barrier(MPI_COMM_WORLD);
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
cur_time = (double)tv.tv_sec + (double)tv.tv_usec / 1.e6;
@ -1033,6 +1146,7 @@ void World::Update()
// Added by zhangshuai 2019.04.03 for count time ----End
//receive and get information from other processes
int tmp_gazebo_count = this->distribution->GetGazeboCount();
for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
{
if (this->gazeboLocalID != tmp_gazebo_id)
@ -1044,16 +1158,15 @@ void World::Update()
for (unsigned int i = 0; i < tmp_model_count; i++)
{
// this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose);
this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose);
// this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); // reserve
// this->GetModelById(this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelID(i))->SetWorldPose(tmpBufferData[i].model_pose);
this->GetModelById(this->worldModelIDs_V[tmp_gazebo_id][i])->SetWorldPose(tmpBufferData[i].model_pose);
}
delete[] tmpBufferData;
}
}
delete[] sendBufferData;
delete[] receiveBufferData;
// Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL);
@ -1168,6 +1281,21 @@ void World::Update()
// std::cout << "===== average wholeAddTime:\t" << wholeAddTime * 1000.0 / (this->dataPtr->iterations - start_iterations) << "\tms =====\t" << this->gazeboLocalID << std::endl;
// }
// // test for worldModelIDs_V Begin 2019.10.24
// if (this->gazeboLocalID == 0)
// {
// int tmp_gazebo_count = this->distribution->GetGazeboCount();
// for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
// {
// unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount();
// for (unsigned int j = 0; j < tmp_model_count; j++)
// {
// std::cout << GetModelById(this->worldModelIDs_V[tmp_gazebo_id][j])->GetName() << " - Gazebo ID : " << tmp_gazebo_id << " - num: " << j << std::endl;
// }
// }
// }
// // test for worldModelIDs_V End 2019.10.24
}
#endif
}
@ -1292,6 +1420,10 @@ void World::Fini()
delete[] this->bufferLen;
delete[] this->displs;
//Added by zhangshuai 2019.06.10 ----End
//Added by zhangshuai 2019.10.23 ----Begin
delete[] this->sendBufferData;
delete[] this->receiveBufferData;
//Added by zhangshuai 2019.10.23 ----End
}
//////////////////////////////////////////////////
@ -1463,6 +1595,7 @@ ModelPtr World::LoadModel(sdf::ElementPtr _sdf, BasePtr _parent)
{
this->dataPtr->ownModels.push_back(model); // add model to ownModels list
this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->SetModelID(j, model->GetId()); // to maintain model ID list in GazeboID
this->worldModelIDs_V[tmp_gazebo_id][j] = model->GetId(); // add model ID vector in world 2019.10.24
}
}
}
@ -1475,6 +1608,7 @@ ModelPtr World::LoadModel(sdf::ElementPtr _sdf, BasePtr _parent)
{
model->SetStatic(true); // model that not simulated in local gazebo is set to be static to reduce computation in UpdateCollision and UpdatePhysics
this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->SetModelID(j, model->GetId()); // to maintain model ID list in GazeboID
this->worldModelIDs_V[tmp_gazebo_id][j] = model->GetId(); // add model ID vector in world 2019.10.24
}
}
}

View File

@ -23,6 +23,7 @@
#include <Winsock2.h>
#endif
// #include <mpi.h> // Added by zhangshuai 2019.10.22 for MPI
#include <vector>
#include <list>
#include <set>
@ -56,8 +57,6 @@
#include "gazebo/physics/Distribution.hh"
// Added by zhangshuai 2019.04.01 ----End
// #include <mpi.h> // Added by zhangshuai 2019.10.22 for MPI
namespace gazebo
{
namespace physics
@ -763,17 +762,38 @@ private:
/// \brief the .
private:
char *sendBuffer; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
/// \brief the .
private:
char *receiveBuffer; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
/// \brief the .
private:
CommunicationData *sendBufferData; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
/// \brief the .
private:
CommunicationData *receiveBufferData; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
/// \brief the Models ID in every gazebo.
private:
std::vector<unsigned int> worldModelIDs; // Added by zhangshuai 2019.10.23 for MPI_Iallgather
/// \brief every Models ID list.
private:
std::vector<std::vector<unsigned int>> worldModelIDs_V; // Added by zhangshuai 2019.10.23 for MPI_Iallgather
/// \brief non-block MPI communication flag.
private:
bool mpi_flag; // Added by zhangshuai 2019.10.24 for MPI_Iallgather
/// \brief the whole number of models in parallel simulation.
private:
int modelCounts;
// /// \brief the .
// private:
// MPI_Request *request; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
// /// \brief the .
// private:
// MPI_Request *request; // Added by zhangshuai 2019.10.22 for MPI_Iallgather
// /// \brief the .
// private: