pxmlw6n2f/Gazebo_Distributed_TCP/gazebo/transport/ConnectionManager.cc

712 lines
19 KiB
C++

/*
* Copyright (C) 2012 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifdef _WIN32
// Ensure that Winsock2.h is included before Windows.h, which can get
// pulled in by anybody (e.g., Boost).
#include <Winsock2.h>
#endif
#include <boost/bind.hpp>
#include "gazebo/msgs/msgs.hh"
#include "gazebo/common/Console.hh"
#include "gazebo/common/Events.hh"
#include "gazebo/transport/TopicManager.hh"
#include "gazebo/transport/ConnectionManager.hh"
#include "gazebo/gazebo_config.h"
using namespace gazebo;
using namespace transport;
/// TBB task to process nodes.
class TopicManagerProcessTask : public tbb::task
{
/// Implements the necessary execute function
public: tbb::task *execute()
{
TopicManager::Instance()->ProcessNodes();
return NULL;
}
};
/// TBB task to establish subscriber to publisher connection.
class TopicManagerConnectionTask : public tbb::task
{
/// \brief Constructor.
/// \param[in] _pub Publish message
public: TopicManagerConnectionTask(msgs::Publish _pub) : pub(_pub) {}
/// Implements the necessary execute function
public: tbb::task *execute()
{
TopicManager::Instance()->ConnectSubToPub(pub);
return NULL;
}
/// \brief Publish message
private: msgs::Publish pub;
};
//////////////////////////////////////////////////
ConnectionManager::ConnectionManager()
{
this->tmpIndex = 0;
this->initialized = false;
this->stop = false;
this->stopped = true;
this->serverConn = NULL;
this->eventConnections.push_back(
event::Events::ConnectStop(boost::bind(&ConnectionManager::Stop, this)));
}
//////////////////////////////////////////////////
ConnectionManager::~ConnectionManager()
{
this->eventConnections.clear();
delete this->serverConn;
this->serverConn = NULL;
this->Fini();
}
//////////////////////////////////////////////////
bool ConnectionManager::Init(const std::string &_masterHost,
unsigned int _masterPort,
uint32_t _timeoutIterations)
{
this->stop = false;
this->masterConn.reset(new Connection());
delete this->serverConn;
this->serverConn = new Connection();
// Create a new TCP server on a free port
this->serverConn->Listen(0,
boost::bind(&ConnectionManager::OnAccept, this, _1));
gzmsg << "Waiting for master." << std::endl;
uint32_t timeoutCount = 0;
uint32_t waitDurationMS = 1000;
while (!this->masterConn->Connect(_masterHost, _masterPort) &&
this->IsRunning() && timeoutCount < _timeoutIterations)
{
++timeoutCount;
if (timeoutCount < _timeoutIterations)
common::Time::MSleep(waitDurationMS);
}
if (timeoutCount >= _timeoutIterations)
{
gzerr << "Failed to connect to master in "
<< (timeoutCount * waitDurationMS) / 1000.0 << " seconds."
<< std::endl;
return false;
}
if (!this->IsRunning())
{
gzerr << "Connection Manager is not running" << std::endl;
return false;
}
std::string initData, namespacesData, publishersData;
try
{
this->masterConn->Read(initData);
this->masterConn->Read(namespacesData);
this->masterConn->Read(publishersData);
}
catch(...)
{
gzerr << "Unable to read from master" << std::endl;
return false;
}
msgs::Packet packet;
packet.ParseFromString(initData);
if (packet.type() == "version_init")
{
msgs::GzString msg;
msg.ParseFromString(packet.serialized_data());
if (msg.data() == std::string("gazebo ") + GAZEBO_VERSION)
{
// TODO: set some flag.. maybe start "serverConn" when initialized
gzmsg << "Connected to gazebo master @ "
<< this->masterConn->GetRemoteURI() << std::endl;
}
else
{
// TODO: MAke this a proper error
gzerr << "Conflicting gazebo versions" << std::endl;
}
}
else
gzerr << "Didn't receive an init from the master" << std::endl;
packet.ParseFromString(namespacesData);
if (packet.type() == "topic_namepaces_init")
{
msgs::GzString_V result;
result.ParseFromString(packet.serialized_data());
boost::mutex::scoped_lock lock(this->namespaceMutex);
for (int i = 0; i < result.data_size(); i++)
{
this->namespaces.push_back(std::string(result.data(i)));
}
this->namespaceCondition.notify_all();
}
else
gzerr << "Did not get topic_namespaces_init msg from master" << std::endl;
packet.ParseFromString(publishersData);
if (packet.type() == "publishers_init")
{
msgs::Publishers pubs;
pubs.ParseFromString(packet.serialized_data());
boost::recursive_mutex::scoped_lock lock(this->listMutex);
for (int i = 0; i < pubs.publisher_size(); i++)
{
const msgs::Publish &p = pubs.publisher(i);
this->publishers.push_back(p);
}
}
else
gzerr << "Did not get publishers_init msg from master" << std::endl;
this->masterConn->AsyncRead(
boost::bind(&ConnectionManager::OnMasterRead, this, _1));
this->initialized = true;
// Tell the user what address will be publicized to other nodes.
gzmsg << "Publicized address: "
<< this->masterConn->GetLocalHostname() << std::endl;
return true;
}
//////////////////////////////////////////////////
bool ConnectionManager::IsInitialized() const
{
return this->initialized;
}
//////////////////////////////////////////////////
void ConnectionManager::Fini()
{
if (!this->initialized)
return;
this->Stop();
if (this->masterConn)
{
this->masterConn->ProcessWriteQueue();
this->masterConn->Shutdown();
this->masterConn.reset();
}
if (this->serverConn)
{
this->serverConn->ProcessWriteQueue();
this->serverConn->Shutdown();
delete this->serverConn;
this->serverConn = NULL;
}
this->eventConnections.clear();
this->connections.clear();
this->publishers.clear();
this->namespaces.clear();
this->masterMessages.clear();
this->initialized = false;
}
//////////////////////////////////////////////////
void ConnectionManager::Stop()
{
this->stop = true;
this->updateCondition.notify_all();
if (this->initialized)
while (this->stopped == false)
common::Time::MSleep(100);
}
//////////////////////////////////////////////////
void ConnectionManager::RunUpdate()
{
std::list<ConnectionPtr>::iterator iter;
std::list<ConnectionPtr>::iterator endIter;
unsigned int msize = 0;
{
boost::recursive_mutex::scoped_lock lock(this->masterMessagesMutex);
msize = this->masterMessages.size();
}
while (msize > 0)
{
this->ProcessMessage(this->masterMessages.front());
{
boost::recursive_mutex::scoped_lock lock(this->masterMessagesMutex);
this->masterMessages.pop_front();
msize = this->masterMessages.size();
}
}
if (this->masterConn)
this->masterConn->ProcessWriteQueue();
// Use TBB to process nodes. Need more testing to see if this makes
// a difference.
// TopicManagerProcessTask *task = new(tbb::task::allocate_root())
// TopicManagerProcessTask();
// tbb::task::enqueue(*task);
TopicManager::Instance()->ProcessNodes();
{
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
iter = this->connections.begin();
endIter = this->connections.end();
}
while (iter != endIter)
{
if ((*iter)->IsOpen())
{
(*iter)->ProcessWriteQueue();
++iter;
}
else
{
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
iter = this->connections.erase(iter);
}
}
}
//////////////////////////////////////////////////
void ConnectionManager::Run()
{
boost::mutex::scoped_lock lock(this->updateMutex);
this->stopped = false;
while (!this->stop && this->masterConn && this->masterConn->IsOpen())
{
this->RunUpdate();
this->updateCondition.timed_wait(lock,
boost::posix_time::milliseconds(100));
}
this->RunUpdate();
this->stopped = true;
this->masterConn->Shutdown();
}
//////////////////////////////////////////////////
bool ConnectionManager::IsRunning() const
{
return !this->stop;
}
//////////////////////////////////////////////////
void ConnectionManager::OnMasterRead(const std::string &_data)
{
if (this->masterConn && this->masterConn->IsOpen())
this->masterConn->AsyncRead(
boost::bind(&ConnectionManager::OnMasterRead, this, _1));
if (!_data.empty())
{
boost::recursive_mutex::scoped_lock lock(this->masterMessagesMutex);
this->masterMessages.push_back(std::string(_data));
}
else
gzerr << "ConnectionManager::OnMasterRead empty data\n";
// Tell the ourself that we need an update
this->TriggerUpdate();
}
/////////////////////////////////////////////////
void ConnectionManager::ProcessMessage(const std::string &_data)
{
msgs::Packet packet;
packet.ParseFromString(_data);
if (packet.type() == "publisher_add")
{
msgs::Publish result;
result.ParseFromString(packet.serialized_data());
this->publishers.push_back(result);
}
else if (packet.type() == "publisher_del")
{
msgs::Publish result;
result.ParseFromString(packet.serialized_data());
std::list<msgs::Publish>::iterator iter = this->publishers.begin();
while (iter != this->publishers.end())
{
if ((*iter).topic() == result.topic() &&
(*iter).host() == result.host() &&
(*iter).port() == result.port())
iter = this->publishers.erase(iter);
else
++iter;
}
}
else if (packet.type() == "topic_namespace_add")
{
msgs::GzString result;
result.ParseFromString(packet.serialized_data());
boost::mutex::scoped_lock lock(this->namespaceMutex);
this->namespaces.push_back(std::string(result.data()));
this->namespaceCondition.notify_all();
}
// FIXME "publisher_update" is currently not used and have been separated out
// into "publisher_subscribe" and "publisher_advertise". This is implemented
// as a workaround to address transport blocking issue when gzclient connects
// to gzserver, see issue #714. "publisher_advertise", intended
// for gzserver when gzclient connects, is parallelized and made non-blocking.
else if (packet.type() == "publisher_update")
{
msgs::Publish pub;
pub.ParseFromString(packet.serialized_data());
if (pub.host() != this->serverConn->GetLocalAddress() ||
pub.port() != this->serverConn->GetLocalPort())
{
TopicManager::Instance()->ConnectSubToPub(pub);
}
}
else if (packet.type() == "publisher_advertise")
{
msgs::Publish pub;
pub.ParseFromString(packet.serialized_data());
if (pub.host() != this->serverConn->GetLocalAddress() ||
pub.port() != this->serverConn->GetLocalPort())
{
TopicManagerConnectionTask *task = new(tbb::task::allocate_root())
TopicManagerConnectionTask(pub);
tbb::task::enqueue(*task);
}
}
// publisher_subscribe. This occurs when we try to subscribe to a topic, and
// the master informs us of a remote host that is publishing on our
// requested topic
else if (packet.type() == "publisher_subscribe")
{
msgs::Publish pub;
pub.ParseFromString(packet.serialized_data());
if (pub.host() != this->serverConn->GetLocalAddress() ||
pub.port() != this->serverConn->GetLocalPort())
{
TopicManager::Instance()->ConnectSubToPub(pub);
}
}
else if (packet.type() == "unsubscribe")
{
msgs::Subscribe sub;
sub.ParseFromString(packet.serialized_data());
// Disconnect a local publisher from a remote subscriber
TopicManager::Instance()->DisconnectPubFromSub(sub.topic(),
sub.host(), sub.port());
}
else if (packet.type() == "unadvertise")
{
msgs::Publish pub;
pub.ParseFromString(packet.serialized_data());
// Disconnection all local subscribers from a remote publisher
TopicManager::Instance()->DisconnectSubFromPub(pub.topic(),
pub.host(), pub.port());
}
else
{
gzerr << "ConnectionManager::OnMasterRead unknown type["
<< packet.type() << "][" << packet.serialized_data()
<< "] Data[" << _data << "]\n";
}
}
//////////////////////////////////////////////////
void ConnectionManager::OnAccept(ConnectionPtr _newConnection)
{
_newConnection->AsyncRead(
boost::bind(&ConnectionManager::OnRead, this, _newConnection, _1));
// Add the connection to the list of connections
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
this->connections.push_back(_newConnection);
}
//////////////////////////////////////////////////
void ConnectionManager::OnRead(ConnectionPtr _connection,
const std::string &_data)
{
if (_data.empty())
{
gzerr << "Data was empty, try again\n";
_connection->AsyncRead(
boost::bind(&ConnectionManager::OnRead, this, _connection, _1));
return;
}
msgs::Packet packet;
packet.ParseFromString(_data);
// If we have an incoming (remote) subscription
if (packet.type() == "sub")
{
msgs::Subscribe sub;
sub.ParseFromString(packet.serialized_data());
// Create a transport link for the publisher to the remote subscriber
// via the connection
SubscriptionTransportPtr subLink(new SubscriptionTransport());
subLink->Init(_connection, sub.latching());
// Connect the publisher to this transport mechanism
TopicManager::Instance()->ConnectPubToSub(sub.topic(), subLink);
}
else
gzerr << "Error est here\n";
}
//////////////////////////////////////////////////
void ConnectionManager::Advertise(const std::string &topic,
const std::string &msgType)
{
if (!this->initialized)
return;
msgs::Publish msg;
msg.set_topic(topic);
msg.set_msg_type(msgType);
msg.set_host(this->serverConn->GetLocalAddress());
msg.set_port(this->serverConn->GetLocalPort());
this->masterConn->EnqueueMsg(msgs::Package("advertise", msg));
}
//////////////////////////////////////////////////
void ConnectionManager::RegisterTopicNamespace(const std::string &_name)
{
if (!this->initialized)
return;
msgs::GzString msg;
msg.set_data(_name);
this->masterConn->EnqueueMsg(msgs::Package("register_topic_namespace", msg));
}
//////////////////////////////////////////////////
void ConnectionManager::Unadvertise(const std::string &_topic)
{
msgs::Publish msg;
msg.set_topic(_topic);
msg.set_msg_type("");
if (this->serverConn)
{
msg.set_host(this->serverConn->GetLocalAddress());
msg.set_port(this->serverConn->GetLocalPort());
}
if (this->masterConn)
{
this->masterConn->EnqueueMsg(msgs::Package("unadvertise", msg), true);
}
}
//////////////////////////////////////////////////
void ConnectionManager::GetAllPublishers(std::list<msgs::Publish> &_publishers)
{
_publishers.clear();
std::list<msgs::Publish>::iterator iter;
boost::recursive_mutex::scoped_lock lock(this->listMutex);
for (iter = this->publishers.begin(); iter != this->publishers.end(); ++iter)
_publishers.push_back(*iter);
}
//////////////////////////////////////////////////
void ConnectionManager::GetTopicNamespaces(std::list<std::string> &_namespaces)
{
if (!this->initialized)
{
gzerr << "Not initialized\n";
return;
}
_namespaces.clear();
boost::mutex::scoped_lock lock(this->namespaceMutex);
for (std::list<std::string>::iterator iter = this->namespaces.begin();
iter != this->namespaces.end(); ++iter)
{
_namespaces.push_back(*iter);
}
}
//////////////////////////////////////////////////
void ConnectionManager::Unsubscribe(const msgs::Subscribe &_sub)
{
// Inform the master that we want to unsubscribe from a topic.
this->masterConn->EnqueueMsg(msgs::Package("unsubscribe", _sub), true);
}
//////////////////////////////////////////////////
void ConnectionManager::Unsubscribe(const std::string &_topic,
const std::string &_msgType)
{
if (this->serverConn)
{
msgs::Subscribe msg;
msg.set_topic(_topic);
msg.set_msg_type(_msgType);
msg.set_host(this->serverConn->GetLocalAddress());
msg.set_port(this->serverConn->GetLocalPort());
// Inform the master that we want to unsubscribe from a topic.
this->masterConn->EnqueueMsg(msgs::Package("unsubscribe", msg), true);
}
}
//////////////////////////////////////////////////
void ConnectionManager::Subscribe(const std::string &_topic,
const std::string &_msgType,
bool _latching)
{
if (!this->initialized)
{
gzerr << "ConnectionManager is not initialized\n";
return;
}
// TODO:
// Find a current connection on the topic
// ConnectionPtr conn = this->FindConnection(topic);
// If the connection to a remote publisher does not exist, then we need
// to establish a connection.
// if (!conn)
{
msgs::Subscribe msg;
msg.set_topic(_topic);
msg.set_msg_type(_msgType);
msg.set_host(this->serverConn->GetLocalAddress());
msg.set_port(this->serverConn->GetLocalPort());
msg.set_latching(_latching);
// Inform the master that we want to subscribe to a topic.
// This will result in Connection::OnMasterRead getting called with a
// packet type of "publisher_update"
this->masterConn->EnqueueMsg(msgs::Package("subscribe", msg));
}
}
//////////////////////////////////////////////////
ConnectionPtr ConnectionManager::ConnectToRemoteHost(const std::string &_host,
unsigned int _port)
{
ConnectionPtr conn;
if (!this->initialized)
return conn;
// Sharing connections is broken
// conn = this->FindConnection(_host, _port);
// if (!conn)
{
// Connect to the remote host
conn.reset(new Connection());
if (conn->Connect(_host, _port))
{
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
this->connections.push_back(conn);
}
else
{
conn.reset();
return ConnectionPtr();
}
}
return conn;
}
//////////////////////////////////////////////////
void ConnectionManager::RemoveConnection(ConnectionPtr &_conn)
{
std::list<ConnectionPtr>::iterator iter;
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
iter = this->connections.begin();
while (iter != this->connections.end())
{
if ((*iter) == _conn)
{
iter = this->connections.erase(iter);
}
else
++iter;
}
}
//////////////////////////////////////////////////
ConnectionPtr ConnectionManager::FindConnection(const std::string &_host,
unsigned int _port)
{
ConnectionPtr conn;
std::list<ConnectionPtr>::iterator iter;
boost::recursive_mutex::scoped_lock lock(this->connectionMutex);
// Check to see if we are already connected to the remote publisher
for (iter = this->connections.begin();
iter != this->connections.end(); ++iter)
{
if ((*iter)->IsOpen() && (*iter)->GetRemoteAddress() == _host &&
(*iter)->GetRemotePort() == _port)
conn = *iter;
}
return conn;
}
//////////////////////////////////////////////////
void ConnectionManager::TriggerUpdate()
{
this->updateCondition.notify_all();
}