to ros_comm

This commit is contained in:
Tully Foote 2010-10-27 00:53:18 +00:00
parent 8b5250b221
commit 686468043e
15 changed files with 0 additions and 2521 deletions

View File

@ -1,9 +0,0 @@
cmake_minimum_required(VERSION 2.4.6)
include($ENV{ROS_ROOT}/core/rosbuild/rosbuild.cmake)
rosbuild_init()
#set the default path for built executables to the "bin" directory
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
#set the default path for built libraries to the "lib" directory
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)

View File

@ -1 +0,0 @@
include $(shell rospack find mk)/cmake.mk

View File

@ -1,7 +0,0 @@
[epydoc]
name: rosmaster
modules: rosmaster
inheritance: included
url: http://ros.org/wiki/rosmaster
frames: no
private: no

View File

@ -1,23 +0,0 @@
<package>
<description brief="rosmaster">
ROS <a href="http://ros.org/wiki/Master">Master</a> implementation.
</description>
<author>Ken Conley</author>
<license>BSD</license>
<review status="experimental" notes=""/>
<url>http://ros.org/wiki/rosmaster</url>
<depend package="roslib" />
<export>
<rosdoc config="rosdoc.yaml"/>
</export>
<platform os="ubuntu" version="9.04"/>
<platform os="ubuntu" version="9.10"/>
<platform os="ubuntu" version="10.04"/>
<platform os="macports" version="macports"/>
</package>

View File

@ -1,2 +0,0 @@
- builder: epydoc
config: epydoc.config

View File

@ -1,37 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2010, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from .main import rosmaster_main
from .master import DEFAULT_MASTER_PORT

View File

@ -1,39 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2010, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Exceptions for rosmaster package.
"""
class InternalException(Exception): pass

View File

@ -1,113 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""Command-line handler for ROS zenmaster (Python Master)"""
import logging
import os
import sys
import time
import optparse
import rosmaster.master
def configure_logging():
"""
Setup filesystem logging for the master
"""
filename = 'master.log'
# #988 __log command-line remapping argument
import roslib.names
import roslib.roslogging
mappings = roslib.names.load_mappings(sys.argv)
if '__log' in mappings:
logfilename_remap = mappings['__log']
filename = os.path.abspath(logfilename_remap)
_log_filename = roslib.roslogging.configure_logging('rosmaster', logging.DEBUG, filename=filename)
def rosmaster_main(argv=sys.argv, stdout=sys.stdout, env=os.environ):
parser = optparse.OptionParser(usage="usage: zenmaster [options]")
parser.add_option("--core",
dest="core", action="store_true", default=False,
help="run as core")
parser.add_option("-p", "--port",
dest="port", default=0,
help="override port", metavar="PORT")
options, args = parser.parse_args(argv[1:])
# only arg that zenmaster supports is __log remapping of logfilename
for arg in args:
if not arg.startswith('__log:='):
parser.error("unrecognized arg: %s"%arg)
configure_logging()
port = rosmaster.master.DEFAULT_MASTER_PORT
if options.port:
port = int(options.port)
if not options.core:
print """
ACHTUNG WARNING ACHTUNG WARNING ACHTUNG
WARNING ACHTUNG WARNING ACHTUNG WARNING
Standalone zenmaster has been deprecated, please use 'roscore' instead
ACHTUNG WARNING ACHTUNG WARNING ACHTUNG
WARNING ACHTUNG WARNING ACHTUNG WARNING
"""
logger = logging.getLogger("rosmaster.main")
logger.info("initialization complete, waiting for shutdown")
try:
logger.info("Starting ROS Master Node")
master = rosmaster.master.Master(port)
master.start()
import time
while master.ok():
time.sleep(.1)
except KeyboardInterrupt:
logger.info("keyboard interrupt, will exit")
finally:
logger.info("stopping master...")
master.stop()
if __name__ == "__main__":
main()

View File

