* Make *-md5sum subscribers decay to real types when they are available (either a 2nd concrete subscriber in the same node or once a publisher connects with a non-* md5sum)

* Fix bug where multiple subscribers on the same topic with different md5sums would silently succeed but not work in some cases
 * Fix UDPROS to fail the connection if the md5sums don't match (do the same validation TCPROS does)
 * Fix possible infinite loop in TransportUDP
 * Remove subscriber-side disconnect on publisherUpdate b/c it does not match rospy's behavior
 * Fix sending of header errors to not disconnect before the error is sent
This commit is contained in:
Josh Faust 2010-04-16 19:58:15 +00:00
parent 5b0d0bba36
commit 63cdef698c
21 changed files with 627 additions and 101 deletions

View File

@ -93,6 +93,11 @@ public:
*/
bool isDropped();
/**
* \brief Returns true if we're currently sending a header error (and will be automatically dropped when it's finished)
*/
bool isSendingHeaderError() { return sending_header_error_; }
/**
* \brief Send a header error message, of the form "error=<message>". Drops the connection once the data has written successfully (or fails to write)
* \param error_message The error message
@ -143,6 +148,7 @@ public:
* \brief Add a callback to be called when this connection has dropped
*/
boost::signals::connection addDropListener(const DropFunc& slot);
void removeDropListener(const boost::signals::connection& c);
/**
* \brief Set the header receipt callback

View File

@ -56,6 +56,17 @@ public:
{}
};
/**
* \brief Thrown when a second (third,...) subscription is attempted with conflicting
* arguments.
*/
class ConflictingSubscriptionException : public ros::Exception
{
public:
ConflictingSubscriptionException(const std::string& msg)
: Exception(msg)
{}
};
} // namespace ros

View File

@ -144,6 +144,8 @@ public:
void publish(SerializedMessage& m);
void processPublishQueue();
bool validateHeader(const Header& h, std::string& error_msg);
private:
void dropAllConnections();

View File

@ -83,6 +83,8 @@ public:
virtual std::string getTransportType() = 0;
virtual void drop() = 0;
const std::string& getMD5Sum();
protected:
SubscriptionWPtr parent_;
unsigned int connection_id_;
@ -95,6 +97,7 @@ protected:
bool latched_;
std::string caller_id_;
Header header_;
std::string md5sum_;
};
} // namespace ros

View File

@ -33,6 +33,7 @@
#include <boost/thread/mutex.hpp>
#include <boost/shared_array.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/signals/connection.hpp>
#include <queue>
@ -78,6 +79,7 @@ private:
ConnectionPtr connection_;
ServicePublicationWPtr parent_;
bool persistent_;
boost::signals::connection dropped_conn_;
};
typedef boost::shared_ptr<ServiceClientLink> ServiceClientLinkPtr;

View File

@ -180,12 +180,16 @@ public:
void getPublishTypes(bool& ser, bool& nocopy, const std::type_info& ti);
void headerReceived(const PublisherLinkPtr& link, const Header& h);
private:
Subscription(const Subscription &); // not copyable
Subscription &operator =(const Subscription &); // nor assignable
void dropAllConnections();
void addPublisherLink(const PublisherLinkPtr& link);
struct CallbackInfo
{
CallbackQueueInterface* callback_queue_;
@ -200,6 +204,7 @@ private:
typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
std::string name_;
boost::mutex md5sum_mutex_;
std::string md5sum_;
std::string datatype_;
boost::mutex callbacks_mutex_;

View File

@ -29,6 +29,8 @@
#define ROSCPP_TRANSPORT_SUBSCRIBER_LINK_H
#include "subscriber_link.h"
#include <boost/signals/connection.hpp>
namespace ros
{
@ -62,6 +64,7 @@ private:
bool header_written_;
ConnectionPtr connection_;
boost::signals::connection dropped_conn_;
std::queue<SerializedMessage> outbox_;
boost::mutex outbox_mutex_;

View File

@ -87,9 +87,16 @@ void Connection::initialize(const TransportPtr& transport, bool is_server, const
boost::signals::connection Connection::addDropListener(const DropFunc& slot)
{
boost::recursive_mutex::scoped_lock lock(drop_mutex_);
return drop_signal_.connect(slot);
}
void Connection::removeDropListener(const boost::signals::connection& c)
{
boost::recursive_mutex::scoped_lock lock(drop_mutex_);
c.disconnect();
}
void Connection::onReadable(const TransportPtr& transport)
{
ROS_ASSERT(transport == transport_);

View File

@ -441,4 +441,54 @@ void Publication::processPublishQueue()
}
}
bool Publication::validateHeader(const Header& header, std::string& error_msg)
{
std::string md5sum, topic, client_callerid;
if (!header.getValue("md5sum", md5sum)
|| !header.getValue("topic", topic)
|| !header.getValue("callerid", client_callerid))
{
std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
ROS_ERROR("%s", msg.c_str());
error_msg = msg;
return false;
}
// Check whether the topic has been deleted from
// advertised_topics through a call to unadvertise(), which could
// have happened while we were waiting for the subscriber to
// provide the md5sum.
if(isDropped())
{
std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
topic + std::string("] from [" + client_callerid +"].");
ROS_ERROR("%s", msg.c_str());
error_msg = msg;
return false;
}
if (getMD5Sum() != md5sum &&
(md5sum != std::string("*") && getMD5Sum() != std::string("*")))
{
std::string datatype;
header.getValue("type", datatype);
std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
std::string("], but our version has [") + getDataType() + "/" + getMD5Sum() +
std::string("]. Dropping connection.");
ROS_ERROR("%s", msg.c_str());
error_msg = msg;
return false;
}
return true;
}
} // namespace ros

View File

@ -71,6 +71,8 @@ bool PublisherLink::setHeader(const Header& header)
return false;
}
md5sum_ = md5sum;
if (!header.getValue("type", type))
{
ROS_ERROR("Publisher header did not have required element: type");
@ -89,6 +91,11 @@ bool PublisherLink::setHeader(const Header& header)
connection_id_ = ConnectionManager::instance()->getNewConnectionID();
header_ = header;
if (SubscriptionPtr parent = parent_.lock())
{
parent->headerReceived(shared_from_this(), header);
}
return true;
}
@ -97,5 +104,11 @@ const std::string& PublisherLink::getPublisherXMLRPCURI()
return publisher_xmlrpc_uri_;
}
const std::string& PublisherLink::getMD5Sum()
{
ROS_ASSERT(!md5sum_.empty());
return md5sum_;
}
} // namespace ros

View File

@ -53,16 +53,23 @@ ServiceClientLink::ServiceClientLink()
ServiceClientLink::~ServiceClientLink()
{
if (connection_)
if (connection_ && !connection_->isSendingHeaderError())
{
connection_->drop(Connection::Destructing);
if (connection_->isSendingHeaderError())
{
connection_->removeDropListener(dropped_conn_);
}
else
{
connection_->drop(Connection::Destructing);
}
}
}
bool ServiceClientLink::initialize(const ConnectionPtr& connection)
{
connection_ = connection;
connection_->addDropListener(boost::bind(&ServiceClientLink::onConnectionDropped, this, _1));
dropped_conn_ = connection_->addDropListener(boost::bind(&ServiceClientLink::onConnectionDropped, this, _1));
return true;
}

View File

@ -193,7 +193,7 @@ void Subscription::addLocalConnection(const PublicationPtr& pub)
pub_link->setPublisher(sub_link);
sub_link->setSubscriber(pub_link);
publisher_links_.push_back(pub_link);
addPublisherLink(pub_link);
pub->addSubscriberLink(sub_link);
}
@ -312,22 +312,6 @@ bool Subscription::pubUpdate(const V_string& new_pubs)
}
}
for (V_PublisherLink::iterator i = subtractions.begin();
i != subtractions.end(); ++i)
{
const PublisherLinkPtr& link = *i;
if (link->getPublisherXMLRPCURI() != XMLRPCManager::instance()->getServerURI())
{
ROSCPP_LOG_DEBUG("Disconnecting from publisher [%s] of topic [%s] at [%s]",
link->getCallerID().c_str(), name_.c_str(), link->getPublisherXMLRPCURI().c_str());
link->drop();
}
else
{
ROSCPP_LOG_DEBUG("Disconnect: skipping myself for topic [%s]", name_.c_str());
}
}
return retval;
}
@ -402,7 +386,10 @@ bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
peer_host.c_str(), peer_port, name_.c_str());
delete c;
return false;
if (udp_transport)
{
udp_transport->close();
}
}
ROSCPP_LOG_DEBUG("Began asynchronous xmlrpc connection to [%s:%d]", peer_host.c_str(), peer_port);
@ -421,6 +408,14 @@ bool Subscription::negotiateConnection(const std::string& xmlrpc_uri)
return true;
}
void closeTransport(const TransportUDPPtr& trans)
{
if (trans)
{
trans->close();
}
}
void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRpcValue& result)
{
boost::mutex::scoped_lock lock(shutdown_mutex_);
@ -448,24 +443,28 @@ void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRp
{
ROSCPP_LOG_DEBUG("Failed to contact publisher [%s:%d] for topic [%s]",
peer_host.c_str(), peer_port, name_.c_str());
return;
closeTransport(udp_transport);
return;
}
if (proto.size() == 0)
{
ROSCPP_LOG_DEBUG("Couldn't agree on any common protocols with [%s] for topic [%s]", xmlrpc_uri.c_str(), name_.c_str());
return;
closeTransport(udp_transport);
return;
}
if (proto.getType() != XmlRpcValue::TypeArray)
{
ROSCPP_LOG_DEBUG("Available protocol info returned from %s is not a list.", xmlrpc_uri.c_str());
return;
closeTransport(udp_transport);
return;
}
if (proto[0].getType() != XmlRpcValue::TypeString)
{
ROSCPP_LOG_DEBUG("Available protocol info list doesn't have a string as its first element.");
return;
closeTransport(udp_transport);
return;
}
std::string proto_name = proto[0];
@ -495,7 +494,7 @@ void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRp
ConnectionManager::instance()->addConnection(connection);
boost::mutex::scoped_lock lock(publisher_links_mutex_);
publisher_links_.push_back(pub_link);
addPublisherLink(pub_link);
ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
}
@ -515,6 +514,7 @@ void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRp
{
ROSCPP_LOG_DEBUG("publisher implements UDPROS, but the " \
"parameters aren't string,int,int,int,base64");
closeTransport(udp_transport);
return;
}
std::string pub_host = proto[1];
@ -529,23 +529,23 @@ void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRp
if (!h.parse(buffer, header_bytes.size(), err))
{
ROSCPP_LOG_DEBUG("Unable to parse UDPROS connection header: %s", err.c_str());
closeTransport(udp_transport);
return;
}
ROSCPP_LOG_DEBUG("Connecting via udpros to topic [%s] at host [%s:%d] connection id [%08x] max_datagram_size [%d]", name_.c_str(), pub_host.c_str(), pub_port, conn_id, max_datagram_size);
//TransportUDPPtr transport(new TransportUDP(&g_node->getPollSet()));
std::string error_msg;
if (h.getValue("error", error_msg))
{
ROSCPP_LOG_DEBUG("Received error message in header for connection to [%s]: [%s]", xmlrpc_uri.c_str(), error_msg.c_str());
closeTransport(udp_transport);
return;
}
//if (udp_transport->connect(pub_host, pub_port, conn_id))
// Using if(1) below causes a bizarre compiler error on some OS X
// machines. Creating a variable and testing it doesn't. Presumably
// it's related to the conditional compilation that goes on inside
// ROS_ERROR.
int foo=1;
if (foo)
TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
if (pub_link->setHeader(h))
{
ConnectionPtr connection(new Connection());
TransportPublisherLinkPtr pub_link(new TransportPublisherLink(shared_from_this(), xmlrpc_uri, transport_hints_));
connection->initialize(udp_transport, false, NULL);
connection->setHeader(h);
pub_link->initialize(connection);
@ -553,13 +553,15 @@ void Subscription::pendingConnectionDone(const PendingConnectionPtr& conn, XmlRp
ConnectionManager::instance()->addConnection(connection);
boost::mutex::scoped_lock lock(publisher_links_mutex_);
publisher_links_.push_back(pub_link);
addPublisherLink(pub_link);
ROSCPP_LOG_DEBUG("Connected to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
}
else
{
ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
ROSCPP_LOG_DEBUG("Failed to connect to publisher of topic [%s] at [%s:%d]", name_.c_str(), pub_host.c_str(), pub_port);
closeTransport(udp_transport);
return;
}
}
else
@ -651,7 +653,18 @@ bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, cons
{
ROS_ASSERT(helper);
ROS_ASSERT(queue);
if (md5sum != this->md5sum())
// Decay to a real type as soon as we have a subscriber with a real type
{
boost::mutex::scoped_lock lock(md5sum_mutex_);
if (md5sum_ == "*" && md5sum != "*")
{
md5sum_ = md5sum;
}
}
if (md5sum != "*" && md5sum != this->md5sum())
{
return false;
}
@ -734,6 +747,20 @@ void Subscription::removeCallback(const SubscriptionCallbackHelperPtr& helper)
}
}
void Subscription::headerReceived(const PublisherLinkPtr& link, const Header& h)
{
boost::mutex::scoped_lock lock(md5sum_mutex_);
if (md5sum_ == "*")
{
md5sum_ = link->getMD5Sum();
}
}
void Subscription::addPublisherLink(const PublisherLinkPtr& link)
{
publisher_links_.push_back(link);
}
void Subscription::removePublisherLink(const PublisherLinkPtr& pub_link)
{
boost::mutex::scoped_lock lock(publisher_links_mutex_);
@ -780,6 +807,7 @@ const std::string Subscription::datatype()
const std::string Subscription::md5sum()
{
boost::mutex::scoped_lock lock(md5sum_mutex_);
return md5sum_;
}

View File

@ -192,10 +192,16 @@ PublicationPtr TopicManager::lookupPublication(const std::string& topic)
return lookupPublicationWithoutLock(topic);
}
bool md5sumsMatch(const std::string& lhs, const std::string& rhs)
{
return lhs == "*" || rhs == "*" || lhs == rhs;
}
bool TopicManager::addSubCallback(const SubscribeOptions& ops)
{
// spin through the subscriptions and see if we find a match. if so, use it.
bool found = false;
bool found_topic = false;
SubscriptionPtr sub;
@ -211,16 +217,23 @@ bool TopicManager::addSubCallback(const SubscribeOptions& ops)
sub = *s;
if (!sub->isDropped() && sub->getName() == ops.topic)
{
if (sub->md5sum() == ops.md5sum)
found_topic = true;
if (md5sumsMatch(ops.md5sum, sub->md5sum()))
{
found = true;
break;
}
break;
}
}
}
if (found)
if (found_topic && !found)
{
std::stringstream ss;
ss << "Tried to subscribe to a topic with the same name but different md5sum as a topic that was already subscribed [" << ops.datatype << "/" << ops.md5sum << " vs. " << sub->datatype() << "/" << sub->md5sum() << "]";
throw ConflictingSubscriptionException(ss.str());
}
else if (found)
{
if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object))
{
@ -330,7 +343,7 @@ bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallba
for (L_Subscription::iterator s = subscriptions_.begin();
s != subscriptions_.end() && !found; ++s)
{
if ((*s)->getName() == ops.topic && (*s)->md5sum() == ops.md5sum && !(*s)->isDropped())
if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
{
found = true;
sub = *s;
@ -451,6 +464,7 @@ bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &da
bool self_subscribed = false;
PublicationPtr pub;
const std::string& sub_md5sum = s->md5sum();
// Figure out if we have a local publisher
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
@ -459,7 +473,8 @@ bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &da
for (; it != end; ++it)
{
pub = *it;
if (pub->getName() == s->getName() && pub->getDataType() == s->datatype() && !pub->isDropped())
const std::string& pub_md5sum = pub->getMD5Sum();
if (pub->getName() == s->getName() && md5sumsMatch(pub_md5sum, sub_md5sum) && !pub->isDropped())
{
self_subscribed = true;
break;
@ -589,6 +604,15 @@ bool TopicManager::requestTopic(const string &topic,
std::string host = proto[2];
int port = proto[3];
M_string m;
std::string error_msg;
if (!pub_ptr->validateHeader(h, error_msg))
{
ROSCPP_LOG_DEBUG("Error validating header from [%s:%d] for topic [%s]: %s", host.c_str(), port, topic.c_str(), error_msg.c_str());
return false;
}
int max_datagram_size = proto[4];
int conn_id = connection_manager_->getUDPServerTransport()->generateConnectionId();
TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
@ -600,7 +624,6 @@ bool TopicManager::requestTopic(const string &topic,
udpros_params[2] = connection_manager_->getUDPServerTransport()->getServerPort();
udpros_params[3] = conn_id;
udpros_params[4] = max_datagram_size;
M_string m;
m["topic"] = topic;
m["md5sum"] = pub_ptr->getMD5Sum();
m["type"] = pub_ptr->getDataType();
@ -811,7 +834,7 @@ size_t TopicManager::getNumPublishers(const std::string &topic)
for (L_Subscription::const_iterator t = subscriptions_.begin();
t != subscriptions_.end(); ++t)
{
if ((*t)->getName() == topic)
if (!(*t)->isDropped() && (*t)->getName() == topic)
{
return (*t)->getNumPublishers();
}

View File

@ -351,15 +351,16 @@ int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
}
if (num_bytes < 0)
{
if (errno != EAGAIN)
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno));
close();
num_bytes = 0;
break;
}
else
{
num_bytes = 0;
ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno));
close();
break;
}
}
else if (num_bytes == 0)

View File

@ -57,19 +57,17 @@ TransportSubscriberLink::~TransportSubscriberLink()
bool TransportSubscriberLink::initialize(const ConnectionPtr& connection)
{
connection_ = connection;
connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
dropped_conn_ = connection_->addDropListener(boost::bind(&TransportSubscriberLink::onConnectionDropped, this, _1));
return true;
}
bool TransportSubscriberLink::handleHeader(const Header& header)
{
std::string md5sum, topic, client_callerid;
if (!header.getValue("md5sum", md5sum)
|| !header.getValue("topic", topic)
|| !header.getValue("callerid", client_callerid))
std::string topic;
if (!header.getValue("topic", topic))
{
std::string msg("Header from subscriber did not have the required elements: md5sum, topic, callerid");
std::string msg("Header from subscriber did not have the required element: topic");
ROS_ERROR("%s", msg.c_str());
connection_->sendHeaderError(msg);
@ -77,7 +75,9 @@ bool TransportSubscriberLink::handleHeader(const Header& header)
return false;
}
ROSCPP_LOG_DEBUG("Client [%s] wants topic [%s] with md5sum [%s]", client_callerid.c_str(), topic.c_str(), md5sum.c_str());
// This will get validated by validateHeader below
std::string client_callerid;
header.getValue("callerid", client_callerid);
PublicationPtr pt = TopicManager::instance()->lookupPublication(topic);
if (!pt)
{
@ -90,55 +90,30 @@ bool TransportSubscriberLink::handleHeader(const Header& header)
return false;
}
if (pt->getMD5Sum() != md5sum &&
(md5sum != std::string("*") && pt->getMD5Sum() != std::string("*")))
std::string error_msg;
if (!pt->validateHeader(header, error_msg))
{
std::string datatype;
header.getValue("type", datatype);
std::string msg = std::string("Client [") + client_callerid + std::string("] wants topic ") + topic +
std::string(" to have datatype/md5sum [") + datatype + "/" + md5sum +
std::string("], but our version has [") + pt->getDataType() + "/" + pt->getMD5Sum() +
std::string("]. Dropping connection.");
ROS_ERROR("%s", msg.c_str());
connection_->sendHeaderError(msg);
ROSCPP_LOG_DEBUG("%s", error_msg.c_str());
connection_->sendHeaderError(error_msg);
return false;
}
// Check whether the topic (pt here) has been deleted from
// advertised_topics through a call to unadvertise(), which could
// have happened while we were waiting for the subscriber to
// provide the md5sum.
if(pt->isDropped())
{
std::string msg = std::string("received a tcpros connection for a nonexistent topic [") +
topic + std::string("] from [" + connection_->getTransport()->getTransportInfo() + "] [" + client_callerid +"].");
destination_caller_id_ = client_callerid;
connection_id_ = ConnectionManager::instance()->getNewConnectionID();
topic_ = pt->getName();
parent_ = PublicationWPtr(pt);
ROS_ERROR("%s", msg.c_str());
connection_->sendHeaderError(msg);
// Send back a success, with info
M_string m;
m["type"] = pt->getDataType();
m["md5sum"] = pt->getMD5Sum();
m["message_definition"] = pt->getMessageDefinition();
m["callerid"] = this_node::getName();
m["latching"] = pt->isLatching() ? "1" : "0";
connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
return false;
}
else
{
destination_caller_id_ = client_callerid;
connection_id_ = ConnectionManager::instance()->getNewConnectionID();
topic_ = pt->getName();
parent_ = PublicationWPtr(pt);
// Send back a success, with info
M_string m;
m["type"] = pt->getDataType();
m["md5sum"] = pt->getMD5Sum();
m["message_definition"] = pt->getMessageDefinition();
m["callerid"] = this_node::getName();
m["latching"] = pt->isLatching() ? "1" : "0";
connection_->writeHeader(m, boost::bind(&TransportSubscriberLink::onHeaderWritten, this, _1));
pt->addSubscriberLink(shared_from_this());
}
pt->addSubscriberLink(shared_from_this());
return true;
}
@ -247,7 +222,16 @@ std::string TransportSubscriberLink::getTransportType()
void TransportSubscriberLink::drop()
{
connection_->drop(Connection::Destructing);
// Only drop the connection if it's not already sending a header error
// If it is, it will automatically drop itself
if (connection_->isSendingHeaderError())
{
connection_->removeDropListener(dropped_conn_);
}
else
{
connection_->drop(Connection::Destructing);
}
}
} // namespace ros

View File

@ -7,6 +7,7 @@
<review status="test" notes=""/>
<depend package="roscpp"/>
<depend package="std_msgs"/>
<depend package="std_srvs"/>
<depend package="test_ros"/>
</package>

View File

@ -146,4 +146,8 @@ rosbuild_add_executable(nonconst_subscriptions nonconst_subscriptions.cpp)
rosbuild_add_gtest_build_flags(nonconst_subscriptions)
rosbuild_add_executable(subscribe_retry_tcp subscribe_retry_tcp.cpp)
rosbuild_add_gtest_build_flags(subscribe_retry_tcp)
rosbuild_add_gtest_build_flags(subscribe_retry_tcp)
rosbuild_add_executable(subscribe_star subscribe_star.cpp)
rosbuild_add_gtest_build_flags(subscribe_star)
rosbuild_add_executable(publisher_for_star_subscriber publisher_for_star_subscriber.cpp)

View File

@ -37,6 +37,7 @@
#include <stdlib.h>
#include "ros/ros.h"
#include <test_roscpp/TestArray.h>
#include <test_roscpp/TestEmpty.h>
int g_argc;
char** g_argv;
@ -152,10 +153,37 @@ TEST_F(Subscriptions, multipleSubscriptions)
SUCCEED();
}
void callback1(const test_roscpp::TestArrayConstPtr&)
{
}
void callback2(const test_roscpp::TestEmptyConstPtr&)
{
}
TEST(Subscriptions2, multipleDifferentMD5Sums)
{
ros::NodeHandle nh;
ros::Subscriber sub1 = nh.subscribe("test", 0, callback1);
try
{
ros::Subscriber sub2 = nh.subscribe("test", 0, callback2);
FAIL();
}
catch (ros::ConflictingSubscriptionException&)
{
SUCCEED();
}
}
int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
ros::init(argc, argv, "multiple_subscriptions");
ros::NodeHandle nh;
g_argc = argc;
g_argv = argv;
return RUN_ALL_TESTS();

