rospy: #3096 cleanup of cleanup code to better monitor closed state during various entry points to teardown. Reinstated multicall code now that we are ROS 1.3. Switched more locks to use 'with'.

This commit is contained in:
Ken Conley 2010-10-25 06:23:19 +00:00
parent 2da4d610c7
commit 2751c343f6
3 changed files with 66 additions and 54 deletions

View File

@ -43,6 +43,7 @@ import thread
import threading
import time
import traceback
import xmlrpclib
from rospy.core import is_shutdown, xmlrpcapi, \
logfatal, logwarn, loginfo, logerr, logdebug, \
@ -150,6 +151,13 @@ class RegistrationListeners(object):
l.reg_added(resolved_name, data_type, reg_type)
except Exception, e:
logerr(traceback.format_exc(e))
def clear(self):
"""
Remove all registration listeners
"""
with self.lock:
del self.listeners[:]
_registration_listeners = RegistrationListeners()
def get_registration_listeners():
@ -327,21 +335,28 @@ class RegManager(RegistrationListener):
caller_id = get_caller_id()
# clear the registration listeners as we are going to do a quick unregister here
rl = get_registration_listeners()
if rl is not None:
rl.clear()
tm = get_topic_manager()
sm = get_service_manager()
try:
multi = xmlrpclib.MultiCall(master)
if tm is not None:
for resolved_name, _ in tm.get_subscriptions():
self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
master.unregisterSubscriber(caller_id, resolved_name, self.uri)
multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
for resolved_name, _ in tm.get_publications():
self.logger.debug("unregisterPublisher [%s]"%resolved_name)
master.unregisterPublisher(caller_id, resolved_name, self.uri)
multi.unregisterPublisher(caller_id, resolved_name, self.uri)
if sm is not None:
for resolved_name, service_uri in sm.get_services():
self.logger.debug("unregisterService [%s]"%resolved_name)
master.unregisterService(caller_id, resolved_name, service_uri)
multi.unregisterService(caller_id, resolved_name, service_uri)
multi()
except socket.error, (errno, msg):
if errno == 111 or errno == 61: #can't talk to master, nothing we can do about it
self.logger.warn("cannot unregister with master due to network issues")
@ -352,9 +367,10 @@ class RegManager(RegistrationListener):
self.logger.debug("registration cleanup: master calls complete")
#TODO: cleanup() should actually be orchestrated by a separate cleanup routine that calls the reg manager/sm/tm
#TODO: cleanup() should actually be orchestrated by a separate
#cleanup routine that calls the reg manager/sm/tm
if tm is not None:
tm.remove_all()
tm.close_all()
if sm is not None:
sm.unregister_all()

View File

@ -85,14 +85,11 @@ class ServiceManager(object):
@return: List of (service_name, service_uri) for all registered services.
@rtype: [(str, str)]
"""
try:
self.lock.acquire()
with self.lock:
ret_val = []
for name, service in self.map.iteritems():
ret_val.append((name, service.uri))
services = self.map.values()
finally:
self.lock.release()
return ret_val
def unregister_all(self):

View File

@ -509,6 +509,8 @@ class _SubscriberImpl(_TopicImpl):
@param cb_cargs: additional arguments to pass to callback
@type cb_cargs: Any
"""
if self.closed:
raise ROSException("subscriber [%s] has been closed"%(self.resolved_name))
with self.c_lock:
# we lock in order to serialize calls to add_callback, but
# we copy self.callbacks so we can it
@ -530,6 +532,8 @@ class _SubscriberImpl(_TopicImpl):
@type cb_cargs: Any
@raise KeyError: if no matching callback
"""
if self.closed:
return
with self.c_lock:
# we lock in order to serialize calls to add_callback, but
# we copy self.callbacks so we can it
@ -909,6 +913,7 @@ class _TopicManager(object):
self.subs = {} #: { topic: _SubscriberImpl }
self.topics = set() # [str] list of topic names
self.lock = threading.Condition()
self.closed = False
_logger.info("topicmanager initialized")
def get_pub_sub_info(self):
@ -918,14 +923,11 @@ class _TopicManager(object):
See getBusInfo() API for more data structure details.
@rtype: list
"""
try:
self.lock.acquire()
with self.lock:
info = []
for s in chain(self.pubs.itervalues(), self.subs.itervalues()):
info.extend(s.get_stats_info())
return info
finally:
self.lock.release()
def get_pub_sub_stats(self):
"""
@ -934,36 +936,36 @@ class _TopicManager(object):
See getBusStats() API for more data structure details.
@rtype: list
"""
try:
self.lock.acquire()
with self.lock:
return [s.get_stats() for s in self.pubs.itervalues()],\
[s.get_stats() for s in self.subs.itervalues()]
finally:
self.lock.release()
def remove_all(self):
def close_all(self):
"""
Remove all registered publication and subscriptions, closing them on removal
Close all registered publication and subscriptions. Manager is
no longer usable after close.
"""
for t in chain(self.pubs.itervalues(), self.subs.itervalues()):
t.close()
self.pubs.clear()
self.subs.clear()
self.closed = True
with self.lock:
for t in chain(self.pubs.itervalues(), self.subs.itervalues()):
t.close()
self.pubs.clear()
self.subs.clear()
def _add(self, ps, map, reg_type):
def _add(self, ps, rmap, reg_type):
"""
Add L{_TopicImpl} instance to map
Add L{_TopicImpl} instance to rmap
@param ps: a pub/sub impl instance
@type ps: L{_TopicImpl}
@param map: { topic: _TopicImpl} map to record instance in
@type map: dict
@param rmap: { topic: _TopicImpl} rmap to record instance in
@type rmap: dict
@param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
@type reg_type: str
"""
resolved_name = ps.resolved_name
_logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type)
with self.lock:
map[resolved_name] = ps
rmap[resolved_name] = ps
self.topics.add(resolved_name)
# NOTE: this call can take a lengthy amount of time (at
@ -975,28 +977,25 @@ class _TopicManager(object):
self.topics = set([x.resolved_name for x in self.pubs.itervalues()] +
[x.resolved_name for x in self.subs.itervalues()])
def _remove(self, ps, map, reg_type):
def _remove(self, ps, rmap, reg_type):
"""
Remove L{_TopicImpl} instance from map
Remove L{_TopicImpl} instance from rmap
@param ps: a pub/sub impl instance
@type ps: L{_TopicImpl}
@param map: topic->_TopicImpl map to remove instance in
@type map: dict
@param rmap: topic->_TopicImpl rmap to remove instance in
@type rmap: dict
@param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB}
@type reg_type: str
"""
resolved_name = ps.resolved_name
_logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type)
try:
self.lock.acquire()
del map[resolved_name]
with self.lock:
del rmap[resolved_name]
self. _recalculate_topics()
# NOTE: this call can take a lengthy amount of time (at
# least until its reimplemented to use queues)
get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type)
finally:
self.lock.release()
def get_impl(self, reg_type, resolved_name):
"""
@ -1009,12 +1008,12 @@ class _TopicManager(object):
@type reg_type: str
"""
if reg_type == Registration.PUB:
map = self.pubs
rmap = self.pubs
elif reg_type == Registration.SUB:
map = self.subs
rmap = self.subs
else:
raise TypeError("invalid reg_type: %s"%s)
return map.get(resolved_name, None)
return rmap.get(resolved_name, None)
def acquire_impl(self, reg_type, resolved_name, data_class):
"""
@ -1035,23 +1034,20 @@ class _TopicManager(object):
@type data_class: L{Message} class
"""
if reg_type == Registration.PUB:
map = self.pubs
rmap = self.pubs
impl_class = _PublisherImpl
elif reg_type == Registration.SUB:
map = self.subs
rmap = self.subs
impl_class = _SubscriberImpl
else:
raise TypeError("invalid reg_type: %s"%s)
try:
self.lock.acquire()
impl = map.get(resolved_name, None)
with self.lock:
impl = rmap.get(resolved_name, None)
if not impl:
impl = impl_class(resolved_name, data_class)
self._add(impl, map, reg_type)
self._add(impl, rmap, reg_type)
impl.ref_count += 1
return impl
finally:
self.lock.release()
def release_impl(self, reg_type, resolved_name):
"""
@ -1069,18 +1065,21 @@ class _TopicManager(object):
@type reg_type: str
"""
if reg_type == Registration.PUB:
map = self.pubs
rmap = self.pubs
else:
map = self.subs
rmap = self.subs
with self.lock:
impl = map.get(resolved_name, None)
# check for race condition where multiple things are cleaning up at once
if self.closed:
return
impl = rmap.get(resolved_name, None)
assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name
impl.ref_count -= 1
assert impl.ref_count >= 0, "topic impl's reference count has gone below zero"
if impl.ref_count == 0:
_logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name)
impl.close()
self._remove(impl, map, reg_type)
self._remove(impl, rmap, reg_type)
del impl
_logger.debug("... done deleting topic %s", resolved_name)
@ -1127,8 +1126,8 @@ class _TopicManager(object):
"""
return self.topics
def _get_list(self, map):
return [[k, v.type] for k, v in map.iteritems()]
def _get_list(self, rmap):
return [[k, v.type] for k, v in rmap.iteritems()]
## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]]
def get_subscriptions(self):