@ -1,84 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
ROS Master.
This module integrates the lower-level implementation modules into a
single interface for running and stopping the ROS Master.
"""
import logging
import time
import roslib.xmlrpc
import rosmaster.master_api
DEFAULT_MASTER_PORT=11311 #default port for master's to bind to
class Master(object):
def __init__(self, port=DEFAULT_MASTER_PORT):
self.port = port
def start(self):
"""
Start the ROS Master.
"""
handler = rosmaster.master_api.ROSMasterHandler()
master_node = roslib.xmlrpc.XmlRpcNode(self.port, handler)
master_node.start()
# poll for initialization
while not master_node.uri:
time.sleep(0.0001)
# save fields
self.handler = handler
self.master_node = master_node
self.uri = master_node.uri
logging.getLogger('rosmaster.master').info("Master initialized: port[%s], uri[%s]", self.port, self.uri)
def ok(self):
if self.master_node is not None:
return self.master_node.handler._ok()
else:
return False
def stop(self):
if self.master_node is not None:
self.master_node.shutdown('Master.stop')
self.master_node = None

View File

@ -1,876 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
ROS Master API.
L{ROSMasterHandler} provides the API implementation of the
Master. Python allows an API to be introspected from a Python class,
so the handler has a 1-to-1 mapping with the actual XMLRPC API.
API return convention: (statusCode, statusMessage, returnValue)
- statusCode: an integer indicating the completion condition of the method.
- statusMessage: a human-readable string message for debugging
- returnValue: the return value of the method; method-specific.
Current status codes:
- -1: ERROR: Error on the part of the caller, e.g. an invalid parameter
- 0: FAILURE: Method was attempted but failed to complete correctly.
- 1: SUCCESS: Method completed successfully.
Individual methods may assign additional meaning/semantics to statusCode.
"""
import os
import sys
import logging
import threading
import time
import traceback
from roslib.xmlrpc import XmlRpcHandler
import roslib.names
from roslib.names import resolve_name
import rosmaster.paramserver
import rosmaster.threadpool
from rosmaster.util import xmlrpcapi
from rosmaster.registrations import RegistrationManager
from rosmaster.validators import non_empty, non_empty_str, not_none, is_api, is_topic, is_service, valid_type_name, valid_name, empty_or_valid_name, ParameterInvalid
NUM_WORKERS = 3 #number of threads we use to send publisher_update notifications
# Return code slots
STATUS = 0
MSG = 1
VAL = 2
_logger = logging.getLogger("rosmaster.master")
LOG_API = False
def mloginfo(msg, *args):
"""
Info-level master log statements. These statements may be printed
to screen so they should be user-readable.
@param msg: Message string
@type msg: str
@param args: arguments for msg if msg is a format string
"""
#mloginfo is in core so that it is accessible to master and masterdata
_logger.info(msg, *args)
def mlogwarn(msg, *args):
"""
Warn-level master log statements. These statements may be printed
to screen so they should be user-readable.
@param msg: Message string
@type msg: str
@param args: arguments for msg if msg is a format string
"""
#mloginfo is in core so that it is accessible to master and masterdata
_logger.warn(msg, *args)
if args:
print "WARN: "+msg%args
else:
print "WARN: "+str(msg)
def apivalidate(error_return_value, validators=()):
"""
ROS master/slave arg-checking decorator. Applies the specified
validator to the corresponding argument and also remaps each
argument to be the value returned by the validator. Thus,
arguments can be simultaneously validated and canonicalized prior
to actual function call.
@param error_return_value: API value to return if call unexpectedly fails
@param validators: sequence of validators to apply to each
arg. None means no validation for the parameter is required. As all
api methods take caller_id as the first parameter, the validators
start with the second param.
@type validators: sequence
"""
def check_validates(f):
assert len(validators) == f.func_code.co_argcount - 2, "%s failed arg check"%f #ignore self and caller_id
def validated_f(*args, **kwds):
if LOG_API:
_logger.debug("%s%s", f.func_name, str(args[1:]))
#print "%s%s"%(f.func_name, str(args[1:]))
if len(args) == 1:
_logger.error("%s invoked without caller_id paramter"%f.func_name)
return -1, "missing required caller_id parameter", error_return_value
elif len(args) != f.func_code.co_argcount:
return -1, "Error: bad call arity", error_return_value
instance = args[0]
caller_id = args[1]
if not isinstance(caller_id, basestring):
_logger.error("%s: invalid caller_id param type", f.func_name)
return -1, "caller_id must be a string", error_return_value
newArgs = [instance, caller_id] #canonicalized args
try:
for (v, a) in zip(validators, args[2:]):
if v:
try:
newArgs.append(v(a, caller_id))
except ParameterInvalid, e:
_logger.error("%s: invalid parameter: %s", f.func_name, str(e) or 'error')
return -1, str(e) or 'error', error_return_value
else:
newArgs.append(a)
if LOG_API:
retval = f(*newArgs, **kwds)
_logger.debug("%s%s returns %s", f.func_name, args[1:], retval)
return retval
else:
code, msg, val = f(*newArgs, **kwds)
if val is None:
return -1, "Internal error (None value returned)", error_return_value
return code, msg, val
except TypeError, te: #most likely wrong arg number
_logger.error(traceback.format_exc())
return -1, "Error: invalid arguments: %s"%te, error_return_value
except Exception, e: #internal failure
_logger.error(traceback.format_exc())
return 0, "Internal failure: %s"%e, error_return_value
validated_f.func_name = f.func_name
validated_f.__doc__ = f.__doc__ #preserve doc
return validated_f
return check_validates
def publisher_update_task(api, topic, pub_uris):
"""
Contact api.publisherUpdate with specified parameters
@param api: XML-RPC URI of node to contact
@type api: str
@param topic: Topic name to send to node
@type topic: str
@param pub_uris: list of publisher APIs to send to node
@type pub_uris: [str]
"""
mloginfo("publisherUpdate[%s] -> %s", topic, api)
#TODO: check return value for errors so we can unsubscribe if stale
xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
def service_update_task(api, service, uri):
"""
Contact api.serviceUpdate with specified parameters
@param api: XML-RPC URI of node to contact
@type api: str
@param service: Service name to send to node
@type service: str
@param uri: URI to send to node
@type uri: str
"""
mloginfo("serviceUpdate[%s, %s] -> %s",service, uri, api)
xmlrpcapi(api).serviceUpdate('/master', service, uri)
###################################################
# Master Implementation
class ROSMasterHandler(object):
"""
XML-RPC handler for ROS master APIs.
API routines for the ROS Master Node. The Master Node is a
superset of the Slave Node and contains additional API methods for
creating and monitoring a graph of slave nodes.
By convention, ROS nodes take in caller_id as the first parameter
of any API call. The setting of this parameter is rarely done by
client code as ros::msproxy::MasterProxy automatically inserts
this parameter (see ros::client::getMaster()).
"""
def __init__(self):
"""ctor."""
self.uri = None
self.done = False
self.thread_pool = rosmaster.threadpool.MarkedThreadPool(NUM_WORKERS)
# pub/sub/providers: dict { topicName : [publishers/subscribers names] }
self.ps_lock = threading.Condition(threading.Lock())
self.reg_manager = RegistrationManager(self.thread_pool)
# maintain refs to reg_manager fields
self.publishers = self.reg_manager.publishers
self.subscribers = self.reg_manager.subscribers
self.services = self.reg_manager.services
self.param_subscribers = self.reg_manager.param_subscribers
self.topics_types = {} #dict { topicName : type }
# parameter server dictionary
self.param_server = rosmaster.paramserver.ParamDictionary(self.reg_manager)
def _shutdown(self, reason=''):
if self.thread_pool is not None:
self.thread_pool.join_all(wait_for_tasks=False, wait_for_threads=False)
self.thread_pool = None
self.done = True
def _ready(self, uri):
"""
Initialize the handler with the XMLRPC URI. This is a standard callback from the roslib.xmlrpc.XmlRpcNode API.
@param uri: XML-RPC URI
@type uri: str
"""
self.uri = uri
def _ok(self):
return not self.done
###############################################################################
# EXTERNAL API
@apivalidate(0, (None, ))
def shutdown(self, caller_id, msg=''):
"""
Stop this server
@param caller_id: ROS caller id
@type caller_id: str
@param msg: a message describing why the node is being shutdown.
@type msg: str
@return: [code, msg, 0]
@rtype: [int, str, int]
"""
if msg:
print >> sys.stdout, "shutdown request: %s"%msg
else:
print >> sys.stdout, "shutdown requst"
self._shutdown('external shutdown request from [%s]: %s'%(caller_id, msg))
return 1, "shutdown", 0
@apivalidate('')
def getUri(self, caller_id):
"""
Get the XML-RPC URI of this server.
@param caller_id str: ROS caller id
@return [int, str, str]: [1, "", xmlRpcUri]
"""
return 1, "", self.uri
@apivalidate(-1)
def getPid(self, caller_id):
"""
Get the PID of this server
@param caller_id: ROS caller id
@type caller_id: str
@return: [1, "", serverProcessPID]
@rtype: [int, str, int]
"""
return 1, "", os.getpid()
################################################################
# PARAMETER SERVER ROUTINES
@apivalidate(0, (non_empty_str('key'),))
def deleteParam(self, caller_id, key):
"""
Parameter Server: delete parameter
@param caller_id: ROS caller id
@type caller_id: str
@param key: parameter name
@type key: str
@return: [code, msg, 0]
@rtype: [int, str, int]
"""
try:
key = resolve_name(key, caller_id)
self.param_server.delete_param(key, self._notify_param_subscribers)
mloginfo("-PARAM [%s] by %s",key, caller_id)
return 1, "parameter %s deleted"%key, 0
except KeyError, e:
return -1, "parameter [%s] is not set"%key, 0
@apivalidate(0, (non_empty_str('key'), not_none('value')))
def setParam(self, caller_id, key, value):
"""
Parameter Server: set parameter. NOTE: if value is a
dictionary it will be treated as a parameter tree, where key
is the parameter namespace. For example:::
{'x':1,'y':2,'sub':{'z':3}}
will set key/x=1, key/y=2, and key/sub/z=3. Furthermore, it
will replace all existing parameters in the key parameter
namespace with the parameters in value. You must set
parameters individually if you wish to perform a union update.
@param caller_id: ROS caller id
@type caller_id: str
@param key: parameter name
@type key: str
@param value: parameter value.
@type value: XMLRPCLegalValue
@return: [code, msg, 0]
@rtype: [int, str, int]
"""
key = resolve_name(key, caller_id)
self.param_server.set_param(key, value, self._notify_param_subscribers)
mloginfo("+PARAM [%s] by %s",key, caller_id)
return 1, "parameter %s set"%key, 0
@apivalidate(0, (non_empty_str('key'),))
def getParam(self, caller_id, key):
"""
Retrieve parameter value from server.
@param caller_id: ROS caller id
@type caller_id: str
@param key: parameter to lookup. If key is a namespace,
getParam() will return a parameter tree.
@type key: str
getParam() will return a parameter tree.
@return: [code, statusMessage, parameterValue]. If code is not
1, parameterValue should be ignored. If key is a namespace,
the return value will be a dictionary, where each key is a
parameter in that namespace. Sub-namespaces are also
represented as dictionaries.
@rtype: [int, str, XMLRPCLegalValue]
"""
try:
key = resolve_name(key, caller_id)
return 1, "Parameter [%s]"%key, self.param_server.get_param(key)
except KeyError, e:
return -1, "Parameter [%s] is not set"%key, 0
@apivalidate(0, (non_empty_str('key'),))
def searchParam(self, caller_id, key):
"""
Search for parameter key on parameter server. Search starts in caller's namespace and proceeds
upwards through parent namespaces until Parameter Server finds a matching key.
searchParam's behavior is to search for the first partial match.
For example, imagine that there are two 'robot_description' parameters::
/robot_description
/robot_description/arm
/robot_description/base
/pr2/robot_description
/pr2/robot_description/base
If I start in the namespace /pr2/foo and search for
'robot_description', searchParam will match
/pr2/robot_description. If I search for 'robot_description/arm'
it will return /pr2/robot_description/arm, even though that
parameter does not exist (yet).
@param caller_id str: ROS caller id
@type caller_id: str
@param key: parameter key to search for.
@type key: str
@return: [code, statusMessage, foundKey]. If code is not 1, foundKey should be
ignored.
@rtype: [int, str, str]
"""
search_key = self.param_server.search_param(caller_id, key)
if search_key:
return 1, "Found [%s]"%search_key, search_key
else:
return -1, "Cannot find parameter [%s] in an upwards search"%key, ''
@apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
def subscribeParam(self, caller_id, caller_api, key):
"""
Retrieve parameter value from server and subscribe to updates to that param. See
paramUpdate() in the Node API.
@param caller_id str: ROS caller id
@type caller_id: str
@param key: parameter to lookup.
@type key: str
@param caller_api: API URI for paramUpdate callbacks.
@type caller_api: str
@return: [code, statusMessage, parameterValue]. If code is not
1, parameterValue should be ignored. parameterValue is an empty dictionary if the parameter
has not been set yet.
@rtype: [int, str, XMLRPCLegalValue]
"""
key = resolve_name(key, caller_id)
try:
# ps_lock has precedence and is required due to
# potential self.reg_manager modification
self.ps_lock.acquire()
val = self.param_server.subscribe_param(key, (caller_id, caller_api))
finally:
self.ps_lock.release()
return 1, "Subscribed to parameter [%s]"%key, val
@apivalidate(0, (is_api('caller_api'), non_empty_str('key'),))
def unsubscribeParam(self, caller_id, caller_api, key):
"""
Retrieve parameter value from server and subscribe to updates to that param. See
paramUpdate() in the Node API.
@param caller_id str: ROS caller id
@type caller_id: str
@param key: parameter to lookup.
@type key: str
@param caller_api: API URI for paramUpdate callbacks.
@type caller_api: str
@return: [code, statusMessage, numUnsubscribed].
If numUnsubscribed is zero it means that the caller was not subscribed to the parameter.
@rtype: [int, str, int]
"""
key = resolve_name(key, caller_id)
try:
# ps_lock is required due to potential self.reg_manager modification
self.ps_lock.acquire()
retval = self.param_server.unsubscribe_param(key, (caller_id, caller_api))
finally:
self.ps_lock.release()
return 1, "Unsubscribe to parameter [%s]"%key, 1
@apivalidate(False, (non_empty_str('key'),))
def hasParam(self, caller_id, key):
"""
Check if parameter is stored on server.
@param caller_id str: ROS caller id
@type caller_id: str
@param key: parameter to check
@type key: str
@return: [code, statusMessage, hasParam]
@rtype: [int, str, bool]
"""
key = resolve_name(key, caller_id)
if self.param_server.has_param(key):
return 1, key, True
else:
return 1, key, False
@apivalidate([])
def getParamNames(self, caller_id):
"""
Get list of all parameter names stored on this server.
This does not adjust parameter names for caller's scope.
@param caller_id: ROS caller id
@type caller_id: str
@return: [code, statusMessage, parameterNameList]
@rtype: [int, str, [str]]
"""
return 1, "Parameter names", self.param_server.get_param_names()
##################################################################################
# NOTIFICATION ROUTINES
def _notify(self, registrations, task, key, value):
"""
Generic implementation of callback notification
@param registrations: Registrations
@type registrations: L{Registrations}
@param task: task to queue
@type task: fn
@param key: registration key
@type key: str
@param value: value to pass to task
@type value: Any
"""
# cache thread_pool for thread safety
thread_pool = self.thread_pool
if not thread_pool:
return
if registrations.has_key(key):
try:
for node_api in registrations.get_apis(key):
# use the api as a marker so that we limit one thread per subscriber
thread_pool.queue_task(node_api, task, (node_api, key, value))
except KeyError:
_logger.warn('subscriber data stale (key [%s], listener [%s]): node API unknown'%(key, s))
def _notify_param_subscribers(self, updates):
"""
Notify parameter subscribers of new parameter value
@param updates [([str], str, any)*]: [(subscribers, param_key, param_value)*]
@param param_value str: parameter value
"""
# cache thread_pool for thread safety
thread_pool = self.thread_pool
if not thread_pool:
return
for subscribers, key, value in updates:
# use the api as a marker so that we limit one thread per subscriber
for caller_id, caller_api in subscribers:
self.thread_pool.queue_task(caller_api, self.param_update_task, (caller_id, caller_api, key, value))
def param_update_task(self, caller_id, caller_api, param_key, param_value):
"""
Contact api.paramUpdate with specified parameters
@param caller_id: caller ID
@type caller_id: str
@param caller_api: XML-RPC URI of node to contact
@type caller_api: str
@param param_key: parameter key to pass to node
@type param_key: str
@param param_value: parameter value to pass to node
@type param_value: str
"""
mloginfo("paramUpdate[%s]", param_key)
code, _, _ = xmlrpcapi(caller_api).paramUpdate('/master', param_key, param_value)
if code == -1:
try:
# ps_lock is required due to potential self.reg_manager modification
self.ps_lock.acquire()
# reverse lookup to figure out who we just called
matches = self.reg_manager.reverse_lookup(caller_api)
for m in matches:
retval = self.param_server.unsubscribe_param(param_key, (m.id, caller_api))
finally:
self.ps_lock.release()
def _notify_topic_subscribers(self, topic, pub_uris):
"""
Notify subscribers with new publisher list
@param topic: name of topic
@type topic: str
@param pub_uris: list of URIs of publishers.
@type pub_uris: [str]
"""
self._notify(self.subscribers, publisher_update_task, topic, pub_uris)
def _notify_service_update(self, service, service_api):
"""
Notify clients of new service provider
@param service: name of service
@type service: str
@param service_api: new service URI
@type service_api: str
"""
###TODO:XXX:stub code, this callback doesnot exist yet
self._notify(self.service_clients, service_update_task, service, service_api)
##################################################################################
# SERVICE PROVIDER
@apivalidate(0, ( is_service('service'), is_api('service_api'), is_api('caller_api')))
def registerService(self, caller_id, service, service_api, caller_api):
"""
Register the caller as a provider of the specified service.
@param caller_id str: ROS caller id
@type caller_id: str
@param service: Fully-qualified name of service
@type service: str
@param service_api: Service URI
@type service_api: str
@param caller_api: XML-RPC URI of caller node
@type caller_api: str
@return: (code, message, ignore)
@rtype: (int, str, int)
"""
try:
self.ps_lock.acquire()
self.reg_manager.register_service(service, caller_id, caller_api, service_api)
mloginfo("+SERVICE [%s] %s %s", service, caller_id, caller_api)
if 0: #TODO
self._notify_service_update(service, service_api)
finally:
self.ps_lock.release()
return 1, "Registered [%s] as provider of [%s]"%(caller_id, service), 1
@apivalidate(0, (is_service('service'),))
def lookupService(self, caller_id, service):
"""
Lookup all provider of a particular service.
@param caller_id str: ROS caller id
@type caller_id: str
@param service: fully-qualified name of service to lookup.
@type: service: str
@return: (code, message, serviceUrl). service URL is provider's
ROSRPC URI with address and port. Fails if there is no provider.
@rtype: (int, str, str)
"""
try:
self.ps_lock.acquire()
service_url = self.services.get_service_api(service)
finally:
self.ps_lock.release()
if service_url:
return 1, "rosrpc URI: [%s]"%service_url, service_url
else:
return -1, "no provider", ''
@apivalidate(0, ( is_service('service'), is_api('service_api')))
def unregisterService(self, caller_id, service, service_api):
"""
Unregister the caller as a provider of the specified service.
@param caller_id str: ROS caller id
@type caller_id: str
@param service: Fully-qualified name of service
@type service: str
@param service_api: API URI of service to unregister. Unregistration will only occur if current
registration matches.
@type service_api: str
@return: (code, message, numUnregistered). Number of unregistrations (either 0 or 1).
If this is zero it means that the caller was not registered as a service provider.
The call still succeeds as the intended final state is reached.
@rtype: (int, str, int)
"""
try:
self.ps_lock.acquire()
retval = self.reg_manager.unregister_service(service, caller_id, service_api)
if 0: #TODO
self._notify_service_update(service, service_api)
mloginfo("-SERVICE [%s] %s %s", service, caller_id, service_api)
return retval
finally:
self.ps_lock.release()
##################################################################################
# PUBLISH/SUBSCRIBE
@apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
"""
Subscribe the caller to the specified topic. In addition to receiving
a list of current publishers, the subscriber will also receive notifications
of new publishers via the publisherUpdate API.
@param caller_id: ROS caller id
@type caller_id: str
@param topic str: Fully-qualified name of topic to subscribe to.
@param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
@type topic_type: str
@param caller_api: XML-RPC URI of caller node for new publisher notifications
@type caller_api: str
@return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
for nodes currently publishing the specified topic.
@rtype: (int, str, [str])
"""
#NOTE: subscribers do not get to set topic type
try:
self.ps_lock.acquire()
self.reg_manager.register_subscriber(topic, caller_id, caller_api)
# ROS 1.1: subscriber can now set type if it is not already set
# - don't let '*' type squash valid typing
if not topic in self.topics_types and topic_type != roslib.names.ANYTYPE:
self.topics_types[topic] = topic_type
mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
pub_uris = self.publishers.get_apis(topic)
finally:
self.ps_lock.release()
return 1, "Subscribed to [%s]"%topic, pub_uris
@apivalidate(0, (is_topic('topic'), is_api('caller_api')))
def unregisterSubscriber(self, caller_id, topic, caller_api):
"""
Unregister the caller as a publisher of the topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic: Fully-qualified name of topic to unregister.
@type topic: str
@param caller_api: API URI of service to unregister. Unregistration will only occur if current
registration matches.
@type caller_api: str
@return: (code, statusMessage, numUnsubscribed).
If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
The call still succeeds as the intended final state is reached.
@rtype: (int, str, int)
"""
try:
self.ps_lock.acquire()
retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
return retval
finally:
self.ps_lock.release()
@apivalidate(0, ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerPublisher(self, caller_id, topic, topic_type, caller_api):
"""
Register the caller as a publisher the topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic: Fully-qualified name of topic to register.
@type topic: str
@param topic_type: Datatype for topic. Must be a
package-resource name, i.e. the .msg name.
@type topic_type: str
@param caller_api str: ROS caller XML-RPC API URI
@type caller_api: str
@return: (code, statusMessage, subscriberApis).
List of current subscribers of topic in the form of XMLRPC URIs.
@rtype: (int, str, [str])
"""
#NOTE: we need topic_type for getPublishedTopics.
try:
self.ps_lock.acquire()
self.reg_manager.register_publisher(topic, caller_id, caller_api)
# don't let '*' type squash valid typing
if topic_type != roslib.names.ANYTYPE or not topic in self.topics_types:
self.topics_types[topic] = topic_type
pub_uris = self.publishers.get_apis(topic)
self._notify_topic_subscribers(topic, pub_uris)
mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
sub_uris = self.subscribers.get_apis(topic)
finally:
self.ps_lock.release()
return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris
@apivalidate(0, (is_topic('topic'), is_api('caller_api')))
def unregisterPublisher(self, caller_id, topic, caller_api):
"""
Unregister the caller as a publisher of the topic.
@param caller_id: ROS caller id
@type caller_id: str
@param topic: Fully-qualified name of topic to unregister.
@type topic: str
@param caller_api str: API URI of service to
unregister. Unregistration will only occur if current
registration matches.
@type caller_api: str
@return: (code, statusMessage, numUnregistered).
If numUnregistered is zero it means that the caller was not registered as a publisher.
The call still succeeds as the intended final state is reached.
@rtype: (int, str, int)
"""
try:
self.ps_lock.acquire()
retval = self.reg_manager.unregister_publisher(topic, caller_id, caller_api)
if retval[VAL]:
self._notify_topic_subscribers(topic, self.publishers.get_apis(topic))
mloginfo("-PUB [%s] %s %s",topic, caller_id, caller_api)
finally:
self.ps_lock.release()
return retval
##################################################################################
# GRAPH STATE APIS
@apivalidate('', (valid_name('node'),))
def lookupNode(self, caller_id, node_name):
"""
Get the XML-RPC URI of the node with the associated
name/caller_id. This API is for looking information about
publishers and subscribers. Use lookupService instead to lookup
ROS-RPC URIs.
@param caller_id: ROS caller id
@type caller_id: str
@param node: name of node to lookup
@type node: str
@return: (code, msg, URI)
@rtype: (int, str, str)
"""
try:
self.ps_lock.acquire()
node = self.reg_manager.get_node(node_name)
if node is not None:
retval = 1, "node api", node.api
else:
retval = -1, "unknown node [%s]"%node_name, ''
finally:
self.ps_lock.release()
return retval
@apivalidate(0, (empty_or_valid_name('subgraph'),))
def getPublishedTopics(self, caller_id, subgraph):
"""
Get list of topics that can be subscribed to. This does not return topics that have no publishers.
See L{getSystemState()} to get more comprehensive list.
@param caller_id: ROS caller id
@type caller_id: str
@param subgraph: Restrict topic names to match within the specified subgraph. Subgraph namespace
is resolved relative to the caller's namespace. Use '' to specify all names.
@type subgraph: str
@return: (code, msg, [[topic1, type1]...[topicN, typeN]])
@rtype: (int, str, [[str, str],])
"""
try:
self.ps_lock.acquire()
# force subgraph to be a namespace with trailing slash
if subgraph and subgraph[-1] != roslib.names.SEP:
subgraph = subgraph + roslib.names.SEP
#we don't bother with subscribers as subscribers don't report topic types. also, the intended
#use case is for subscribe-by-topic-type
retval = [[t, self.topics_types[t]] for t in self.publishers.iterkeys() if t.startswith(subgraph)]
finally:
self.ps_lock.release()
return 1, "current topics", retval
@apivalidate([])
def getTopicTypes(self, caller_id):
"""
Retrieve list topic names and their types.
@param caller_id: ROS caller id
@type caller_id: str
@rtype: (int, str, [[str,str]] )
@return: (code, statusMessage, topicTypes). topicTypes is a list of [topicName, topicType] pairs.
"""
try:
self.ps_lock.acquire()
retval = self.topics_types.items()
finally:
self.ps_lock.release()
return 1, "current system state", retval
@apivalidate([[],[], []])
def getSystemState(self, caller_id):
"""
Retrieve list representation of system state (i.e. publishers, subscribers, and services).
@param caller_id: ROS caller id
@type caller_id: str
@rtype: (int, str, [[str,[str]], [str,[str]], [str,[str]]])
@return: (code, statusMessage, systemState).
System state is in list representation::
[publishers, subscribers, services].
publishers is of the form::
[ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]
subscribers is of the form::
[ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]
services is of the form::
[ [service1, [service1Provider1...service1ProviderN]] ... ]
"""
edges = []
try:
self.ps_lock.acquire()
retval = [r.get_state() for r in (self.publishers, self.subscribers, self.services)]
finally:
self.ps_lock.release()
return 1, "current system state", retval

View File

@ -1,393 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from threading import RLock
from roslib.names import ns_join, GLOBALNS, SEP, is_global, is_private, canonicalize_name
def _get_param_names(names, key, d):
"""
helper recursive routine for getParamNames()
@param names: list of param names to append to
@type names: [str]
@param d: parameter tree node
@type d: dict
@param key: parameter key for tree node d
@type key: str
"""
#TODOXXX
for k,v in d.iteritems():
if type(v) == dict:
_get_param_names(names, ns_join(key, k), v)
else:
names.append(ns_join(key, k))
class ParamDictionary(object):
def __init__(self, reg_manager):
"""
ctor.
@param subscribers: parameter subscribers
@type subscribers: Registrations
"""
self.lock = RLock()
self.parameters = {}
self.reg_manager = reg_manager
def get_param_names(self):
"""
Get list of all parameter names stored on this server.
@return: [code, statusMessage, parameterNameList]
@rtype: [int, str, [str]]
"""
try:
self.lock.acquire()
param_names = []
_get_param_names(param_names, '/', self.parameters)
finally:
self.lock.release()
return param_names
def search_param(self, ns, key):
"""
Search for matching parameter key for search param
key. Search for key starts at ns and proceeds upwards to
the root. As such, search_param should only be called with a
relative parameter name.
search_param's behavior is to search for the first partial match.
For example, imagine that there are two 'robot_description' parameters:
- /robot_description
- /robot_description/arm
- /robot_description/base
- /pr2/robot_description
- /pr2/robot_description/base
If I start in the namespace /pr2/foo and search for
'robot_description', search_param will match
/pr2/robot_description. If I search for 'robot_description/arm'
it will return /pr2/robot_description/arm, even though that
parameter does not exist (yet).
@param ns: namespace to begin search from.
@type ns: str
@param key: Parameter key.
@type key: str
@return: key of matching parameter or None if no matching
parameter.
@rtype: str
"""
if not key or is_private(key):
raise ValueError("invalid key")
if not is_global(ns):
raise ValueError("namespace must be global")
if is_global(key):
if self.has_param(key):
return key
else:
return None
# there are more efficient implementations, but our hiearchy
# is not very deep and this is fairly clean code to read.
# - we only search for the first namespace in the key to check for a match
key_namespaces = [x for x in key.split(SEP) if x]
key_ns = key_namespaces[0]
# - corner case: have to test initial namespace first as
# negative indices won't work with 0
search_key = ns_join(ns, key_ns)
if self.has_param(search_key):
# resolve to full key
return ns_join(ns, key)
namespaces = [x for x in ns.split(SEP) if x]
for i in xrange(1, len(namespaces)+1):
search_key = SEP + SEP.join(namespaces[0:-i] + [key_ns])
if self.has_param(search_key):
# we have a match on the namespace of the key, so
# compose the full key and return it
full_key = SEP + SEP.join(namespaces[0:-i] + [key])
return full_key
return None
def get_param(self, key):
"""
Get the parameter in the parameter dictionary.
@param key: parameter key
@type key: str
@return: parameter value
"""
try:
self.lock.acquire()
val = self.parameters
if key != GLOBALNS:
# split by the namespace separator, ignoring empty splits
namespaces = [x for x in key.split(SEP)[1:] if x]
for ns in namespaces:
if not type(val) == dict:
raise KeyError(val)
val = val[ns]
return val
finally:
self.lock.release()
def set_param(self, key, value, notify_task=None):
"""
Set the parameter in the parameter dictionary.
@param key: parameter key
@type key: str
@param value: parameter value
@param notify_task: function to call with
subscriber updates. updates is of the form
[(subscribers, param_key, param_value)*]. The empty dictionary
represents an unset parameter.
@type notify_task: fn(updates)
"""
try:
self.lock.acquire()
if key == GLOBALNS:
if type(value) != dict:
raise TypeError("cannot set root of parameter tree to non-dictionary")
self.parameters = value
else:
namespaces = [x for x in key.split(SEP) if x]
# - last namespace is the actual key we're storing in
value_key = namespaces[-1]
namespaces = namespaces[:-1]
d = self.parameters
# - descend tree to the node we're setting
for ns in namespaces:
if not ns in d:
new_d = {}
d[ns] = new_d
d = new_d
else:
val = d[ns]
# implicit type conversion of value to namespace
if type(val) != dict:
d[ns] = val = {}
d = val
d[value_key] = value
# ParamDictionary needs to queue updates so that the updates are thread-safe
if notify_task:
updates = compute_param_updates(self.reg_manager.param_subscribers, key, value)
if updates:
notify_task(updates)
finally:
self.lock.release()
def subscribe_param(self, key, registration_args):
"""
@param key: parameter key
@type key: str
@param registration_args: additional args to pass to
subscribers.register. First parameter is always the parameter
key.
@type registration_args: tuple
"""
if key != SEP:
key = canonicalize_name(key) + SEP
try:
self.lock.acquire()
# fetch parameter value
try:
val = self.get_param(key)
except KeyError:
# parameter not set yet
val = {}
self.reg_manager.register_param_subscriber(key, *registration_args)
return val
finally:
self.lock.release()
def unsubscribe_param(self, key, unregistration_args):
"""
@param key str: parameter key
@type key: str
@param unregistration_args: additional args to pass to
subscribers.unregister. i.e. unregister will be called with
(key, *unregistration_args)
@type unregistration_args: tuple
@return: return value of subscribers.unregister()
"""
if key != SEP:
key = canonicalize_name(key) + SEP
return self.reg_manager.unregister_param_subscriber(key, *unregistration_args)
def delete_param(self, key, notify_task=None):
"""
Delete the parameter in the parameter dictionary.
@param key str: parameter key
@param notify_task fn(updates): function to call with
subscriber updates. updates is of the form
[(subscribers, param_key, param_value)*]. The empty dictionary
represents an unset parameter.
"""
try:
self.lock.acquire()
if key == GLOBALNS:
raise KeyError("cannot delete root of parameter tree")
else:
# key is global, so first split is empty
namespaces = [x for x in key.split(SEP) if x]
# - last namespace is the actual key we're deleting
value_key = namespaces[-1]
namespaces = namespaces[:-1]
d = self.parameters
# - descend tree to the node we're setting
for ns in namespaces:
if type(d) != dict or not ns in d:
raise KeyError(key)
else:
d = d[ns]
if not value_key in d:
raise KeyError(key)
else:
del d[value_key]
# ParamDictionary needs to queue updates so that the updates are thread-safe
if notify_task:
updates = compute_param_updates(self.reg_manager.param_subscribers, key, {})
if updates:
notify_task(updates)
finally:
self.lock.release()
def has_param(self, key):
"""
Test for parameter existence
@param key: parameter key
@type key: str
@return: True if parameter set, False otherwise
@rtype: bool
"""
try:
# more efficient implementations are certainly possible,
# but this guarantees correctness for now
self.get_param(key)
return True
except KeyError:
return False
def _compute_all_keys(param_key, param_value, all_keys=None):
"""
Compute which subscribers should be notified based on the parameter update
@param param_key: key of updated parameter
@type param_key: str
@param param_value: value of updated parameter
@param all_keys: (internal use only) list of parameter keys
to append to for recursive calls.
@type all_keys: [str]
@return: list of parameter keys. All keys will be canonicalized with trailing slash.
@rtype: [str]
"""
if all_keys is None:
all_keys = []
for k, v in param_value.iteritems():
new_k = ns_join(param_key, k) + SEP
all_keys.append(new_k)
if type(v) == dict:
_compute_all_keys(new_k, v, all_keys)
return all_keys
def compute_param_updates(subscribers, param_key, param_value):
"""
Compute subscribers that should be notified based on the parameter update
@param subscribers: parameter subscribers
@type subscribers: Registrations
@param param_key: parameter key
@type param_key: str
@param param_value: parameter value
@type param_value: str
"""
# logic correct for both updates and deletions
if not subscribers:
return []
# end with a trailing slash to optimize startswith check from
# needing an extra equals check
if param_key != SEP:
param_key = canonicalize_name(param_key) + SEP
# compute all the updated keys
if type(param_value) == dict:
all_keys = _compute_all_keys(param_key, param_value)
else:
all_keys = None
updates = []
# subscriber gets update if anything in the subscribed namespace is updated or if its deleted
for sub_key in subscribers.iterkeys():
ns_key = sub_key
if ns_key[-1] != SEP:
ns_key = sub_key + SEP
if param_key.startswith(ns_key):
node_apis = subscribers[sub_key]
updates.append((node_apis, param_key, param_value))
elif all_keys is not None and ns_key.startswith(param_key) \
and not sub_key in all_keys:
# parameter was deleted
node_apis = subscribers[sub_key]
updates.append((node_apis, sub_key, {}))
# add updates for exact matches within tree
if all_keys is not None:
# #586: iterate over parameter tree for notification
for key in all_keys:
if key in subscribers:
# compute actual update value
sub_key = key[len(param_key):]
namespaces = [x for x in sub_key.split(SEP) if x]
val = param_value
for ns in namespaces:
val = val[ns]
updates.append((subscribers[key], key, val))
return updates

View File

@ -1,465 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from rosmaster.util import xmlrpcapi
import rosmaster.exceptions
"""Data structures for representing registration data in the Master"""
class NodeRef(object):
"""
Container for node registration information. Used in master's
self.nodes data structure. This is effectively a reference
counter for the node registration information: when the
subscriptions and publications are empty the node registration can
be deleted.
"""
def __init__(self, id, api):
"""
ctor
@param api str: node XML-RPC API
"""
self.id = id
self.api = api
self.param_subscriptions = []
self.topic_subscriptions = []
self.topic_publications = []
self.services = []
def clear(self):
"""
Delete all state from this NodeRef except for the api location
"""
self.param_subscriptions = []
self.topic_subscriptions = []
self.topic_publications = []
self.services = []
def is_empty(self):
"""
@return: True if node has no active registrations
"""
return sum((len(x) for x in
[self.param_subscriptions,
self.topic_subscriptions,
self.topic_publications,
self.services,])) == 0
def add(self, type_, key):
if type_ == Registrations.TOPIC_SUBSCRIPTIONS:
if not key in self.topic_subscriptions:
self.topic_subscriptions.append(key)
elif type_ == Registrations.TOPIC_PUBLICATIONS:
if not key in self.topic_publications:
self.topic_publications.append(key)
elif type_ == Registrations.SERVICE:
if not key in self.services:
self.services.append(key)
elif type_ == Registrations.PARAM_SUBSCRIPTIONS:
if not key in self.param_subscriptions:
self.param_subscriptions.append(key)
else:
raise rosmaster.exceptions.InternalException("internal bug")
def remove(self, type_, key):
if type_ == Registrations.TOPIC_SUBSCRIPTIONS:
if key in self.topic_subscriptions:
self.topic_subscriptions.remove(key)
elif type_ == Registrations.TOPIC_PUBLICATIONS:
if key in self.topic_publications:
self.topic_publications.remove(key)
elif type_ == Registrations.SERVICE:
if key in self.services:
self.services.remove(key)
elif type_ == Registrations.PARAM_SUBSCRIPTIONS:
if key in self.param_subscriptions:
self.param_subscriptions.remove(key)
else:
raise rosmaster.exceptions.InternalException("internal bug")
# NOTE: I'm not terribly happy that this task has leaked into the data model. need
# to refactor to get this back into masterslave.
def shutdown_node_task(api, caller_id, reason):
"""
Method to shutdown another ROS node. Generally invoked within a
separate thread as this is used to cleanup hung nodes.
@param api: XML-RPC API of node to shutdown
@type api: str
@param caller_id: name of node being shutdown
@type caller_id: str
@param reason: human-readable reason why node is being shutdown
@type reason: str
"""
try:
xmlrpcapi(api).shutdown('/master', reason)
except:
pass #expected in many common cases
class Registrations(object):
"""
All calls may result in access/modifications to node registrations
dictionary, so be careful to guarantee appropriate thread-safeness.
Data structure for storing a set of registrations (e.g. publications, services).
The underlying data storage is the same except for services, which have the
constraint that only one registration may be active for a given key.
"""
TOPIC_SUBSCRIPTIONS = 1
TOPIC_PUBLICATIONS = 2
SERVICE = 3
PARAM_SUBSCRIPTIONS = 4
def __init__(self, type_):
"""
ctor.
@param type_: one of [ TOPIC_SUBSCRIPTIONS,
TOPIC_PUBLICATIONS, SERVICE, PARAM_SUBSCRIPTIONS ]
@type type_: int
"""
if not type_ in [
Registrations.TOPIC_SUBSCRIPTIONS,
Registrations.TOPIC_PUBLICATIONS,
Registrations.SERVICE,
Registrations.PARAM_SUBSCRIPTIONS ]:
raise rosmaster.exceptions.InternalException("invalid registration type: %s"%type_)
self.type = type_
## { key: [(caller_id, caller_api)] }
self.map = {}
self.service_api_map = None
def __nonzero__(self):
"""
@return: True if there are no registrations
"""
return len(self.map) != 0
def iterkeys(self):
"""
Iterate over registration keys
@return: iterator for registration keys
"""
return self.map.iterkeys()
def get_service_api(self, service):
"""
Lookup service API URI. NOTE: this should only be valid if type==SERVICE as
service Registrations instances are the only ones that track service API URIs.
@param service: service name
@type service: str
@return str: service_api for registered key or None if
registration is no longer valid.
@type: str
"""
if self.service_api_map and service in self.service_api_map:
caller_id, service_api = self.service_api_map[service]
return service_api
return None
def get_apis(self, key):
"""
Only valid if self.type != SERVICE.
@param key: registration key (e.g. topic/service/param name)
@type key: str
@return: caller_apis for registered key, empty list if registration is not valid
@rtype: [str]
"""
return [api for _, api in self.map.get(key, [])]
def __contains__(self, key):
"""
Emulate mapping type for has_key()
"""
return key in self.map
def __getitem__(self, key):
"""
@param key: registration key (e.g. topic/service/param name)
@type key: str
@return: (caller_id, caller_api) for registered
key, empty list if registration is not valid
@rtype: [(str, str),]
"""
# unlike get_apis, returns the caller_id to prevent any race
# conditions that can occur if caller_id/caller_apis change
# due to a new node.
return self.map.get(key, [])
def has_key(self, key):
"""
@param key: registration key (e.g. topic/service/param name)
@type key: str
@return: True if key is registered
@rtype: bool
"""
return key in self.map
def get_state(self):
"""
@return: state in getSystemState()-friendly format [ [key, [callerId1...callerIdN]] ... ]
@rtype: [str, [str]...]
"""
retval = []
for k in self.map.iterkeys():
retval.append([k, [id for id, _ in self.map[k]]])
return retval
def register(self, key, caller_id, caller_api, service_api=None):
"""
Add caller_id into the map as a provider of the specified
service (key). caller_id must not have been previously
registered with a different caller_api.
Subroutine for managing provider map data structure (essentially a multimap).
@param key: registration key (e.g. topic/service/param name)
@type key: str
@param caller_id: caller_id of provider
@type caller_id: str
@param caller_api: API URI of provider
@type caller_api: str
@param service_api: (keyword) ROS service API URI if registering a service
@type service_api: str
"""
map = self.map
if key in map and not service_api:
providers = map[key]
if not (caller_id, caller_api) in providers:
providers.append((caller_id, caller_api))
else:
map[key] = providers = [(caller_id, caller_api)]
if service_api:
if self.service_api_map is None:
self.service_api_map = {}
self.service_api_map[key] = (caller_id, service_api)
elif self.type == Registrations.SERVICE:
raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
def unregister_all(self, caller_id):
"""
Remove all registrations associated with caller_id
@param caller_id: caller_id of provider
@type caller_id: str
"""
map = self.map
# fairly expensive
dead_keys = []
for key in map:
providers = map[key]
# find all matching entries
to_remove = [(id, api) for id, api in providers if id == caller_id]
# purge them
for r in to_remove:
providers.remove(r)
if not providers:
dead_keys.append(key)
for k in dead_keys:
del self.map[k]
if self.type == Registrations.SERVICE and self.service_api_map:
del dead_keys[:]
for key, val in self.service_api_map.iteritems():
if val[0] == caller_id:
dead_keys.append(key)
for k in dead_keys:
del self.service_api_map[k]
def unregister(self, key, caller_id, caller_api, service_api=None):
"""
Remove caller_id from the map as a provider of the specified service (key).
Subroutine for managing provider map data structure, essentially a multimap
@param key: registration key (e.g. topic/service/param name)
@type key: str
@param caller_id: caller_id of provider
@type caller_id: str
@param caller_api: API URI of provider
@type caller_api: str
@param service_api: (keyword) ROS service API URI if registering a service
@type service_api: str
@return: for ease of master integration, directly returns unregister value for
higher-level XMLRPC API. val is the number of APIs unregistered (0 or 1)
@rtype: code, msg, val
"""
# if we are unregistering a topic, validate against the caller_api
if service_api:
# validate against the service_api
if self.service_api_map is None:
return 1, "[%s] is not a provider of [%s]"%(caller_id, key), 0
if self.service_api_map.get(key, None) != (caller_id, service_api):
return 1, "[%s] is no longer the current service api handle for [%s]"%(service_api, key), 0
else:
del self.service_api_map[key]
del self.map[key]
# caller_api is None for unregister service, so we can't validate as well
return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1
elif self.type == Registrations.SERVICE:
raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
else:
providers = self.map.get(key, [])
if (caller_id, caller_api) in providers:
providers.remove((caller_id, caller_api))
if not providers:
del self.map[key]
return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1
else:
return 1, "[%s] is not a known provider of [%s]"%(caller_id, key), 0
class RegistrationManager(object):
"""
Stores registrations for Master.
RegistrationManager is not threadsafe, so access must be externally locked as appropriate
"""
def __init__(self, thread_pool):
"""
ctor.
@param thread_pool: thread pool for queueing tasks
@type thread_pool: ThreadPool
"""
self.nodes = {}
self.thread_pool = thread_pool
self.publishers = Registrations(Registrations.TOPIC_PUBLICATIONS)
self.subscribers = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)
self.services = Registrations(Registrations.SERVICE)
self.param_subscribers = Registrations(Registrations.PARAM_SUBSCRIPTIONS)
def reverse_lookup(self, caller_api):
"""
Get a NodeRef by caller_api
@param caller_api: caller XML RPC URI
@type caller_api: str
@return: nodes that declare caller_api as their
API. 99.9% of the time this should only be one node, but we
allow for multiple matches as the master API does not restrict
this.
@rtype: [NodeRef]
"""
matches = [n for n in self.nodes.iteritems() if n.api == caller_api]
if matches:
return matches
def get_node(self, caller_id):
return self.nodes.get(caller_id, None)
def _register(self, r, key, caller_id, caller_api, service_api=None):
# update node information
node_ref, changed = self._register_node_api(caller_id, caller_api)
node_ref.add(r.type, key)
# update pub/sub/service indicies
if changed:
self.publishers.unregister_all(caller_id)
self.subscribers.unregister_all(caller_id)
self.services.unregister_all(caller_id)
self.param_subscribers.unregister_all(caller_id)
r.register(key, caller_id, caller_api, service_api)
def _unregister(self, r, key, caller_id, caller_api, service_api=None):
node_ref = self.nodes.get(caller_id, None)
if node_ref != None:
retval = r.unregister(key, caller_id, caller_api, service_api)
# check num removed field, if 1, unregister is valid
if retval[2] == 1:
node_ref.remove(r.type, key)
if node_ref.is_empty():
del self.nodes[caller_id]
else:
retval = 1, "[%s] is not a registered node"%caller_id, 0
return retval
def register_service(self, service, caller_id, caller_api, service_api):
"""
Register service provider
@return: None
"""
self._register(self.services, service, caller_id, caller_api, service_api)
def register_publisher(self, topic, caller_id, caller_api):
"""
Register topic publisher
@return: None
"""
self._register(self.publishers, topic, caller_id, caller_api)
def register_subscriber(self, topic, caller_id, caller_api):
"""
Register topic subscriber
@return: None
"""
self._register(self.subscribers, topic, caller_id, caller_api)
def register_param_subscriber(self, param, caller_id, caller_api):
"""
Register param subscriber
@return: None
"""
self._register(self.param_subscribers, param, caller_id, caller_api)
def unregister_service(self, service, caller_id, service_api):
caller_api = None
return self._unregister(self.services, service, caller_id, caller_api, service_api)
def unregister_subscriber(self, topic, caller_id, caller_api):
return self._unregister(self.subscribers, topic, caller_id, caller_api)
def unregister_publisher(self, topic, caller_id, caller_api):
return self._unregister(self.publishers, topic, caller_id, caller_api)
def unregister_param_subscriber(self, param, caller_id, caller_api):
return self._unregister(self.param_subscribers, param, caller_id, caller_api)
def _register_node_api(self, caller_id, caller_api):
"""
@param caller_id: caller_id of provider
@type caller_id: str
@param caller_api: caller_api of provider
@type caller_api: str
@return: (registration_information, changed_registration). changed_registration is true if
caller_api is differet than the one registered with caller_id
@rtype: (NodeRef, bool)
"""
node_ref = self.nodes.get(caller_id, None)
bumped_api = None
if node_ref is not None:
if node_ref.api == caller_api:
return node_ref, False
else:
bumped_api = node_ref.api
self.thread_pool.queue_task(bumped_api, shutdown_node_task,
(bumped_api, caller_id, "new node registered with same name"))
node_ref = NodeRef(caller_id, caller_api)
self.nodes[caller_id] = node_ref
return (node_ref, bumped_api != None)