View File

@ -0,0 +1,73 @@
/*
* Copyright (c) 2010, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Author: Josh Faust
*/
#include <ros/ros.h>
#include <std_srvs/Empty.h>
#include <test_roscpp/TestEmpty.h>
#include <test_roscpp/TestArray.h>
ros::Publisher g_pub;
int8_t type = 0;
bool switchPublisherType(std_srvs::EmptyRequest&, std_srvs::EmptyResponse&)
{
ros::NodeHandle nh;
g_pub.shutdown();
type = (type + 1) % 2;
switch (type)
{
case 0:
g_pub = nh.advertise<test_roscpp::TestEmpty>("test_star_inter", 0);
break;
case 1:
g_pub = nh.advertise<test_roscpp::TestArray>("test_star_inter", 0);
break;
}
return true;
}
void pubTimer(const ros::TimerEvent&)
{
g_pub.publish(test_roscpp::TestEmpty());
}
int main(int argc, char** argv)
{
ros::init(argc, argv, "publisher_for_star_subscriber");
ros::NodeHandle nh;
g_pub = nh.advertise<test_roscpp::TestEmpty>("test_star_inter", 0);
ros::Timer t = nh.createTimer(ros::Duration(0.01), pubTimer);
ros::ServiceServer s = nh.advertiseService("switch_publisher_type", switchPublisherType);
ros::spin();
}

