Provide option for concurrent subscription callbacks (#2107)

This commit is contained in:
Josh Faust 2010-05-11 21:23:34 +00:00
parent 93fbb7c44b
commit d927a65b61
7 changed files with 50 additions and 14 deletions

View File

@ -47,6 +47,7 @@ struct SubscribeOptions
SubscribeOptions()
: queue_size(1)
, callback_queue(0)
, allow_concurrent_callbacks(false)
{
}
@ -65,6 +66,7 @@ struct SubscribeOptions
, md5sum(_md5sum)
, datatype(_datatype)
, callback_queue(0)
, allow_concurrent_callbacks(false)
{}
/**
@ -119,6 +121,10 @@ struct SubscribeOptions
CallbackQueueInterface* callback_queue; ///< Queue to add callbacks to. If NULL, the global callback queue will be used
/// By default subscription callbacks are guaranteed to arrive in-order, with only one callback happening for this subscription at any given
/// time. Setting this to true allows you to receive multiple messages on the same topic from multiple threads at the same time
bool allow_concurrent_callbacks;
/**
* \brief An object whose destruction will prevent the callback associated with this subscription
*

View File

@ -95,7 +95,7 @@ public:
XmlRpc::XmlRpcValue getStats();
void getInfo(XmlRpc::XmlRpcValue& info);
bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object);
bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks);
void removeCallback(const SubscriptionCallbackHelperPtr& helper);
typedef std::map<std::string, std::string> M_string;

View File

@ -64,7 +64,7 @@ private:
typedef std::deque<Item> D_Item;
public:
SubscriptionQueue(const std::string& topic, int32_t queue_size);
SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks);
~SubscriptionQueue();
void push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer, bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy, ros::Time receipt_time = ros::Time(), bool* was_full = 0);
void clear();
@ -82,6 +82,7 @@ private:
boost::mutex queue_mutex_;
D_Item queue_;
uint32_t queue_size_;
bool allow_concurrent_callbacks_;
boost::recursive_mutex callback_mutex_;
};

View File

@ -649,7 +649,7 @@ uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool
return drops;
}
bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object)
bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks)
{
ROS_ASSERT(helper);
ROS_ASSERT(queue);
@ -675,7 +675,7 @@ bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, cons
CallbackInfoPtr info(new CallbackInfo);
info->helper_ = helper;
info->callback_queue_ = queue;
info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size));
info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks));
info->tracked_object_ = tracked_object;
info->has_tracked_object_ = false;
if (tracked_object)

View File

@ -33,11 +33,12 @@
namespace ros
{
SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size)
SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks)
: topic_(topic)
, size_(queue_size)
, full_(false)
, queue_size_(0)
, allow_concurrent_callbacks_(allow_concurrent_callbacks)
{}
SubscriptionQueue::~SubscriptionQueue()
@ -104,7 +105,7 @@ CallbackInterface::CallResult SubscriptionQueue::call()
// that outlasts the scoped_try_lock
boost::shared_ptr<SubscriptionQueue> self;
boost::recursive_mutex::scoped_try_lock lock(callback_mutex_);
if (!lock.owns_lock())
if (!allow_concurrent_callbacks_ && !lock.owns_lock())
{
return CallbackInterface::TryAgain;
}
@ -141,6 +142,11 @@ CallbackInterface::CallResult SubscriptionQueue::call()
--queue_size_;
}
if (allow_concurrent_callbacks_)
{
lock.unlock();
}
VoidConstPtr msg = i.deserializer->deserialize();
// msg can be null here if deserialization failed

View File

@ -235,7 +235,7 @@ bool TopicManager::addSubCallback(const SubscribeOptions& ops)
}
else if (found)
{
if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object))
if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks))
{
return false;
}
@ -278,7 +278,7 @@ bool TopicManager::subscribe(const SubscribeOptions& ops)
std::string datatype = ops.datatype;
SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints));
s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object);
s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
if (!registerSubscriber(s, ops.datatype))
{

View File

@ -97,7 +97,7 @@ typedef boost::shared_ptr<FakeSubHelper> FakeSubHelperPtr;
TEST(SubscriptionQueue, queueSize)
{
SubscriptionQueue queue("blah", 1);
SubscriptionQueue queue("blah", 1, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -130,7 +130,7 @@ TEST(SubscriptionQueue, queueSize)
TEST(SubscriptionQueue, infiniteQueue)
{
SubscriptionQueue queue("blah", 0);
SubscriptionQueue queue("blah", 0, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -161,7 +161,7 @@ TEST(SubscriptionQueue, infiniteQueue)
TEST(SubscriptionQueue, clearCall)
{
SubscriptionQueue queue("blah", 1);
SubscriptionQueue queue("blah", 1, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -173,7 +173,7 @@ TEST(SubscriptionQueue, clearCall)
TEST(SubscriptionQueue, clearThenAddAndCall)
{
SubscriptionQueue queue("blah", 1);
SubscriptionQueue queue("blah", 1, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -191,7 +191,7 @@ void clearInCallbackCallback(SubscriptionQueue& queue)
TEST(SubscriptionQueue, clearInCallback)
{
SubscriptionQueue queue("blah", 1);
SubscriptionQueue queue("blah", 1, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -215,7 +215,7 @@ void callThread(SubscriptionQueue& queue)
TEST(SubscriptionQueue, clearWhileThreadIsBlocking)
{
SubscriptionQueue queue("blah", 1);
SubscriptionQueue queue("blah", 1, false);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
@ -232,6 +232,29 @@ TEST(SubscriptionQueue, clearWhileThreadIsBlocking)
ASSERT_TRUE(done);
}
void waitForBarrier(boost::barrier* bar)
{
bar->wait();
}
TEST(SubscriptionQueue, concurrentCallbacks)
{
SubscriptionQueue queue("blah", 0, true);
FakeSubHelperPtr helper(new FakeSubHelper);
MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr<M_string>()));
boost::barrier bar(2);
helper->cb_ = boost::bind(waitForBarrier, &bar);
queue.push(helper, des, false, VoidConstWPtr(), true);
queue.push(helper, des, false, VoidConstWPtr(), true);
boost::thread t1(callThread, boost::ref(queue));
boost::thread t2(callThread, boost::ref(queue));
t1.join();
t2.join();
ASSERT_EQ(helper->calls_, 2);
}
int main(int argc, char** argv)
{
testing::InitGoogleTest(&argc, argv);