View File

@ -1,228 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Internal threadpool library for zenmaster.
Adapted from U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871}
Added a 'marker' to tasks so that multiple tasks with the same
marker are not executed. As we are using the thread pool for i/o
tasks, the marker is set to the i/o name. This prevents a slow i/o
for gobbling up all of our threads
"""
import threading, logging, traceback
from time import sleep
class MarkedThreadPool:
"""Flexible thread pool class. Creates a pool of threads, then
accepts tasks that will be dispatched to the next available
thread."""
def __init__(self, numThreads):
"""Initialize the thread pool with numThreads workers."""
self.__threads = []
self.__resizeLock = threading.Condition(threading.Lock())
self.__taskLock = threading.Condition(threading.Lock())
self.__tasks = []
self.__markers = set()
self.__isJoining = False
self.set_thread_count(numThreads)
def set_thread_count(self, newNumThreads):
""" External method to set the current pool size. Acquires
the resizing lock, then calls the internal version to do real
work."""
# Can't change the thread count if we're shutting down the pool!
if self.__isJoining:
return False
self.__resizeLock.acquire()
try:
self.__set_thread_count_nolock(newNumThreads)
finally:
self.__resizeLock.release()
return True
def __set_thread_count_nolock(self, newNumThreads):
"""Set the current pool size, spawning or terminating threads
if necessary. Internal use only; assumes the resizing lock is
held."""
# If we need to grow the pool, do so
while newNumThreads > len(self.__threads):
newThread = ThreadPoolThread(self)
self.__threads.append(newThread)
newThread.start()
# If we need to shrink the pool, do so
while newNumThreads < len(self.__threads):
self.__threads[0].go_away()
del self.__threads[0]
def get_thread_count(self):
"""@return: number of threads in the pool."""
self.__resizeLock.acquire()
try:
return len(self.__threads)
finally:
self.__resizeLock.release()
def queue_task(self, marker, task, args=None, taskCallback=None):
"""Insert a task into the queue. task must be callable;
args and taskCallback can be None."""
if self.__isJoining == True:
return False
if not callable(task):
return False
self.__taskLock.acquire()
try:
self.__tasks.append((marker, task, args, taskCallback))
return True
finally:
self.__taskLock.release()
def remove_marker(self, marker):
"""Remove the marker from the currently executing tasks. Only one
task with the given marker can be executed at a given time"""
if marker is None:
return
self.__taskLock.acquire()
try:
self.__markers.remove(marker)
finally:
self.__taskLock.release()
def get_next_task(self):
""" Retrieve the next task from the task queue. For use
only by ThreadPoolThread objects contained in the pool."""
self.__taskLock.acquire()
try:
retval = None
for marker, task, args, callback in self.__tasks:
# unmarked or not currently executing
if marker is None or marker not in self.__markers:
retval = (marker, task, args, callback)
break
if retval:
# add the marker so we don't do any similar tasks
self.__tasks.remove(retval)
if marker is not None:
self.__markers.add(marker)
return retval
else:
return (None, None, None, None)
finally:
self.__taskLock.release()
def join_all(self, wait_for_tasks = True, wait_for_threads = True):
""" Clear the task queue and terminate all pooled threads,
optionally allowing the tasks and threads to finish."""
# Mark the pool as joining to prevent any more task queueing
self.__isJoining = True
# Wait for tasks to finish
if wait_for_tasks:
while self.__tasks != []:
sleep(.1)
# Tell all the threads to quit
self.__resizeLock.acquire()
try:
self.__set_thread_count_nolock(0)
self.__isJoining = True
# Wait until all threads have exited
if wait_for_threads:
for t in self.__threads:
t.join()
del t
# Reset the pool for potential reuse
self.__isJoining = False
finally:
self.__resizeLock.release()
class ThreadPoolThread(threading.Thread):
"""
Pooled thread class.
"""
threadSleepTime = 0.1
def __init__(self, pool):
"""Initialize the thread and remember the pool."""
threading.Thread.__init__(self)
self.setDaemon(True) #don't block program exit
self.__pool = pool
self.__isDying = False
def run(self):
"""
Until told to quit, retrieve the next task and execute
it, calling the callback if any.
"""
while self.__isDying == False:
marker, cmd, args, callback = self.__pool.get_next_task()
# If there's nothing to do, just sleep a bit
if cmd is None:
sleep(ThreadPoolThread.threadSleepTime)
else:
try:
try:
result = cmd(*args)
finally:
self.__pool.remove_marker(marker)
if callback is not None:
callback(result)
except Exception, e:
logging.getLogger('rosmaster.threadpool').error(traceback.format_exc())
def go_away(self):
""" Exit the run loop next time through."""
self.__isDying = True

View File

@ -1,56 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Utility routines for rosmaster.
"""
import urlparse
import xmlrpclib
_proxies = {} #cache ServerProxys
def xmlrpcapi(uri):
"""
@return: instance for calling remote server or None if not a valid URI
@rtype: xmlrpclib.ServerProxy
"""
if uri is None:
return None
uriValidate = urlparse.urlparse(uri)
if not uriValidate[0] or not uriValidate[1]:
return None
if not _proxies.has_key(uri):
_proxies[uri] = xmlrpclib.ServerProxy(uri)
return _proxies[uri]

