prevent deletion of currently selected topic, with test, #2863

This commit is contained in:
Brian Gerkey 2010-07-28 18:24:26 +00:00
parent 5e427f8634
commit e7c0e61b4c
3 changed files with 44 additions and 8 deletions

View File

@ -38,5 +38,6 @@ rosbuild_add_rostest(test/mux.test)
rosbuild_add_rostest(test/switch_mux.test)
rosbuild_add_rostest(test/switch_mux_leading_slash.test)
rosbuild_add_rostest(test/switch_mux_none.test)
rosbuild_add_rostest(test/mux_add.test)
rosbuild_add_pyunit(test/args.py)

View File

@ -198,6 +198,12 @@ bool del_topic_cb(topic_tools::MuxDelete::Request& req,
{
if (ros::names::resolve(it->sub.getTopic()) == ros::names::resolve(req.topic))
{
// Can't delete the currently selected input, #2863
if(it == g_selected)
{
ROS_WARN("tried to delete currently selected topic %s from mux", req.topic.c_str());
return false;
}
it->sub.shutdown();
delete it->msg;
g_subs.erase(it);

View File

@ -43,6 +43,7 @@ import rospy
from topic_tools.srv import MuxAdd
from topic_tools.srv import MuxDelete
from topic_tools.srv import MuxList
from topic_tools.srv import MuxSelect
class MuxServiceTestCase(unittest.TestCase):
def make_srv_proxies(self):
@ -50,26 +51,54 @@ class MuxServiceTestCase(unittest.TestCase):
rospy.wait_for_service('mux/add', 5)
rospy.wait_for_service('mux/delete', 5)
rospy.wait_for_service('mux/list', 5)
rospy.wait_for_service('mux/select', 5)
except rospy.ROSException, e:
self.fail('failed to find a required service: ' + `e`)
add_srv = rospy.ServiceProxy('mux/add', MuxAdd)
delete_srv = rospy.ServiceProxy('mux/delete', MuxDelete)
list_srv = rospy.ServiceProxy('mux/list', MuxList)
select_srv = rospy.ServiceProxy('mux/select', MuxSelect)
return (add_srv, delete_srv, list_srv)
return (add_srv, delete_srv, list_srv, select_srv)
def test_add_delete_list(self):
add_srv, delete_srv, list_srv = self.make_srv_proxies()
add_srv, delete_srv, list_srv, select_srv = self.make_srv_proxies()
# Check initial condition
topics = list_srv().topics
self.assertEquals(set(topics), set(['input']))
add_srv('new_input')
self.assertEquals(set(topics), set(['/input']))
# Add a topic and make sure it's there
add_srv('/new_input')
topics = list_srv().topics
self.assertEquals(set(topics), set(['input', 'new_input']))
delete_srv('input')
self.assertEquals(set(topics), set(['/input', '/new_input']))
# Try to add the same topic again, make sure it fails, and that
# nothing changes.
try:
add_srv('/new_input')
except rospy.ServiceException, e:
pass
else:
self.fail('service call should have thrown an exception')
topics = list_srv().topics
self.assertEquals(set(topics), set(['new_input']))
delete_srv('new_input')
self.assertEquals(set(topics), set(['/input', '/new_input']))
# Select a topic, then try to delete it, make sure it fails, and
# that nothing changes.
select_srv('/input')
try:
delete_srv('/input')
except rospy.ServiceException, e:
pass
else:
self.fail('service call should have thrown an exception')
topics = list_srv().topics
self.assertEquals(set(topics), set(['/input', '/new_input']))
# Select nothing, to allow deletion
select_srv('__none')
# Delete topics
delete_srv('/input')
topics = list_srv().topics
self.assertEquals(set(topics), set(['/new_input']))
delete_srv('/new_input')
topics = list_srv().topics
self.assertEquals(set(topics), set([]))