View File

@ -0,0 +1,274 @@
/*
* Copyright (c) 2010, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Author: Josh Faust
*/
#include <gtest/gtest.h>
#include <ros/ros.h>
#include <ros/connection_manager.h>
#include "test_roscpp/TestEmpty.h"
#include "test_roscpp/TestArray.h"
#include <std_srvs/Empty.h>
class AnyMessage
{
};
typedef boost::shared_ptr<AnyMessage> AnyMessagePtr;
typedef boost::shared_ptr<AnyMessage const> AnyMessageConstPtr;
namespace ros
{
namespace message_traits
{
template<>
struct MD5Sum<AnyMessage>
{
static const char* value() { return "*"; }
static const char* value(const AnyMessage&) { return "*"; }
};
template<>
struct DataType<AnyMessage>
{
static const char* value() { return "*"; }
static const char* value(const AnyMessage&) { return "*"; }
};
template<>
struct Definition<AnyMessage>
{
};
}
namespace serialization
{
template<>
struct Serializer<AnyMessage>
{
template<typename Stream, typename T>
static void allInOne(Stream s, T t)
{
}
ROS_DECLARE_ALLINONE_SERIALIZER;
};
}
}
struct AnyHelper
{
AnyHelper()
: count(0)
{
}
void cb(const AnyMessageConstPtr& msg)
{
++count;
}
uint32_t count;
};
TEST(SubscribeStar, simpleSubFirstIntra)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_intra", 0, &AnyHelper::cb, &h);
ros::Publisher pub = nh.advertise<test_roscpp::TestEmpty>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 1U);
EXPECT_EQ(sub.getNumPublishers(), 1U);
AnyMessagePtr msg(new AnyMessage);
pub.publish(msg);
ros::spinOnce();
EXPECT_EQ(h.count, 1U);
}
TEST(SubscribeStar, simplePubFirstIntra)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Publisher pub = nh.advertise<test_roscpp::TestEmpty>("test_star_intra", 0);
ros::Subscriber sub = nh.subscribe("test_star_intra", 0, &AnyHelper::cb, &h);
EXPECT_EQ(pub.getNumSubscribers(), 1U);
EXPECT_EQ(sub.getNumPublishers(), 1U);
AnyMessagePtr msg(new AnyMessage);
pub.publish(msg);
ros::spinOnce();
EXPECT_EQ(h.count, 1U);
}
void emptyCallback(const test_roscpp::TestEmptyConstPtr&)
{
}
TEST(SubscribeStar, multipleSubsStarFirstIntra)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_intra", 0, &AnyHelper::cb, &h);
ros::Subscriber sub2 = nh.subscribe("test_star_intra", 0, emptyCallback);
ros::Publisher pub = nh.advertise<test_roscpp::TestEmpty>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 1U);
EXPECT_EQ(sub.getNumPublishers(), 1U);
EXPECT_EQ(sub2.getNumPublishers(), 1U);
pub.shutdown();
pub = nh.advertise<test_roscpp::TestArray>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 0U);
EXPECT_EQ(sub.getNumPublishers(), 0U);
EXPECT_EQ(sub2.getNumPublishers(), 0U);
}
TEST(SubscribeStar, multipleSubsConcreteFirstIntra)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub2 = nh.subscribe("test_star_intra", 0, emptyCallback);
ros::Subscriber sub = nh.subscribe("test_star_intra", 0, &AnyHelper::cb, &h);
ros::Publisher pub = nh.advertise<test_roscpp::TestEmpty>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 1U);
EXPECT_EQ(sub.getNumPublishers(), 1U);
EXPECT_EQ(sub2.getNumPublishers(), 1U);
pub.shutdown();
pub = nh.advertise<test_roscpp::TestArray>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 0U);
EXPECT_EQ(sub.getNumPublishers(), 0U);
EXPECT_EQ(sub2.getNumPublishers(), 0U);
}
TEST(SubscribeStar, multipleShutdownConcreteIntra)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_intra", 0, &AnyHelper::cb, &h);
ros::Subscriber sub2 = nh.subscribe("test_star_intra", 0, emptyCallback);
sub2.shutdown();
ros::Publisher pub = nh.advertise<test_roscpp::TestEmpty>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 1U);
EXPECT_EQ(sub.getNumPublishers(), 1U);
pub.shutdown();
pub = nh.advertise<test_roscpp::TestArray>("test_star_intra", 0);
EXPECT_EQ(pub.getNumSubscribers(), 0U);
EXPECT_EQ(sub.getNumPublishers(), 0U);
}
TEST(SubscribeStar, simpleInter)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_inter", 0, &AnyHelper::cb, &h);
ros::WallDuration(1.0).sleep();
ros::spinOnce();
EXPECT_EQ(sub.getNumPublishers(), 1U);
EXPECT_GT(h.count, 0U);
}
TEST(SubscribeStar, simpleInterUDP)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_inter", 0, &AnyHelper::cb, &h, ros::TransportHints().udp());
ros::WallDuration(1.0).sleep();
ros::spinOnce();
EXPECT_EQ(sub.getNumPublishers(), 1U);
EXPECT_GT(h.count, 0U);
}
TEST(SubscribeStar, switchTypeInter)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_inter", 0, &AnyHelper::cb, &h);
ros::WallDuration(1.0).sleep();
ros::spinOnce();
ASSERT_EQ(sub.getNumPublishers(), 1U);
std_srvs::Empty srv;
ASSERT_TRUE(ros::service::call("switch_publisher_type", srv));
ros::WallDuration(1.0).sleep();
ros::spinOnce();
ASSERT_EQ(sub.getNumPublishers(), 0U);
}
TEST(SubscribeStar, switchTypeInterUDP)
{
ros::NodeHandle nh;
AnyHelper h;
ros::Subscriber sub = nh.subscribe("test_star_inter", 0, &AnyHelper::cb, &h, ros::TransportHints().udp());
ros::WallDuration(1.0).sleep();
ros::spinOnce();
ASSERT_EQ(sub.getNumPublishers(), 1U);
std_srvs::Empty srv;
ASSERT_TRUE(ros::service::call("switch_publisher_type", srv));
ros::WallDuration(1.0).sleep();
ros::spinOnce();
// UDPROS does not yet detect disconnects. If you've fixed this and num publishers now == 0,
// you may change this assertion
ASSERT_EQ(sub.getNumPublishers(), 1U);
}
int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);
ros::init(argc, argv, "subscribe_star");
ros::NodeHandle nh;
return RUN_ALL_TESTS();
}

View File

@ -82,3 +82,4 @@ rosbuild_add_rostest(${CMAKE_CURRENT_SOURCE_DIR}/service_callback_types.xml)
rosbuild_add_rostest(${CMAKE_CURRENT_SOURCE_DIR}/intraprocess_subscriptions.xml)
rosbuild_add_rostest(${CMAKE_CURRENT_SOURCE_DIR}/nonconst_subscriptions.xml)
rosbuild_add_rostest(${CMAKE_CURRENT_SOURCE_DIR}/subscribe_retry_tcp.xml)
rosbuild_add_rostest(${CMAKE_CURRENT_SOURCE_DIR}/subscribe_star.xml)