View File

@ -1,188 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""Internal-use Python decorators for parameter validation"""
from roslib.names import ANYTYPE, TYPE_SEPARATOR, resolve_name
ROSRPC = "rosrpc://"
class ParameterInvalid(Exception):
"""Exception that is raised when a parameter fails validation checks"""
def __init__(self, message):
self._message = message
def __str__(self):
return str(self._message)
def non_empty(param_name):
"""Validator that checks that parameter is not empty"""
def validator(param, context):
if not param:
raise ParameterInvalid("ERROR: parameter [%s] must be specified and non-empty"%param_name)
return param
return validator
def non_empty_str(param_name):
"""Validator that checks that parameter is a string and non-empty"""
def validator(param, context):
if not param:
raise ParameterInvalid("ERROR: parameter [%s] must be specified and non-empty"%param_name)
elif not isinstance(param, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a string"%param_name)
return param
return validator
def not_none(param_name):
"""Validator that checks that parameter is not None"""
def validator(param, context):
if param is None:
raise ParameterInvalid("ERROR: parameter [%s] must be specified"%param_name)
return param
return validator
# Validators ######################################
def is_api(paramName):
"""
Validator that checks that parameter is a valid API handle
(i.e. URI). Both http and rosrpc are allowed schemes.
"""
def validator(param_value, callerId):
if not param_value or not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] is not an XMLRPC URI"%paramName)
if not param_value.startswith("http://") and not param_value.startswith(ROSRPC):
raise ParameterInvalid("ERROR: parameter [%s] is not an RPC URI"%paramName)
#could do more fancy parsing, but the above catches the major cases well enough
return param_value
return validator
def is_topic(param_name):
"""
Validator that checks that parameter is a valid ROS topic name
"""
def validator(param_value, caller_id):
v = valid_name_validator_resolved(param_name, param_value, caller_id)
if param_value == '/':
raise ParameterInvalid("ERROR: parameter [%s] cannot be the global namespace"%param_name)
return v
return validator
def is_service(param_name):
"""Validator that checks that parameter is a valid ROS service name"""
def validator(param_value, caller_id):
v = valid_name_validator_resolved(param_name, param_value, caller_id)
if param_value == '/':
raise ParameterInvalid("ERROR: parameter [%s] cannot be the global namespace"%param_name)
return v
return validator
def empty_or_valid_name(param_name):
"""
empty or valid graph resource name.
Validator that resolves names unless they an empty string is supplied, in which case
an empty string is returned.
"""
def validator(param_value, caller_id):
if not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a string"%param_name)
if not param_value:
return ''
#return resolve_name(param_value, namespace(caller_id))
return resolve_name(param_value, caller_id)
return validator
def valid_name_validator_resolved(param_name, param_value, caller_id):
if not param_value or not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a non-empty string"%param_name)
#TODO: actual validation of chars
# I added the colon check as the common error will be to send an URI instead of name
if ':' in param_value or ' ' in param_value:
raise ParameterInvalid("ERROR: parameter [%s] contains illegal chars"%param_name)
#return resolve_name(param_value, namespace(caller_id))
return resolve_name(param_value, caller_id)
def valid_name_validator_unresolved(param_name, param_value, caller_id):
if not param_value or not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a non-empty string"%param_name)
#TODO: actual validation of chars
# I added the colon check as the common error will be to send an URI instead of name
if ':' in param_value or ' ' in param_value:
raise ParameterInvalid("ERROR: parameter [%s] contains illegal chars"%param_name)
return param_value
def valid_name(param_name, resolve=True):
"""
Validator that resolves names and also ensures that they are not empty
@param param_name: name
@type param_name: str
@param resolve: if True/omitted, the name will be resolved to
a global form. Otherwise, no resolution occurs.
@type resolve: bool
@return: resolved parameter value
@rtype: str
"""
def validator(param_value, caller_id):
if resolve:
return valid_name_validator_resolved(param_name, param_value, caller_id)
return valid_name_validator_unresolved(param_name, param_value, caller_id)
return validator
def global_name(param_name):
"""
Validator that checks for valid, global graph resource name.
@return: parameter value
@rtype: str
"""
def validator(param_value, caller_id):
if not param_value or not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a non-empty string"%param_name)
#TODO: actual validation of chars
if not is_global(param_value):
raise ParameterInvalid("ERROR: parameter [%s] must be a globally referenced name"%param_name)
return param_value
return validator
def valid_type_name(param_name):
"""validator that checks the type name is specified correctly"""
def validator(param_value, caller_id):
if param_value == ANYTYPE:
return param_value
if not param_value or not isinstance(param_value, basestring):
raise ParameterInvalid("ERROR: parameter [%s] must be a non-empty string"%param_name)
if not len(param_value.split(TYPE_SEPARATOR)) == 2:
raise ParameterInvalid("ERROR: parameter [%s] is not a valid package resource name"%param_name)
#TODO: actual validation of chars
return param_value
return validator