From ac53c85ace7010c250e6afd2853b5c4b6acc3756 Mon Sep 17 00:00:00 2001 From: zhangshuai Date: Thu, 30 May 2019 08:47:04 +0800 Subject: [PATCH] MPI_ALLGATHERV test --- .../gazebo/physics/World.cc | 185 +++++++++--------- .../gazebo/physics/World.hh | 4 + 2 files changed, 101 insertions(+), 88 deletions(-) diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.cc b/Gazebo_Distributed_MPI/gazebo/physics/World.cc index 413d0e0..e1d7666 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.cc +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.cc @@ -103,9 +103,9 @@ double updateCollisionTime = 0; double loggingTime = 0; double before_updatePhysicsTime = 0; double updatePhysicsTime = 0; -double before_tcpTime = 0; -double tcpTime = 0; -double after_tcpTime = 0; +double before_mpiTime = 0; +double mpiTime = 0; +double after_mpiTime = 0; double wholeUpdataTime = 0; #endif // Added by zhangshuai 2019.04.03 for count time ----End @@ -275,13 +275,12 @@ void World::Load(sdf::ElementPtr _sdf) if (this->dataPtr->sdf->HasElement("distribution")) { sdf::ElementPtr distributionElem = this->dataPtr->sdf->GetElement("distribution"); - int nproc; char proc_name[MPI_MAX_PROCESSOR_NAME]; int nameLength; MPI_Comm_rank(MPI_COMM_WORLD, &this->gazeboLocalID); - MPI_Comm_size(MPI_COMM_WORLD, &nproc); + MPI_Comm_size(MPI_COMM_WORLD, &this->nproc); MPI_Get_processor_name(proc_name, &nameLength); - printf("Gazebo, Iam host %s with rank %d of %d\n", proc_name, this->gazeboLocalID, nproc); + printf("Gazebo, Iam host %s with rank %d of %d\n", proc_name, this->gazeboLocalID, this->nproc); this->flag = distributionElem->Get("flag"); @@ -914,35 +913,77 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - MPI_Status *status = (MPI_Status *)malloc(sizeof(MPI_Status) * (this->distribution->GetGazeboCount())); - MPI_Request *request = (MPI_Request *)malloc(sizeof(MPI_Request) * (this->distribution->GetGazeboCount())); + // MPI_Status *status = (MPI_Status *)malloc(sizeof(MPI_Status) * (this->distribution->GetGazeboCount())); + // MPI_Request *request = (MPI_Request *)malloc(sizeof(MPI_Request) * (this->distribution->GetGazeboCount())); - msgs::Pose_V modelPoseListSend; + int tmp_gazebo_count = this->distribution->GetGazeboCount(); - for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount(); i++) + char *sendBuffer; + char *receiveBuffer; + int bufferLen[tmp_gazebo_count]; + int displs[tmp_gazebo_count]; + int receiveBufferLen = 0; + + for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) { - msgs::Pose *modelPose; - modelPose = modelPoseListSend.add_pose(); + msgs::Pose_V modelPoseListSend; + for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); i++) + { + msgs::Pose *modelPose; + modelPose = modelPoseListSend.add_pose(); - std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i); + std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i); - modelPose->set_name(tmp_model_name); - modelPose->set_id(this->GetModel(tmp_model_name)->GetLink("canonical")->GetId()); - modelPose->mutable_position()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.x); - modelPose->mutable_position()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.y); - modelPose->mutable_position()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.z); - modelPose->mutable_orientation()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.x); - modelPose->mutable_orientation()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.y); - modelPose->mutable_orientation()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.z); - modelPose->mutable_orientation()->set_w(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.w); + modelPose->set_name(tmp_model_name); + if (this->gazeboLocalID == tmp_gazebo_id) + { + modelPose->set_id(this->GetModel(tmp_model_name)->GetLink("canonical")->GetId()); + modelPose->mutable_position()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.x); + modelPose->mutable_position()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.y); + modelPose->mutable_position()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.z); + modelPose->mutable_orientation()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.x); + modelPose->mutable_orientation()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.y); + modelPose->mutable_orientation()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.z); + modelPose->mutable_orientation()->set_w(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.w); + } - tmp_model_name.clear(); + tmp_model_name.clear(); + } + + std::string send_messages = ""; + if (modelPoseListSend.pose_size() > 0) + { + modelPoseListSend.SerializeToString(&send_messages); + modelPoseListSend.clear_pose(); + } + + bufferLen[tmp_gazebo_id] = send_messages.size(); + receiveBufferLen = receiveBufferLen + bufferLen[tmp_gazebo_id]; + + displs[tmp_gazebo_id] = 0; + for (int k = 0; k < tmp_gazebo_id; k++) + displs[tmp_gazebo_id] = displs[tmp_gazebo_id] + bufferLen[k]; + + if (this->gazeboLocalID == tmp_gazebo_id) + { + sendBuffer = new char[bufferLen[tmp_gazebo_id] + 1]; + for (int i = 0; i < bufferLen[tmp_gazebo_id]; i++) + { + sendBuffer[i] = send_messages[i]; + } + sendBuffer[bufferLen[tmp_gazebo_id]] = '\0'; + } + + send_messages.clear(); } + receiveBuffer = new char[receiveBufferLen + 1]; + receiveBuffer[receiveBufferLen] = '\0'; + // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); - before_tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + before_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; #endif // Added by zhangshuai 2019.04.03 for count time ----End @@ -953,36 +994,13 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - //send poses to another process - for (int tmp_gazebo_id = 0; tmp_gazebo_id < this->distribution->GetGazeboCount(); tmp_gazebo_id++) - { - if (this->gazeboLocalID != tmp_gazebo_id) - { - std::string send_messages = ""; - if (modelPoseListSend.pose_size() > 0) - { - modelPoseListSend.SerializeToString(&send_messages); - modelPoseListSend.clear_pose(); - } - - int bufferLen = send_messages.size(); - char *sendBuffer = new char[bufferLen + 1]; - for (int i = 0; i < bufferLen; i++) - { - sendBuffer[i] = send_messages[i]; - } - sendBuffer[bufferLen] = '\0'; - - send_messages.clear(); - - MPI_Isend(sendBuffer, bufferLen, MPI_CHAR, tmp_gazebo_id, 99, MPI_COMM_WORLD, &request[tmp_gazebo_id]); - } - } + //gather poses to from all processes + MPI_Allgatherv(sendBuffer, bufferLen[this->gazeboLocalID], MPI_CHAR, receiveBuffer, bufferLen, displs, MPI_CHAR, MPI_COMM_WORLD); // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); - tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; #endif // Added by zhangshuai 2019.04.03 for count time ----End @@ -998,40 +1016,31 @@ void World::Update() { if (this->gazeboLocalID != tmp_gazebo_id) { - MPI_Probe(tmp_gazebo_id, 99, MPI_COMM_WORLD, &status[tmp_gazebo_id]); - int size; - MPI_Get_count(&status[tmp_gazebo_id], MPI_CHAR, &size); - char *receiveBuffer = (char *)malloc(sizeof(char) * (size)); - MPI_Recv(receiveBuffer, size, MPI_CHAR, tmp_gazebo_id, 99, MPI_COMM_WORLD, &status[tmp_gazebo_id]); - - if (size > 0) + std::string receive_messages; + for (int j = displs[tmp_gazebo_id]; j < bufferLen[tmp_gazebo_id]; j++) { - std::string receive_messages; - for (int j = 0; j < size; j++) - { - receive_messages.push_back(receiveBuffer[j]); - } + receive_messages.push_back(receiveBuffer[j]); + } - msgs::Pose_V modelPoseListReceive; - modelPoseListReceive.ParseFromString(receive_messages); - receive_messages.clear(); + msgs::Pose_V modelPoseListReceive; + modelPoseListReceive.ParseFromString(receive_messages); + receive_messages.clear(); - //insert models according to name,pose,vel from process 0 - for (int i = 0; i < modelPoseListReceive.pose_size(); i++) - { - math::Pose tmp_pose; - std::string tmp_model_name = modelPoseListReceive.pose(i).name(); - // std::string tmp_model_name = this->distribution->GetGazeboIDPtr(1)->GetModelName(i); - tmp_pose.pos.x = modelPoseListReceive.pose(i).position().x(); - tmp_pose.pos.y = modelPoseListReceive.pose(i).position().y(); - tmp_pose.pos.z = modelPoseListReceive.pose(i).position().z(); - tmp_pose.rot.x = modelPoseListReceive.pose(i).orientation().x(); - tmp_pose.rot.y = modelPoseListReceive.pose(i).orientation().y(); - tmp_pose.rot.z = modelPoseListReceive.pose(i).orientation().z(); - tmp_pose.rot.w = modelPoseListReceive.pose(i).orientation().w(); + //insert models according to name,pose,vel from process 0 + for (int i = 0; i < modelPoseListReceive.pose_size(); i++) + { + math::Pose tmp_pose; + std::string tmp_model_name = modelPoseListReceive.pose(i).name(); + // std::string tmp_model_name = this->distribution->GetGazeboIDPtr(1)->GetModelName(i); + tmp_pose.pos.x = modelPoseListReceive.pose(i).position().x(); + tmp_pose.pos.y = modelPoseListReceive.pose(i).position().y(); + tmp_pose.pos.z = modelPoseListReceive.pose(i).position().z(); + tmp_pose.rot.x = modelPoseListReceive.pose(i).orientation().x(); + tmp_pose.rot.y = modelPoseListReceive.pose(i).orientation().y(); + tmp_pose.rot.z = modelPoseListReceive.pose(i).orientation().z(); + tmp_pose.rot.w = modelPoseListReceive.pose(i).orientation().w(); - this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmp_pose); - } + this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmp_pose); } } } @@ -1039,14 +1048,14 @@ void World::Update() // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME gettimeofday(&tv, NULL); - after_tcpTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; + after_mpiTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - cur_time; #endif // Added by zhangshuai 2019.04.03 for count time ----End // MPI_Waitall(this->distribution->GetGazeboCount(), request, status); // MPI_Barrier(MPI_COMM_WORLD); } - MPI_Barrier(MPI_COMM_WORLD); + // MPI_Barrier(MPI_COMM_WORLD); // Added by zhangshuai 2019.04.23 for MPI communication ----End // Only update state information if logging data. @@ -1080,12 +1089,12 @@ void World::Update() std::cout << "===== average loggingTime:\t" << loggingTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average before_updatePhysicsTime:\t" << before_updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average updatePhysicsTime:\t" << updatePhysicsTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; - std::cout << "===== average before_tcpTime:\t" << before_tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; - std::cout << "===== average tcpTime:\t" << tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; - std::cout << "===== average after_tcpTime:\t" << after_tcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; - double wholeTcpTime = before_tcpTime + tcpTime + after_tcpTime; - std::cout << "===== average wholeTcpTime:\t" << wholeTcpTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; - double wholeAddTime = before_worldUpdateBeginTime + worldUpdateBeginTime + modelUpdateFuncTime + updateCollisionTime + loggingTime + before_updatePhysicsTime + updatePhysicsTime + before_tcpTime + tcpTime + after_tcpTime; + std::cout << "===== average before_mpiTime:\t" << before_mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; + std::cout << "===== average mpiTime:\t" << mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; + std::cout << "===== average after_mpiTime:\t" << after_mpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; + double wholeMpiTime = before_mpiTime + mpiTime + after_mpiTime; + std::cout << "===== average wholeMpiTime:\t" << wholeMpiTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; + double wholeAddTime = before_worldUpdateBeginTime + worldUpdateBeginTime + modelUpdateFuncTime + updateCollisionTime + loggingTime + before_updatePhysicsTime + updatePhysicsTime + before_mpiTime + mpiTime + after_mpiTime; std::cout << "===== average wholeAddTime:\t" << wholeAddTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; std::cout << "===== average wholeUpdateTime:\t" << wholeUpdataTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; } diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.hh b/Gazebo_Distributed_MPI/gazebo/physics/World.hh index fdf74fd..ae30e78 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.hh +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.hh @@ -722,6 +722,10 @@ private: private: int gazeboLocalID; + /// \brief the number of MPI processes. // Added by zhangshuai 2019.05.29 +private: + int nproc; + /// \brief the . private: // Distribution_V distributions;