diff --git a/core/roscpp/include/ros/subscribe_options.h b/core/roscpp/include/ros/subscribe_options.h index 222df58a..de4235ee 100644 --- a/core/roscpp/include/ros/subscribe_options.h +++ b/core/roscpp/include/ros/subscribe_options.h @@ -47,6 +47,7 @@ struct SubscribeOptions SubscribeOptions() : queue_size(1) , callback_queue(0) + , allow_concurrent_callbacks(false) { } @@ -65,6 +66,7 @@ struct SubscribeOptions , md5sum(_md5sum) , datatype(_datatype) , callback_queue(0) + , allow_concurrent_callbacks(false) {} /** @@ -119,6 +121,10 @@ struct SubscribeOptions CallbackQueueInterface* callback_queue; ///< Queue to add callbacks to. If NULL, the global callback queue will be used + /// By default subscription callbacks are guaranteed to arrive in-order, with only one callback happening for this subscription at any given + /// time. Setting this to true allows you to receive multiple messages on the same topic from multiple threads at the same time + bool allow_concurrent_callbacks; + /** * \brief An object whose destruction will prevent the callback associated with this subscription * diff --git a/core/roscpp/include/ros/subscription.h b/core/roscpp/include/ros/subscription.h index 6ab31ba6..82a681bb 100644 --- a/core/roscpp/include/ros/subscription.h +++ b/core/roscpp/include/ros/subscription.h @@ -95,7 +95,7 @@ public: XmlRpc::XmlRpcValue getStats(); void getInfo(XmlRpc::XmlRpcValue& info); - bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object); + bool addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks); void removeCallback(const SubscriptionCallbackHelperPtr& helper); typedef std::map M_string; diff --git a/core/roscpp/include/ros/subscription_queue.h b/core/roscpp/include/ros/subscription_queue.h index 33bbe534..16d6e43c 100644 --- a/core/roscpp/include/ros/subscription_queue.h +++ b/core/roscpp/include/ros/subscription_queue.h @@ -64,7 +64,7 @@ private: typedef std::deque D_Item; public: - SubscriptionQueue(const std::string& topic, int32_t queue_size); + SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks); ~SubscriptionQueue(); void push(const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer, bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy, ros::Time receipt_time = ros::Time(), bool* was_full = 0); void clear(); @@ -82,6 +82,7 @@ private: boost::mutex queue_mutex_; D_Item queue_; uint32_t queue_size_; + bool allow_concurrent_callbacks_; boost::recursive_mutex callback_mutex_; }; diff --git a/core/roscpp/src/libros/subscription.cpp b/core/roscpp/src/libros/subscription.cpp index 9fc12161..841e3bf5 100644 --- a/core/roscpp/src/libros/subscription.cpp +++ b/core/roscpp/src/libros/subscription.cpp @@ -649,7 +649,7 @@ uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool return drops; } -bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object) +bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, const std::string& md5sum, CallbackQueueInterface* queue, int32_t queue_size, const VoidConstPtr& tracked_object, bool allow_concurrent_callbacks) { ROS_ASSERT(helper); ROS_ASSERT(queue); @@ -675,7 +675,7 @@ bool Subscription::addCallback(const SubscriptionCallbackHelperPtr& helper, cons CallbackInfoPtr info(new CallbackInfo); info->helper_ = helper; info->callback_queue_ = queue; - info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size)); + info->subscription_queue_.reset(new SubscriptionQueue(name_, queue_size, allow_concurrent_callbacks)); info->tracked_object_ = tracked_object; info->has_tracked_object_ = false; if (tracked_object) diff --git a/core/roscpp/src/libros/subscription_queue.cpp b/core/roscpp/src/libros/subscription_queue.cpp index 4426e277..de2179b8 100644 --- a/core/roscpp/src/libros/subscription_queue.cpp +++ b/core/roscpp/src/libros/subscription_queue.cpp @@ -33,11 +33,12 @@ namespace ros { -SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size) +SubscriptionQueue::SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks) : topic_(topic) , size_(queue_size) , full_(false) , queue_size_(0) +, allow_concurrent_callbacks_(allow_concurrent_callbacks) {} SubscriptionQueue::~SubscriptionQueue() @@ -104,7 +105,7 @@ CallbackInterface::CallResult SubscriptionQueue::call() // that outlasts the scoped_try_lock boost::shared_ptr self; boost::recursive_mutex::scoped_try_lock lock(callback_mutex_); - if (!lock.owns_lock()) + if (!allow_concurrent_callbacks_ && !lock.owns_lock()) { return CallbackInterface::TryAgain; } @@ -141,6 +142,11 @@ CallbackInterface::CallResult SubscriptionQueue::call() --queue_size_; } + if (allow_concurrent_callbacks_) + { + lock.unlock(); + } + VoidConstPtr msg = i.deserializer->deserialize(); // msg can be null here if deserialization failed diff --git a/core/roscpp/src/libros/topic_manager.cpp b/core/roscpp/src/libros/topic_manager.cpp index 8b7ae5cf..d2936b47 100644 --- a/core/roscpp/src/libros/topic_manager.cpp +++ b/core/roscpp/src/libros/topic_manager.cpp @@ -235,7 +235,7 @@ bool TopicManager::addSubCallback(const SubscribeOptions& ops) } else if (found) { - if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object)) + if (!sub->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks)) { return false; } @@ -278,7 +278,7 @@ bool TopicManager::subscribe(const SubscribeOptions& ops) std::string datatype = ops.datatype; SubscriptionPtr s(new Subscription(ops.topic, md5sum, datatype, ops.transport_hints)); - s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object); + s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks); if (!registerSubscriber(s, ops.datatype)) { diff --git a/core/roscpp/test/test_subscription_queue.cpp b/core/roscpp/test/test_subscription_queue.cpp index 27c33b07..8f6fd807 100644 --- a/core/roscpp/test/test_subscription_queue.cpp +++ b/core/roscpp/test/test_subscription_queue.cpp @@ -97,7 +97,7 @@ typedef boost::shared_ptr FakeSubHelperPtr; TEST(SubscriptionQueue, queueSize) { - SubscriptionQueue queue("blah", 1); + SubscriptionQueue queue("blah", 1, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -130,7 +130,7 @@ TEST(SubscriptionQueue, queueSize) TEST(SubscriptionQueue, infiniteQueue) { - SubscriptionQueue queue("blah", 0); + SubscriptionQueue queue("blah", 0, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -161,7 +161,7 @@ TEST(SubscriptionQueue, infiniteQueue) TEST(SubscriptionQueue, clearCall) { - SubscriptionQueue queue("blah", 1); + SubscriptionQueue queue("blah", 1, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -173,7 +173,7 @@ TEST(SubscriptionQueue, clearCall) TEST(SubscriptionQueue, clearThenAddAndCall) { - SubscriptionQueue queue("blah", 1); + SubscriptionQueue queue("blah", 1, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -191,7 +191,7 @@ void clearInCallbackCallback(SubscriptionQueue& queue) TEST(SubscriptionQueue, clearInCallback) { - SubscriptionQueue queue("blah", 1); + SubscriptionQueue queue("blah", 1, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -215,7 +215,7 @@ void callThread(SubscriptionQueue& queue) TEST(SubscriptionQueue, clearWhileThreadIsBlocking) { - SubscriptionQueue queue("blah", 1); + SubscriptionQueue queue("blah", 1, false); FakeSubHelperPtr helper(new FakeSubHelper); MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); @@ -232,6 +232,29 @@ TEST(SubscriptionQueue, clearWhileThreadIsBlocking) ASSERT_TRUE(done); } +void waitForBarrier(boost::barrier* bar) +{ + bar->wait(); +} + +TEST(SubscriptionQueue, concurrentCallbacks) +{ + SubscriptionQueue queue("blah", 0, true); + FakeSubHelperPtr helper(new FakeSubHelper); + MessageDeserializerPtr des(new MessageDeserializer(helper, SerializedMessage(), boost::shared_ptr())); + + boost::barrier bar(2); + helper->cb_ = boost::bind(waitForBarrier, &bar); + queue.push(helper, des, false, VoidConstWPtr(), true); + queue.push(helper, des, false, VoidConstWPtr(), true); + boost::thread t1(callThread, boost::ref(queue)); + boost::thread t2(callThread, boost::ref(queue)); + t1.join(); + t2.join(); + + ASSERT_EQ(helper->calls_, 2); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv);