MPI_ALLGATHERV test

This commit is contained in:
zhangshuai 2019-05-30 08:47:04 +08:00
parent be425e781a
commit ac53c85ace
2 changed files with 101 additions and 88 deletions

View File

@ -103,9 +103,9 @@ double updateCollisionTime = 0;
double loggingTime = 0; double loggingTime = 0;
double before_updatePhysicsTime = 0; double before_updatePhysicsTime = 0;
double updatePhysicsTime = 0; double updatePhysicsTime = 0;
double before_tcpTime = 0; double before_mpiTime = 0;
double tcpTime = 0; double mpiTime = 0;
double after_tcpTime = 0; double after_mpiTime = 0;
double wholeUpdataTime = 0; double wholeUpdataTime = 0;
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
@ -275,13 +275,12 @@ void World::Load(sdf::ElementPtr _sdf)
if (this->dataPtr->sdf->HasElement("distribution")) if (this->dataPtr->sdf->HasElement("distribution"))
{ {
sdf::ElementPtr distributionElem = this->dataPtr->sdf->GetElement("distribution"); sdf::ElementPtr distributionElem = this->dataPtr->sdf->GetElement("distribution");
int nproc;
char proc_name[MPI_MAX_PROCESSOR_NAME]; char proc_name[MPI_MAX_PROCESSOR_NAME];
int nameLength; int nameLength;
MPI_Comm_rank(MPI_COMM_WORLD, &this->gazeboLocalID); MPI_Comm_rank(MPI_COMM_WORLD, &this->gazeboLocalID);
MPI_Comm_size(MPI_COMM_WORLD, &nproc); MPI_Comm_size(MPI_COMM_WORLD, &this->nproc);
MPI_Get_processor_name(proc_name, &nameLength); MPI_Get_processor_name(proc_name, &nameLength);
printf("Gazebo, Iam host %s with rank %d of %d\n", proc_name, this->gazeboLocalID, nproc); printf("Gazebo, Iam host %s with rank %d of %d\n", proc_name, this->gazeboLocalID, this->nproc);
this->flag = distributionElem->Get<int>("flag"); this->flag = distributionElem->Get<int>("flag");
@ -914,12 +913,21 @@ void World::Update()
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
MPI_Status *status = (MPI_Status *)malloc(sizeof(MPI_Status) * (this->distribution->GetGazeboCount())); // MPI_Status *status = (MPI_Status *)malloc(sizeof(MPI_Status) * (this->distribution->GetGazeboCount()));
MPI_Request *request = (MPI_Request *)malloc(sizeof(MPI_Request) * (this->distribution->GetGazeboCount())); // MPI_Request *request = (MPI_Request *)malloc(sizeof(MPI_Request) * (this->distribution->GetGazeboCount()));
int tmp_gazebo_count = this->distribution->GetGazeboCount();
char *sendBuffer;
char *receiveBuffer;
int bufferLen[tmp_gazebo_count];
int displs[tmp_gazebo_count];
int receiveBufferLen = 0;
for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++)
{
msgs::Pose_V modelPoseListSend; msgs::Pose_V modelPoseListSend;
for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); i++)
for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount(); i++)
{ {
msgs::Pose *modelPose; msgs::Pose *modelPose;
modelPose = modelPoseListSend.add_pose(); modelPose = modelPoseListSend.add_pose();
@ -927,6 +935,8 @@ void World::Update()
std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i); std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i);
modelPose->set_name(tmp_model_name); modelPose->set_name(tmp_model_name);
if (this->gazeboLocalID == tmp_gazebo_id)
{
modelPose->set_id(this->GetModel(tmp_model_name)->GetLink("canonical")->GetId()); modelPose->set_id(this->GetModel(tmp_model_name)->GetLink("canonical")->GetId());
modelPose->mutable_position()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.x); modelPose->mutable_position()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.x);
modelPose->mutable_position()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.y); modelPose->mutable_position()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.y);
@ -935,14 +945,45 @@ void World::Update()
modelPose->mutable_orientation()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.y); modelPose->mutable_orientation()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.y);
modelPose->mutable_orientation()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.z); modelPose->mutable_orientation()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.z);
modelPose->mutable_orientation()->set_w(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.w); modelPose->mutable_orientation()->set_w(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.w);
}
tmp_model_name.clear(); tmp_model_name.clear();
} }
std::string send_messages = "";
if (modelPoseListSend.pose_size() > 0)
{
modelPoseListSend.SerializeToString(&send_messages);
modelPoseListSend.clear_pose();
}
bufferLen[tmp_gazebo_id] = send_messages.size();
receiveBufferLen = receiveBufferLen + bufferLen[tmp_gazebo_id];
displs[tmp_gazebo_id] = 0;
for (int k = 0; k < tmp_gazebo_id; k++)
displs[tmp_gazebo_id] = displs[tmp_gazebo_id] + bufferLen[k];
if (this->gazeboLocalID == tmp_gazebo_id)
{
sendBuffer = new char[bufferLen[tmp_gazebo_id] + 1];
for (int i = 0; i < bufferLen[tmp_gazebo_id]; i++)
{
sendBuffer[i] = send_messages[i];
}
sendBuffer[bufferLen[tmp_gazebo_id]] = '\0';
}
send_messages.clear();
}
receiveBuffer = new char[receiveBufferLen + 1];
receiveBuffer[receiveBufferLen] = '\0';
// Added by zhangshuai 2019.04.03 for count time ----Begin // Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME #ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
before_tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; before_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
@ -953,36 +994,13 @@ void World::Update()
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
//send poses to another process //gather poses to from all processes
for (int tmp_gazebo_id = 0; tmp_gazebo_id < this->distribution->GetGazeboCount(); tmp_gazebo_id++) MPI_Allgatherv(sendBuffer, bufferLen[this->gazeboLocalID], MPI_CHAR, receiveBuffer, bufferLen, displs, MPI_CHAR, MPI_COMM_WORLD);
{
if (this->gazeboLocalID != tmp_gazebo_id)
{
std::string send_messages = "";
if (modelPoseListSend.pose_size() > 0)
{
modelPoseListSend.SerializeToString(&send_messages);
modelPoseListSend.clear_pose();
}
int bufferLen = send_messages.size();
char *sendBuffer = new char[bufferLen + 1];
for (int i = 0; i < bufferLen; i++)
{
sendBuffer[i] = send_messages[i];
}
sendBuffer[bufferLen] = '\0';
send_messages.clear();
MPI_Isend(sendBuffer, bufferLen, MPI_CHAR, tmp_gazebo_id, 99, MPI_COMM_WORLD, &request[tmp_gazebo_id]);
}
}
// Added by zhangshuai 2019.04.03 for count time ----Begin // Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME #ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
@ -997,17 +1015,9 @@ void World::Update()
for (int tmp_gazebo_id = 0; tmp_gazebo_id < this->distribution->GetGazeboCount(); tmp_gazebo_id++) for (int tmp_gazebo_id = 0; tmp_gazebo_id < this->distribution->GetGazeboCount(); tmp_gazebo_id++)
{ {
if (this->gazeboLocalID != tmp_gazebo_id) if (this->gazeboLocalID != tmp_gazebo_id)
{
MPI_Probe(tmp_gazebo_id, 99, MPI_COMM_WORLD, &status[tmp_gazebo_id]);
int size;
MPI_Get_count(&status[tmp_gazebo_id], MPI_CHAR, &size);
char *receiveBuffer = (char *)malloc(sizeof(char) * (size));
MPI_Recv(receiveBuffer, size, MPI_CHAR, tmp_gazebo_id, 99, MPI_COMM_WORLD, &status[tmp_gazebo_id]);
if (size > 0)
{ {
std::string receive_messages; std::string receive_messages;
for (int j = 0; j < size; j++) for (int j = displs[tmp_gazebo_id]; j < bufferLen[tmp_gazebo_id]; j++)
{ {
receive_messages.push_back(receiveBuffer[j]); receive_messages.push_back(receiveBuffer[j]);
} }
@ -1034,19 +1044,18 @@ void World::Update()
} }
} }
} }
}
// Added by zhangshuai 2019.04.03 for count time ----Begin // Added by zhangshuai 2019.04.03 for count time ----Begin
#ifdef USE_COUNT_TIME #ifdef USE_COUNT_TIME
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
after_tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; after_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time;
#endif #endif
// Added by zhangshuai 2019.04.03 for count time ----End // Added by zhangshuai 2019.04.03 for count time ----End
// MPI_Waitall(this->distribution->GetGazeboCount(), request, status); // MPI_Waitall(this->distribution->GetGazeboCount(), request, status);
// MPI_Barrier(MPI_COMM_WORLD); // MPI_Barrier(MPI_COMM_WORLD);
} }
MPI_Barrier(MPI_COMM_WORLD); // MPI_Barrier(MPI_COMM_WORLD);
// Added by zhangshuai 2019.04.23 for MPI communication ----End // Added by zhangshuai 2019.04.23 for MPI communication ----End
// Only update state information if logging data. // Only update state information if logging data.
@ -1080,12 +1089,12 @@ void World::Update()
std::cout << "===== average loggingTime:\t" << loggingTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average loggingTime:\t" << loggingTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average before_updatePhysicsTime:\t" << before_updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average before_updatePhysicsTime:\t" << before_updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average updatePhysicsTime:\t" << updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average updatePhysicsTime:\t" << updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average before_tcpTime:\t" << before_tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average before_mpiTime:\t" << before_mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average tcpTime:\t" << tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average mpiTime:\t" << mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average after_tcpTime:\t" << after_tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average after_mpiTime:\t" << after_mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
double wholeTcpTime = before_tcpTime + tcpTime + after_tcpTime; double wholeMpiTime = before_mpiTime + mpiTime + after_mpiTime;
std::cout << "===== average wholeTcpTime:\t" << wholeTcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average wholeMpiTime:\t" << wholeMpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
double wholeAddTime = before_worldUpdateBeginTime + worldUpdateBeginTime + modelUpdateFuncTime + updateCollisionTime + loggingTime + before_updatePhysicsTime + updatePhysicsTime + before_tcpTime + tcpTime + after_tcpTime; double wholeAddTime = before_worldUpdateBeginTime + worldUpdateBeginTime + modelUpdateFuncTime + updateCollisionTime + loggingTime + before_updatePhysicsTime + updatePhysicsTime + before_mpiTime + mpiTime + after_mpiTime;
std::cout << "===== average wholeAddTime:\t" << wholeAddTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average wholeAddTime:\t" << wholeAddTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
std::cout << "===== average wholeUpdateTime:\t" << wholeUpdataTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average wholeUpdateTime:\t" << wholeUpdataTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl;
} }

View File

@ -722,6 +722,10 @@ private:
private: private:
int gazeboLocalID; int gazeboLocalID;
/// \brief the number of MPI processes. // Added by zhangshuai 2019.05.29
private:
int nproc;
/// \brief the . /// \brief the .
private: private:
// Distribution_V distributions; // Distribution_V distributions;