diff --git a/core/rospy/src/rospy/impl/registration.py b/core/rospy/src/rospy/impl/registration.py index 473299ae..868f433d 100644 --- a/core/rospy/src/rospy/impl/registration.py +++ b/core/rospy/src/rospy/impl/registration.py @@ -43,6 +43,7 @@ import thread import threading import time import traceback +import xmlrpclib from rospy.core import is_shutdown, xmlrpcapi, \ logfatal, logwarn, loginfo, logerr, logdebug, \ @@ -150,6 +151,13 @@ class RegistrationListeners(object): l.reg_added(resolved_name, data_type, reg_type) except Exception, e: logerr(traceback.format_exc(e)) + + def clear(self): + """ + Remove all registration listeners + """ + with self.lock: + del self.listeners[:] _registration_listeners = RegistrationListeners() def get_registration_listeners(): @@ -327,21 +335,28 @@ class RegManager(RegistrationListener): caller_id = get_caller_id() + # clear the registration listeners as we are going to do a quick unregister here + rl = get_registration_listeners() + if rl is not None: + rl.clear() + tm = get_topic_manager() sm = get_service_manager() try: + multi = xmlrpclib.MultiCall(master) if tm is not None: for resolved_name, _ in tm.get_subscriptions(): self.logger.debug("unregisterSubscriber [%s]"%resolved_name) - master.unregisterSubscriber(caller_id, resolved_name, self.uri) + multi.unregisterSubscriber(caller_id, resolved_name, self.uri) for resolved_name, _ in tm.get_publications(): self.logger.debug("unregisterPublisher [%s]"%resolved_name) - master.unregisterPublisher(caller_id, resolved_name, self.uri) + multi.unregisterPublisher(caller_id, resolved_name, self.uri) if sm is not None: for resolved_name, service_uri in sm.get_services(): self.logger.debug("unregisterService [%s]"%resolved_name) - master.unregisterService(caller_id, resolved_name, service_uri) + multi.unregisterService(caller_id, resolved_name, service_uri) + multi() except socket.error, (errno, msg): if errno == 111 or errno == 61: #can't talk to master, nothing we can do about it self.logger.warn("cannot unregister with master due to network issues") @@ -352,9 +367,10 @@ class RegManager(RegistrationListener): self.logger.debug("registration cleanup: master calls complete") - #TODO: cleanup() should actually be orchestrated by a separate cleanup routine that calls the reg manager/sm/tm + #TODO: cleanup() should actually be orchestrated by a separate + #cleanup routine that calls the reg manager/sm/tm if tm is not None: - tm.remove_all() + tm.close_all() if sm is not None: sm.unregister_all() diff --git a/core/rospy/src/rospy/service.py b/core/rospy/src/rospy/service.py index abd67c77..eb9b1213 100644 --- a/core/rospy/src/rospy/service.py +++ b/core/rospy/src/rospy/service.py @@ -85,14 +85,11 @@ class ServiceManager(object): @return: List of (service_name, service_uri) for all registered services. @rtype: [(str, str)] """ - try: - self.lock.acquire() + with self.lock: ret_val = [] for name, service in self.map.iteritems(): ret_val.append((name, service.uri)) services = self.map.values() - finally: - self.lock.release() return ret_val def unregister_all(self): diff --git a/core/rospy/src/rospy/topics.py b/core/rospy/src/rospy/topics.py index 5dc3f47c..4dd00e3a 100644 --- a/core/rospy/src/rospy/topics.py +++ b/core/rospy/src/rospy/topics.py @@ -509,6 +509,8 @@ class _SubscriberImpl(_TopicImpl): @param cb_cargs: additional arguments to pass to callback @type cb_cargs: Any """ + if self.closed: + raise ROSException("subscriber [%s] has been closed"%(self.resolved_name)) with self.c_lock: # we lock in order to serialize calls to add_callback, but # we copy self.callbacks so we can it @@ -530,6 +532,8 @@ class _SubscriberImpl(_TopicImpl): @type cb_cargs: Any @raise KeyError: if no matching callback """ + if self.closed: + return with self.c_lock: # we lock in order to serialize calls to add_callback, but # we copy self.callbacks so we can it @@ -909,6 +913,7 @@ class _TopicManager(object): self.subs = {} #: { topic: _SubscriberImpl } self.topics = set() # [str] list of topic names self.lock = threading.Condition() + self.closed = False _logger.info("topicmanager initialized") def get_pub_sub_info(self): @@ -918,14 +923,11 @@ class _TopicManager(object): See getBusInfo() API for more data structure details. @rtype: list """ - try: - self.lock.acquire() + with self.lock: info = [] for s in chain(self.pubs.itervalues(), self.subs.itervalues()): info.extend(s.get_stats_info()) return info - finally: - self.lock.release() def get_pub_sub_stats(self): """ @@ -934,36 +936,36 @@ class _TopicManager(object): See getBusStats() API for more data structure details. @rtype: list """ - try: - self.lock.acquire() + with self.lock: return [s.get_stats() for s in self.pubs.itervalues()],\ [s.get_stats() for s in self.subs.itervalues()] - finally: - self.lock.release() - def remove_all(self): + def close_all(self): """ - Remove all registered publication and subscriptions, closing them on removal + Close all registered publication and subscriptions. Manager is + no longer usable after close. """ - for t in chain(self.pubs.itervalues(), self.subs.itervalues()): - t.close() - self.pubs.clear() - self.subs.clear() + self.closed = True + with self.lock: + for t in chain(self.pubs.itervalues(), self.subs.itervalues()): + t.close() + self.pubs.clear() + self.subs.clear() - def _add(self, ps, map, reg_type): + def _add(self, ps, rmap, reg_type): """ - Add L{_TopicImpl} instance to map + Add L{_TopicImpl} instance to rmap @param ps: a pub/sub impl instance @type ps: L{_TopicImpl} - @param map: { topic: _TopicImpl} map to record instance in - @type map: dict + @param rmap: { topic: _TopicImpl} rmap to record instance in + @type rmap: dict @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} @type reg_type: str """ resolved_name = ps.resolved_name _logger.debug("tm._add: %s, %s, %s", resolved_name, ps.type, reg_type) with self.lock: - map[resolved_name] = ps + rmap[resolved_name] = ps self.topics.add(resolved_name) # NOTE: this call can take a lengthy amount of time (at @@ -975,28 +977,25 @@ class _TopicManager(object): self.topics = set([x.resolved_name for x in self.pubs.itervalues()] + [x.resolved_name for x in self.subs.itervalues()]) - def _remove(self, ps, map, reg_type): + def _remove(self, ps, rmap, reg_type): """ - Remove L{_TopicImpl} instance from map + Remove L{_TopicImpl} instance from rmap @param ps: a pub/sub impl instance @type ps: L{_TopicImpl} - @param map: topic->_TopicImpl map to remove instance in - @type map: dict + @param rmap: topic->_TopicImpl rmap to remove instance in + @type rmap: dict @param reg_type: L{rospy.registration.Registration.PUB} or L{rospy.registration.Registration.SUB} @type reg_type: str """ resolved_name = ps.resolved_name _logger.debug("tm._remove: %s, %s, %s", resolved_name, ps.type, reg_type) - try: - self.lock.acquire() - del map[resolved_name] + with self.lock: + del rmap[resolved_name] self. _recalculate_topics() # NOTE: this call can take a lengthy amount of time (at # least until its reimplemented to use queues) get_registration_listeners().notify_removed(resolved_name, ps.type, reg_type) - finally: - self.lock.release() def get_impl(self, reg_type, resolved_name): """ @@ -1009,12 +1008,12 @@ class _TopicManager(object): @type reg_type: str """ if reg_type == Registration.PUB: - map = self.pubs + rmap = self.pubs elif reg_type == Registration.SUB: - map = self.subs + rmap = self.subs else: raise TypeError("invalid reg_type: %s"%s) - return map.get(resolved_name, None) + return rmap.get(resolved_name, None) def acquire_impl(self, reg_type, resolved_name, data_class): """ @@ -1035,23 +1034,20 @@ class _TopicManager(object): @type data_class: L{Message} class """ if reg_type == Registration.PUB: - map = self.pubs + rmap = self.pubs impl_class = _PublisherImpl elif reg_type == Registration.SUB: - map = self.subs + rmap = self.subs impl_class = _SubscriberImpl else: raise TypeError("invalid reg_type: %s"%s) - try: - self.lock.acquire() - impl = map.get(resolved_name, None) + with self.lock: + impl = rmap.get(resolved_name, None) if not impl: impl = impl_class(resolved_name, data_class) - self._add(impl, map, reg_type) + self._add(impl, rmap, reg_type) impl.ref_count += 1 return impl - finally: - self.lock.release() def release_impl(self, reg_type, resolved_name): """ @@ -1069,18 +1065,21 @@ class _TopicManager(object): @type reg_type: str """ if reg_type == Registration.PUB: - map = self.pubs + rmap = self.pubs else: - map = self.subs + rmap = self.subs with self.lock: - impl = map.get(resolved_name, None) + # check for race condition where multiple things are cleaning up at once + if self.closed: + return + impl = rmap.get(resolved_name, None) assert impl is not None, "cannot release topic impl as impl [%s] does not exist"%resolved_name impl.ref_count -= 1 assert impl.ref_count >= 0, "topic impl's reference count has gone below zero" if impl.ref_count == 0: _logger.debug("topic impl's ref count is zero, deleting topic %s...", resolved_name) impl.close() - self._remove(impl, map, reg_type) + self._remove(impl, rmap, reg_type) del impl _logger.debug("... done deleting topic %s", resolved_name) @@ -1127,8 +1126,8 @@ class _TopicManager(object): """ return self.topics - def _get_list(self, map): - return [[k, v.type] for k, v in map.iteritems()] + def _get_list(self, rmap): + return [[k, v.type] for k, v in rmap.iteritems()] ## @return [[str,str],]: list of topics subscribed to by this node, [ [topic1, topicType1]...[topicN, topicTypeN]] def get_subscriptions(self):