Subscriber connect callbacks now get called for any existing connections as soon as a new Publisher to an existing topic is created. (#2788)

This commit is contained in:
Josh Faust 2010-06-14 22:08:40 +00:00
parent 4cf80e8d57
commit b99d357cb0
2 changed files with 75 additions and 38 deletions

View File

@ -36,6 +36,43 @@
namespace ros
{
class PeerConnDisconnCallback : public CallbackInterface
{
public:
PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
: callback_(callback)
, sub_link_(sub_link)
, use_tracked_object_(use_tracked_object)
, tracked_object_(tracked_object)
{
}
virtual CallResult call()
{
VoidConstPtr tracker;
if (use_tracked_object_)
{
tracker = tracked_object_.lock();
if (!tracker)
{
return Invalid;
}
}
SingleSubscriberPublisher pub(sub_link_);
callback_(pub);
return Success;
}
private:
SubscriberStatusCallback callback_;
SubscriberLinkPtr sub_link_;
bool use_tracked_object_;
VoidConstWPtr tracked_object_;
};
Publication::Publication(const std::string &name,
const std::string &datatype,
const std::string &_md5sum,
@ -66,6 +103,20 @@ void Publication::addCallbacks(const SubscriberCallbacksPtr& callbacks)
boost::mutex::scoped_lock lock(callbacks_mutex_);
callbacks_.push_back(callbacks);
// Add connect callbacks for all current subscriptions if this publisher wants them
if (callbacks->connect_ && callbacks->callback_queue_)
{
boost::mutex::scoped_lock lock(subscriber_links_mutex_);
V_SubscriberLink::iterator it = subscriber_links_.begin();
V_SubscriberLink::iterator end = subscriber_links_.end();
for (; it != end; ++it)
{
const SubscriberLinkPtr& sub_link = *it;
CallbackInterfacePtr cb(new PeerConnDisconnCallback(callbacks->connect_, sub_link, callbacks->has_tracked_object_, callbacks->tracked_object_));
callbacks->callback_queue_->addCallback(cb, (uint64_t)callbacks.get());
}
}
}
void Publication::removeCallbacks(const SubscriberCallbacksPtr& callbacks)
@ -272,43 +323,6 @@ void Publication::dropAllConnections()
}
}
class PeerConnDisconnCallback : public CallbackInterface
{
public:
PeerConnDisconnCallback(const SubscriberStatusCallback& callback, const SubscriberLinkPtr& sub_link, bool use_tracked_object, const VoidConstWPtr& tracked_object)
: callback_(callback)
, sub_link_(sub_link)
, use_tracked_object_(use_tracked_object)
, tracked_object_(tracked_object)
{
}
virtual CallResult call()
{
VoidConstPtr tracker;
if (use_tracked_object_)
{
tracker = tracked_object_.lock();
if (!tracker)
{
return Invalid;
}
}
SingleSubscriberPublisher pub(sub_link_);
callback_(pub);
return Success;
}
private:
SubscriberStatusCallback callback_;
SubscriberLinkPtr sub_link_;
bool use_tracked_object_;
VoidConstWPtr tracked_object_;
};
void Publication::peerConnect(const SubscriberLinkPtr& sub_link)
{
V_Callback::iterator it = callbacks_.begin();

View File

@ -483,7 +483,7 @@ TEST(RoscppHandles, spinAfterHandleShutdownWithAdvertiseSubscriberCallback)
while (pub.getNumSubscribers() == 0)
{
ros::WallDuration().sleep();
ros::WallDuration(0.01).sleep();
}
pub.shutdown();
@ -493,6 +493,29 @@ TEST(RoscppHandles, spinAfterHandleShutdownWithAdvertiseSubscriberCallback)
ASSERT_EQ(g_sub_count, 0);
}
TEST(RoscppHandles, multiplePublishersWithSubscriberConnectCallback)
{
ros::NodeHandle n;
ros::Publisher pub = n.advertise<test_roscpp::TestArray>("/test", 0, connectedCallback, SubscriberStatusCallback());
g_sub_count = 0;
ros::Subscriber sub = n.subscribe("/test", 0, subscriberCallback);
while (g_sub_count == 0)
{
ros::WallDuration(0.01).sleep();
ros::spinOnce();
}
ASSERT_EQ(g_sub_count, 1);
g_sub_count = 0;
ros::Publisher pub2 = n.advertise<test_roscpp::TestArray>("/test", 0, connectedCallback, SubscriberStatusCallback());
ros::spinOnce();
ASSERT_EQ(g_sub_count, 1);
}
class ServiceClass
{
public: