diff --git a/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt b/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt index a8d5e85..2f5157f 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt +++ b/Gazebo_Distributed_MPI/gazebo/physics/CMakeLists.txt @@ -91,7 +91,7 @@ set (sources ${sources} UserCmdManager.cc World.cc WorldState.cc - TcpCommunication.cc #zhangshuai 2019.03.19 + # TcpCommunication.cc #zhangshuai 2019.03.19 Distribution.cc #zhangshuai 2019.03.28 GazeboID.cc #zhangshuai 2019.03.28 ) @@ -147,7 +147,8 @@ set (headers UserCmdManager.hh World.hh WorldState.hh - TcpCommunication.hh #zhangshuai 2019.03.19 + # TcpCommunication.hh #zhangshuai 2019.03.19 + MpiCommunication.hh #zhangshuai 2019.06.10 Distribution.hh #zhangshuai 2019.03.28 GazeboID.hh #zhangshuai 2019.03.28 ) diff --git a/Gazebo_Distributed_MPI/gazebo/physics/MpiCommunication.hh b/Gazebo_Distributed_MPI/gazebo/physics/MpiCommunication.hh new file mode 100644 index 0000000..ac14cd3 --- /dev/null +++ b/Gazebo_Distributed_MPI/gazebo/physics/MpiCommunication.hh @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2019 AIRC 01 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ +/* Desc: A MPI communication to send and receive Pose informations of robot entities + * Author: Zhang Shuai + */ + +#ifndef _MPICOMMUNICATION_HH_ +#define _MPICOMMUNICATION_HH_ + +#include +#include + +#include + +#include "gazebo/util/system.hh" +#include "gazebo/math/Pose.hh" + +#define QUEUE 1 +#define BUF_SIZE 8192 + +namespace gazebo +{ +namespace physics +{ + +/// \brief Structure of Pose information communicated with Tcp +struct CommunicationData +{ + char model_name[16]; + math::Pose model_pose; +}; + +} // namespace physics +} // namespace gazebo +#endif diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.cc b/Gazebo_Distributed_MPI/gazebo/physics/World.cc index 0fd780f..4b61b19 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.cc +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.cc @@ -291,9 +291,31 @@ void World::Load(sdf::ElementPtr _sdf) this->distribution = distribution_tmp; this->distribution->Load(distributionElem); - std::cout << "================= gazebo_counts: " << this->distribution->GetGazeboCount() << " =================" << std::endl; + + int tmp_gazebo_count = this->distribution->GetGazeboCount(); + unsigned int tmp_local_model_count = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount(); + + std::cout << "================= gazebo_counts: " << tmp_gazebo_count << " =================" << std::endl; std::cout << "================= gazebo_ID: " << this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetGazeboID() << " =================" << std::endl; - std::cout << "================= model_counts: " << this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount() << " =================" << std::endl; + std::cout << "================= model_counts: " << tmp_local_model_count << " =================" << std::endl; + + // for MPI_Allgatherv + this->sendBufferLen = sizeof(CommunicationData) * tmp_local_model_count; + this->receiveBufferLen = 0; + this->modelCounts = 0; + this->bufferLen = new int[tmp_gazebo_count]; + this->displs = new int[tmp_gazebo_count]; + for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) + { + this->bufferLen[tmp_gazebo_id] = sizeof(CommunicationData) * this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + this->modelCounts = this->modelCounts + this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + + this->receiveBufferLen = this->receiveBufferLen + this->bufferLen[tmp_gazebo_id]; + + this->displs[tmp_gazebo_id] = 0; + for (int k = 0; k < tmp_gazebo_id; k++) + this->displs[tmp_gazebo_id] = this->displs[tmp_gazebo_id] + this->bufferLen[k]; + } } } //Added by zhangshuai based on zenglei 2019.03.19 ----End @@ -913,73 +935,33 @@ void World::Update() #endif // Added by zhangshuai 2019.04.03 for count time ----End - // MPI_Status *status = (MPI_Status *)malloc(sizeof(MPI_Status) * (this->distribution->GetGazeboCount())); - // MPI_Request *request = (MPI_Request *)malloc(sizeof(MPI_Request) * (this->distribution->GetGazeboCount())); - + std::cout << "===== begin iterations:\t" << this->dataPtr->iterations << std::endl; int tmp_gazebo_count = this->distribution->GetGazeboCount(); char *sendBuffer; - char *receiveBuffer; - int *bufferLen = new int[tmp_gazebo_count]; - int *displs = new int[tmp_gazebo_count]; + char *receiveBuffer; - int receiveBufferLen = 0; - - for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) + /// make the pose data ready + unsigned int tmp_local_model_count = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelCount(); + CommunicationData *sendBufferData = new CommunicationData[tmp_local_model_count]; + for (unsigned int i = 0; i < tmp_local_model_count; i++) { - msgs::Pose_V modelPoseListSend; - for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); i++) - { - msgs::Pose *modelPose; - modelPose = modelPoseListSend.add_pose(); + std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i); + unsigned int j = 0; + for (j = 0; j < tmp_model_name.length(); j++) + sendBufferData[i].model_name[j] = tmp_model_name[j]; + sendBufferData[i].model_name[j] = '\0'; + sendBufferData[i].model_pose = this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose(); - std::string tmp_model_name = this->distribution->GetGazeboIDPtr(this->gazeboLocalID)->GetModelName(i); - - modelPose->set_name(tmp_model_name); - // if (this->gazeboLocalID == tmp_gazebo_id) - // { - modelPose->set_id(this->GetModel(tmp_model_name)->GetLink("canonical")->GetId()); - modelPose->mutable_position()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.x); - modelPose->mutable_position()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.y); - modelPose->mutable_position()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().pos.z); - modelPose->mutable_orientation()->set_x(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.x); - modelPose->mutable_orientation()->set_y(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.y); - modelPose->mutable_orientation()->set_z(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.z); - modelPose->mutable_orientation()->set_w(this->GetModel(tmp_model_name)->GetLink("canonical")->GetWorldPose().rot.w); - // } - - tmp_model_name.clear(); - } - - std::string send_messages = ""; - if (modelPoseListSend.pose_size() > 0) - { - modelPoseListSend.SerializeToString(&send_messages); - modelPoseListSend.clear_pose(); - } - - bufferLen[tmp_gazebo_id] = send_messages.size(); - receiveBufferLen = receiveBufferLen + bufferLen[tmp_gazebo_id]; - - displs[tmp_gazebo_id] = 0; - for (int k = 0; k < tmp_gazebo_id; k++) - displs[tmp_gazebo_id] = displs[tmp_gazebo_id] + bufferLen[k]; - - if (this->gazeboLocalID == tmp_gazebo_id) - { - sendBuffer = new char[bufferLen[tmp_gazebo_id] + 1]; - for (int i = 0; i < bufferLen[tmp_gazebo_id]; i++) - { - sendBuffer[i] = send_messages[i]; - } - sendBuffer[bufferLen[tmp_gazebo_id]] = '\0'; - } - - send_messages.clear(); + // std::cout << "======fuzhi:" << i << std::endl; + // std::cout << tmp_model_name << std::endl; + tmp_model_name.clear(); } - receiveBuffer = new char[receiveBufferLen + 1]; - receiveBuffer[receiveBufferLen] = '\0'; + sendBuffer = (char *)sendBufferData; + + CommunicationData *receiveBufferData = new CommunicationData[this->modelCounts]; + receiveBuffer = (char *)receiveBufferData; // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME @@ -996,7 +978,9 @@ void World::Update() // Added by zhangshuai 2019.04.03 for count time ----End //gather poses to from all processes - MPI_Allgatherv(sendBuffer, bufferLen[this->gazeboLocalID], MPI_CHAR, receiveBuffer, bufferLen, displs, MPI_CHAR, MPI_COMM_WORLD); + std::cout << "======begin allgatherv:" << std::endl; + MPI_Allgatherv(sendBuffer, this->sendBufferLen, MPI_CHAR, receiveBuffer, bufferLen, displs, MPI_CHAR, MPI_COMM_WORLD); + MPI_Barrier(MPI_COMM_WORLD); // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME @@ -1013,42 +997,28 @@ void World::Update() // Added by zhangshuai 2019.04.03 for count time ----End //receive and get information from other processes + std::cout << "======begin set pose:" << std::endl; for (int tmp_gazebo_id = 0; tmp_gazebo_id < tmp_gazebo_count; tmp_gazebo_id++) { if (this->gazeboLocalID != tmp_gazebo_id) { - std::string receive_messages = ""; - for (int j = displs[tmp_gazebo_id]; j < (displs[tmp_gazebo_id] + bufferLen[tmp_gazebo_id]); j++) + unsigned int tmp_model_count = this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); + CommunicationData *tmpBufferData = new CommunicationData[tmp_model_count]; + tmpBufferData = (CommunicationData *)(receiveBuffer + displs[tmp_gazebo_id]); + for (unsigned int i = 0; i < this->distribution->GetGazeboIDPtr(tmp_gazebo_id)->GetModelCount(); i++) { - receive_messages.push_back(receiveBuffer[j]); - } - - msgs::Pose_V modelPoseListReceive; - modelPoseListReceive.ParseFromString(receive_messages); - receive_messages.clear(); - - //insert models according to name,pose,vel from process 0 - for (int i = 0; i < modelPoseListReceive.pose_size(); i++) - { - math::Pose tmp_pose; - std::string tmp_model_name = modelPoseListReceive.pose(i).name(); - tmp_pose.pos.x = modelPoseListReceive.pose(i).position().x(); - tmp_pose.pos.y = modelPoseListReceive.pose(i).position().y(); - tmp_pose.pos.z = modelPoseListReceive.pose(i).position().z(); - tmp_pose.rot.x = modelPoseListReceive.pose(i).orientation().x(); - tmp_pose.rot.y = modelPoseListReceive.pose(i).orientation().y(); - tmp_pose.rot.z = modelPoseListReceive.pose(i).orientation().z(); - tmp_pose.rot.w = modelPoseListReceive.pose(i).orientation().w(); - - this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmp_pose); + std::string tmp_model_name = tmpBufferData[i].model_name; + this->GetModel(tmp_model_name)->GetLink("canonical")->SetWorldPose(tmpBufferData[i].model_pose); + tmp_model_name.clear(); } + delete[] tmpBufferData; } } delete[] sendBuffer; delete[] receiveBuffer; - delete[] bufferLen; - delete[] displs; + delete[] sendBufferData; + delete[] receiveBufferData; // Added by zhangshuai 2019.04.03 for count time ----Begin #ifdef USE_COUNT_TIME @@ -1082,7 +1052,8 @@ void World::Update() wholeUpdataTime += (double)tv.tv_sec + (double)tv.tv_usec / 1.e6 - start_time; #endif - // std::cout << "===== iterations:\t" << this->dataPtr->iterations << std::endl; +#ifdef USE_COUNT_TIME + // std::cout << "===== end iterations:\t" << this->dataPtr->iterations << std::endl; if (this->dataPtr->iterations == 100000) { if (this->gazeboLocalID == 0) @@ -1104,6 +1075,7 @@ void World::Update() std::cout << "===== average wholeUpdateTime:\t" << wholeUpdataTime * 1000.0 / this->dataPtr->iterations << "\tms =====" << std::endl; } } +#endif } ////////////////////////////////////////////////// @@ -1221,6 +1193,11 @@ void World::Fini() delete this->dataPtr->thread; this->dataPtr->thread = nullptr; } + + //Added by zhangshuai 2019.06.10 ----Begin + delete[] this->bufferLen; + delete[] this->displs; + //Added by zhangshuai 2019.06.10 ----End } ////////////////////////////////////////////////// diff --git a/Gazebo_Distributed_MPI/gazebo/physics/World.hh b/Gazebo_Distributed_MPI/gazebo/physics/World.hh index ae30e78..ef300e2 100644 --- a/Gazebo_Distributed_MPI/gazebo/physics/World.hh +++ b/Gazebo_Distributed_MPI/gazebo/physics/World.hh @@ -49,7 +49,7 @@ // Added by zhangshuai based on zenglei 2019.03.19 ----Begin /// add for Tcp communication -#include "gazebo/physics/TcpCommunication.hh" +#include "gazebo/physics/MpiCommunication.hh" // Added by zhangshuai based on zenglei 2019.03.19 ----End // Added by zhangshuai 2019.04.01 ----Begin @@ -740,6 +740,42 @@ public: public: int GetFlag(); // Added by zhangshuai 2019.04.01 ----End + + // Added by zhangshuai 2019.06.10 ----Begin + /// \brief the . +private: + int receiveBufferLen; + + /// \brief the . +private: + int sendBufferLen; + + /// \brief the . +private: + int *bufferLen; + + /// \brief the . +private: + int *displs; + + /// \brief the . +private: + int modelCounts; + + // /// \brief the . + // private: + // // Distribution_V distributions; + // std::vector distribution; + + // public: + // DistributionPtr GetDistribution(); + + // public: + // int GetGazeboLocalID(); + + // public: + // int GetFlag(); + // Added by zhangshuai 2019.06.10 ----End }; /// \} } // namespace physics diff --git a/Gazebo_Distributed_MPI/mpi_run/MPI_Gazebo_launch.sh b/Gazebo_Distributed_MPI/mpi_run/MPI_Gazebo_launch.sh index b94a9dd..54f4566 100755 --- a/Gazebo_Distributed_MPI/mpi_run/MPI_Gazebo_launch.sh +++ b/Gazebo_Distributed_MPI/mpi_run/MPI_Gazebo_launch.sh @@ -70,7 +70,7 @@ do echo "${hosts[$i]}:roslaunch_good" done -sleep 20s +sleep 50s #登入各节点开始仿真 for((i=0;i