better error checking for invalid registrations, starting to switch to epydoc

This commit is contained in:
Ken Conley 2009-09-22 01:05:41 +00:00
parent 15087be906
commit c8644da56c
1 changed files with 210 additions and 106 deletions

View File

@ -33,6 +33,15 @@
#
# Revision $Id: rosservice.py 3813 2009-02-11 21:16:34Z sfkwc $
"""
Command-line utility for querying ROS services, along with library
calls for similar functionality. The main benefit of the rosservice
Python library over the rospy ServiceProxy library is that rosservice
supports type-introspection on ROS Services. This allows for both
introspecting information about Services, as well as using this
introspection to dynamically call Services.
"""
import roslib; roslib.load_manifest('rosservice')
NAME='rosservice'
@ -54,30 +63,46 @@ import rosmsg
from optparse import OptionParser
class ROSServiceException(Exception): pass
class ROSServiceException(Exception):
"""Base class for rosservice-related exceptions"""
pass
class ROSServiceIOException(ROSServiceException): pass
class ROSServiceIOException(ROSServiceException): pass
class ROSServiceIOException(ROSServiceException):
"""rosservice related to network I/O failure"""
pass
## Utility that raises a ROSServiceException if ROS XMLRPC command fails
## @param args (code, msg, val): ROS XMLRPC call return args
## @return value argument from ROS XMLRPC call (third arg of tuple)
## @throws ROSServiceException if XMLRPC command does not return a SUCCESS code
def succeed(args):
"""
Utility that raises a ROSServiceException if ROS XMLRPC command fails
@param args: (code, msg, val) ROS XMLRPC call return args
@type args: (int, str, XmlRpcValue)
@return: value argument from ROS XMLRPC call (third arg of tuple)
@rtype: XmlRpcLegal value
@raise ROSServiceException: if XMLRPC command does not return a SUCCESS code
"""
code, msg, val = args
if code != 1:
raise ROSServiceException("remote call failed: %s"%msg)
return val
## Utility for connecting to a service and retrieving the TCPROS
## headers. Services currently do not declare their type with the
## master, so instead we probe the service for its headers.
## @param service_name str: name of service
## @param service_uri str: ROSRPC URI of service
## @return dict: map of header fields
## @throws ROSServiceIOException
def get_service_headers(service_name, service_uri):
dest_addr, dest_port = rospy.parse_rosrpc_uri(service_uri)
"""
Utility for connecting to a service and retrieving the TCPROS
headers. Services currently do not declare their type with the
master, so instead we probe the service for its headers.
@param service_name: name of service
@type service_name: str
@param service_uri: ROSRPC URI of service
@type service_uri: str
@return: map of header fields
@rtype: dict
@raise ROSServiceException: if service has invalid information
@raise ROSServiceIOException: if unable to communicate with service
"""
try:
dest_addr, dest_port = rospy.parse_rosrpc_uri(service_uri)
except:
raise ROSServiceException("service [%s] has an invalid RPC URI [%s]"%(service_name, service_uri))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
try:
@ -94,11 +119,16 @@ def get_service_headers(service_name, service_uri):
if s is not None:
s.close()
## Get the type of the specified \a service_name. May print errors to stderr.
## @param service_name str: name of service
## @return str: type of service or None
## @throws ROSServiceIOException
def get_service_type(service_name):
"""
Get the type of the specified service_name. May print errors to stderr.
@param service_name: name of service
@type service_name: str
@return: type of service or None
@rtype: str
@raise ROSServiceException: if service information is invalid
@raise ROSServiceIOException: if unable to communicate with service
"""
master = roslib.scriptutil.get_master()
try:
code, msg, service_uri = master.lookupService('/rosservice', service_name)
@ -115,18 +145,22 @@ def get_service_type(service_name):
except socket.error:
raise ROSServiceIOException("Unable to communicate with service [%s]! Service address is [%s]"%(service_name, service_uri))
## Implements 'type' command. Prints service type to stdout
## @param service_name str: name of service
def rosservice_type(service_name):
"""
Implements 'type' command. Prints service type to stdout
@param service_name str: name of service
"""
service_type = get_service_type(service_name)
if service_type is None:
print "unknown"
else:
print service_type
## @param service_name str: name of service to lookup
## @return str: ROSRPC URI for \a service_name
def get_service_uri(service_name):
"""
@param service_name str: name of service to lookup
@return str: ROSRPC URI for service_name
"""
try:
master = roslib.scriptutil.get_master()
code, msg, url = master.lookupService('/rosservice', service_name)
@ -136,20 +170,26 @@ def get_service_uri(service_name):
except socket.error:
raise ROSServiceIOException("Unable to communicate with master!")
## Implements rosservice uri command
## @param service_name str: name of service to lookup
## @throws ROSServiceIOException
def rosservice_uri(service_name):
"""
Implements rosservice uri command
@param service_name: name of service to lookup
@type service_name: str
@raise ROSServiceIOException: if the I/O issues prevent retrieving service information
"""
uri = get_service_uri(service_name)
if uri:
print uri
else:
print >> sys.stderr, "Unknown service: %s"%service_name
## Implements rosservice node command
## @param service_name str: name of service to lookup
## @throws ROSServiceIOException
def rosservice_node(service_name):
"""
Implements rosservice node command
@param service_name: name of service to lookup
@type service_name: str
@raise ROSServiceIOException: if the I/O issues prevent retrieving service information
"""
srvs = get_service_list(include_nodes=True)
s = [s for s in srvs if s[0] == service_name]
if s:
@ -160,13 +200,18 @@ def rosservice_node(service_name):
else:
print >> sys.stderr, "Unknown service: %s"%service_name
## Get the list of services
## @param node str: Name of node to print services for or None to return all services
## @param include_nodes bool: If True, return list will be [service_name, [node]]
## @return [services]: if \a include_nodes, services is service_name,
## [node]. Otherwise, it is just the service_name
## @throws ROSServiceIOException
def get_service_list(node=None, include_nodes=False):
"""
Get the list of services
@param node: Name of node to print services for or None to return all services
@type node: str
@param include_nodes: If True, return list will be [service_name, [node]]
@type include_nodes: bool
@return: if include_nodes, services is service_name,
[node]. Otherwise, it is just the service_name
@rtype: [services]
@raise ROSServiceIOException: if the I/O issues prevent retrieving service information
"""
try:
master = roslib.scriptutil.get_master()
state = succeed(master.getSystemState('/rosservice'))
@ -185,11 +230,15 @@ def get_service_list(node=None, include_nodes=False):
except socket.error:
raise ROSServiceIOException("Unable to communicate with master!")
## Implements 'rosservice list'
## @param node str: Name of node to print services for or None to print all services
## @param print_nodes bool: If True, also print nodes providing service
## @throws ROSServiceIOException
def rosservice_list(node=None, print_nodes=False):
"""
Implements 'rosservice list'
@param node: Name of node to print services for or None to print all services
@type node: str
@param print_nodes: If True, also print nodes providing service
@type print_nodes: bool
@raise ROSServiceIOException: if the I/O issues prevent retrieving service information
"""
srvs = get_service_list(node=node, include_nodes=print_nodes)
for s in srvs:
if print_nodes:
@ -197,10 +246,14 @@ def rosservice_list(node=None, print_nodes=False):
else:
print s
## Lookup services by \a service_type
## @param service_type str: type of service to find
## @return [str]: list of service names that use \a service_type
def rosservice_find(service_type):
"""
Lookup services by service_type
@param service_type: type of service to find
@type service_type: str
@return: list of service names that use service_type
@rtype: [str]
"""
master = roslib.scriptutil.get_master()
matches = []
try:
@ -213,9 +266,11 @@ def rosservice_find(service_type):
raise ROSServiceIOException("Unable to communicate with master!")
return matches
## Implements 'rosservice type'
## @param argv [str]: command-line args
def rosservice_cmd_find(argv=sys.argv):
"""
Implements 'rosservice type'
@param argv [str]: command-line args
"""
args = argv[2:]
parser = OptionParser(usage="usage: %prog find msg-type", prog=NAME)
options, args = parser.parse_args(args)
@ -225,12 +280,16 @@ def rosservice_cmd_find(argv=sys.argv):
parser.error("you may only specify one message type")
print '\n'.join(rosservice_find(args[0]))
## Get the service class using the name of the service. NOTE: this
## call results in a probe call to the service.
## @param service_name str: fully-resolved name of service to call
## @return ServiceDefinition: service class
## @throws ROSServiceException if service class cannot be retrieved
def get_service_class_by_name(service_name):
"""
Get the service class using the name of the service. NOTE: this
call results in a probe call to the service.
@param service_name: fully-resolved name of service to call
@type service_name: str
@return: service class
@rtype: ServiceDefinition: service class
@raise ROSServiceException: if service class cannot be retrieved
"""
# lookup the service type
service_type = get_service_type(service_name)
if not service_type:
@ -256,15 +315,20 @@ def get_service_class_by_name(service_name):
"Have you typed 'make' in [%s]?"%pkg)
return service_class
## Call the specified \a service_name
## @param service_name str: fully-resolved name of service to call
## @param service_args [args]: args to pass to service
## @param service_class Message class: (optional) service type
## class. If this argument is provided, it saves a probe call against
## the service
## @return roslib.message.Message: service response
## @throws ROSServiceException if call command cannot be executed
def call_service(service_name, service_args, service_class=None):
"""
Call the specified service_name
@param service_name: fully-resolved name of service to call
@type service_name: str
@param service_args: args to pass to service
@type service_args: [any]
@param service_class: (optional) service type class. If this
argument is provided, it saves a probe call against the service
@type service_class: Message class
@return: service response
@rtype: roslib.message.Message
@raise ROSServiceException: if call command cannot be executed
"""
if service_class is None:
service_class = get_service_class_by_name(service_name)
request = service_class._request_class()
@ -280,15 +344,21 @@ def call_service(service_name, service_args, service_class=None):
raise ROSServiceException("Unable to send request. One of the fields has an incorrect type:\n"+\
" %s\n\nsrv file:\n%s"%(e, rosmsg.get_srv_text(service_class._type)))
## Implements 'rosservice call'
## @param service_name str: name of service to call
## @param service_args [args]: arguments to call service with
## @param verbose bool: if True, print extra output
## @param service_class Message class: (optional) service type
## class. If this argument is provided, it saves a probe call against
## the service
## @throws ROSServiceException if call command cannot be executed
def rosservice_call(service_name, service_args, verbose=False, service_class=None):
"""
Implements 'rosservice call'
@param service_name: name of service to call
@type service_name: str
@param service_args: arguments to call service with
@type service_args: [args]
@param verbose: if True, print extra output
@type verbose: bool
@param service_class Message class: (optional) service type
class. If this argument is provided, it saves a probe call against
the service
@type service_class: Message class
@raise ROSServiceException: if call command cannot be executed
"""
service_name = roslib.scriptutil.script_resolve_name('rosservice', service_name)
request, response = call_service(service_name, service_args, service_class=service_class)
if verbose:
@ -296,22 +366,29 @@ def rosservice_call(service_name, service_args, verbose=False, service_class=Non
print '---'
print str(response)
## Check if service requires arguments
## @param service_name str: name of service being called
## @param service_class Message class: (optional) service type
## class. If this argument is provided, it saves a probe call against
## the service
## @return bool: True if \a service_name has request arguments
def has_service_args(service_name, service_class=None):
"""
Check if service requires arguments
@param service_name: name of service being called
@type service_name: str
@param service_class: (optional) service type class. If this
argument is provided, it saves a probe call against the service
@type service_class: Message class
@return: True if service_name has request arguments
@rtype: bool
"""
service_name = roslib.scriptutil.script_resolve_name('rosservice', service_name)
if service_class is None:
service_class = get_service_class_by_name(service_name)
return len(service_class._request_class.__slots__) > 0
## Implements 'rosservice args'
## @param service_name str: name of service to get arguments for
## @throws ROSServiceException if call command cannot be executed
def rosservice_args(service_name):
"""
Implements 'rosservice args'
@param service_name: name of service to get arguments for
@type service_name: str
@raise ROSServiceException: if call command cannot be executed
"""
service_name = roslib.scriptutil.script_resolve_name('rosservice', service_name)
service_class = get_service_class_by_name(service_name)
print roslib.message.get_printable_message_args(service_class._request_class)
@ -319,12 +396,16 @@ def rosservice_args(service_name):
##########################################################################################
# COMMAND PROCESSING #####################################################################
## Parse command-line arguments for commands that take a service name
## only. Will cause a system exit if command-line argument parsing
## fails.
## @param cmd str: command name, e.g. 'type'
## @param argv [str]: command-line arguments
def _optparse_service_only(cmd, argv=sys.argv):
"""
Parse command-line arguments for commands that take a service name
only. Will cause a system exit if command-line argument parsing
fails.
@param cmd: command name, e.g. 'type'
@type cmd: str
@param argv: command-line arguments
@type argv: [str]
"""
args = argv[2:]
parser = OptionParser(usage="usage: %%prog %s /service"%cmd, prog=NAME)
(options, args) = parser.parse_args(args)
@ -334,39 +415,54 @@ def _optparse_service_only(cmd, argv=sys.argv):
parser.error("you may only specify one input service")
return roslib.scriptutil.script_resolve_name('rosservice', args[0])
## Parse 'type' command arguments and run command Will cause a system
## exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if type command cannot be executed
def rosservice_cmd_type(argv):
"""
Parse 'type' command arguments and run command Will cause a system
exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if type command cannot be executed
"""
rosservice_type(_optparse_service_only('type', argv=argv))
## Parse 'uri' command arguments and run command Will cause a system
## exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if uri command cannot be executed
def rosservice_cmd_uri(argv, ):
"""
Parse 'uri' command arguments and run command Will cause a system
exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if uri command cannot be executed
"""
rosservice_uri(_optparse_service_only('uri', argv=argv))
## Parse 'node' command arguments and run command Will cause a system
## exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if node command cannot be executed
def rosservice_cmd_node(argv, ):
"""
Parse 'node' command arguments and run command Will cause a system
exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if node command cannot be executed
"""
rosservice_node(_optparse_service_only('node', argv=argv))
## Parse 'args' command arguments and run command Will cause a system
## exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if args command cannot be executed
def rosservice_cmd_args(argv, ):
"""
Parse 'args' command arguments and run command Will cause a system
exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if args command cannot be executed
"""
rosservice_args(_optparse_service_only('args', argv=argv))
## Parse 'call' command arguments and run command Will cause a system
## exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if call command cannot be executed
def rosservice_cmd_call(argv):
"""
Parse 'call' command arguments and run command Will cause a system
exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if call command cannot be executed
"""
try:
import yaml
except ImportError, e:
@ -398,8 +494,11 @@ def rosservice_cmd_call(argv):
else:
rosservice_call(service_name, service_args, verbose=options.verbose, service_class=service_class)
## @return iterator for next yaml document on stdin
def _stdin_yaml_arg():
"""
@return iterator for next yaml document on stdin
@rtype: iterator
"""
import yaml
import select
poll = select.poll()
@ -422,11 +521,14 @@ def _stdin_yaml_arg():
except select.error:
return # most likely ctrl-c interrupt
## Parse 'list' command arguments and run command
## Will cause a system exit if command-line argument parsing fails.
## @param argv [str]: command-line arguments
## @throws ROSServiceException if list command cannot be executed
def rosservice_cmd_list(argv):
"""
Parse 'list' command arguments and run command
Will cause a system exit if command-line argument parsing fails.
@param argv: command-line arguments
@type argv: [str]
@raise ROSServiceException: if list command cannot be executed
"""
args = argv[2:]
parser = OptionParser(usage="usage: %prog list [/node]", prog=NAME)
parser.add_option("-n", "--nodes",
@ -440,8 +542,8 @@ def rosservice_cmd_list(argv):
parser.error("you may only specify one input node")
rosservice_list(nodename, print_nodes=options.print_nodes)
## Print generic usage for rosservice
def fullusage():
"""Print generic usage for rosservice"""
print """Commands:
\trosservice list\tprint information about active topics
\trosservice call\tcall the service with the provided args
@ -453,8 +555,10 @@ Type rosservice <command> -h for more detailed usage, e.g. 'rosservice call -h'
"""
sys.exit(os.EX_USAGE)
## main entry point for rosservice command-line tool
def rosservicemain(argv=sys.argv):
"""
main entry point for rosservice command-line tool
"""
if len(argv) == 1:
fullusage()
try: