more epydoc updates

This commit is contained in:
Ken Conley 2009-09-17 04:04:41 +00:00
parent 3d6a154bb0
commit c1e71c4aea
22 changed files with 544 additions and 424 deletions

View File

@ -193,8 +193,9 @@ def check_type(field_name, field_type, field_val):
# raise SerializationError('field [%s] must be a [%s] instance instead of a %s'%(field_name, field_type, type(field_val)))
#TODO: dynamically load message class and do instance compare
##Base class of auto-generated message data objects.
class Message(object):
"""Base class of Message data classes auto-generated from msg files. """
# slots is explicitly both for data representation and
# performance. Higher-level code assumes that there is a 1-to-1
# mapping between __slots__ and message fields. In terms of
@ -202,11 +203,14 @@ class Message(object):
# new-style object.
__slots__ = ['_connection_header']
## Message base constructor. Contains generic initializers for
## args-based and kwds-based initialization of message fields,
## assuming there is a one-to-one mapping between __slots__ and
## message fields.
def __init__(self, *args, **kwds):
"""
ctor. There are multiple ways of initializing Message
instances, either using a 1-to-1 correspondence between
constructor arguments and message fields (*args), or using
Python "keyword" arguments (**kwds) to initialize named field
and leave the rest with default values.
"""
if args and kwds:
raise TypeError("Message constructor may only use args OR keywords, not both")
if args:
@ -245,8 +249,18 @@ class Message(object):
raise SerializationError(str(exc))
def serialize(self, buff):
"""
Serialize data into buffer
@param buff: buffer
@type buff: StringIO
"""
pass
def deserialize(self, str):
"""
Deserialize data in str into this instance
@param str: serialized data
@type str: str
"""
pass
def __str__(self):
return strify_message(self)
@ -268,12 +282,16 @@ class Message(object):
return False
return True
class ServiceDefinition(object): pass #marker class for auto-generated code
class ServiceDefinition(object):
"""Base class of Service classes auto-generated from srv files"""
pass
## Message deserialization error
class DeserializationError(ROSMessageException): pass
## Message serialization error
class SerializationError(ROSMessageException): pass
class DeserializationError(ROSMessageException):
"""Message deserialization error"""
pass
class SerializationError(ROSMessageException):
"""Message serialization error"""
pass
# Utilities for rostopic/rosservice

View File

@ -33,10 +33,10 @@
# Revision $Id: msgspec.py 3357 2009-01-13 07:13:05Z jfaustwg $
# $Author: jfaustwg $
"""
ROS Msg Description Language Spec
ROS msg library for Python
Implements: U{http://ros.org/wiki/msg}
"""
## ROS Msg Description Language Spec
## Implements: http://pr.willowgarage.com/wiki/ROS/Message_Description_Language
import cStringIO
import os

View File

@ -32,9 +32,10 @@
#
# Revision $Id: srvspec.py 3357 2009-01-13 07:13:05Z jfaustwg $
# $Author: jfaustwg $
"""ROS Service Description Language Spec"""
## ROS Service Description Language Spec
# Implements http://pr.willowgarage.com/wiki/ROS/Message_Description_Language
"""
ROS Service Description Language Spec
Implements U{http://ros.org/wiki/srv}
"""
import os
import re

View File

@ -31,7 +31,7 @@
<depend package="roslang"/>
<export>
<roslang cmake="${prefix}/cmake/rospy.cmake"/>
<rosdoc builder="eepydoc"/>
<rosdoc builder="epydoc"/>
</export>
</package>

View File

@ -32,7 +32,7 @@
#
# Revision $Id$
"""
ROSPY initialization
Internal use: rospy initialization.
This is mainly routines for initializing the master or slave based on
the OS environment.

View File

@ -32,7 +32,7 @@
#
# Revision $Id$
## Support for ROS messages, including network serialization routines
"""Internal use: Support for ROS messages, including network serialization routines"""
import time
import struct

View File

@ -32,7 +32,7 @@
#
# Revision $Id$
## ROSNode wraps the master/slave implementations in an XMLRPC API
"""Internal use: contains the L{ROSNode} wrapper, which provides an unified interface for running an XMLRPC server for the rospy client library."""
import os
import traceback

View File

@ -32,10 +32,11 @@
#
# Revision $Id$
"""
Master/Slave XML-RPC Wrappers
Master/Slave XML-RPC Wrappers.
Simplifies usage of master/slave APIs by automatically inserting caller ID
and also adding python dictionary accessors on the parameter server.
The L{NodeProxy} and L{MasterProxy} simplify usage of master/slave
APIs by automatically inserting the caller ID and also adding python
dictionary accessors on the parameter server.
"""
import rospy.core

View File

@ -32,6 +32,8 @@
#
# Revision $Id$
"""Internal use: handles maintaining registrations with master via internal listener APIs"""
import socket
import sys
import logging
@ -44,8 +46,6 @@ import rospy.core
from rospy.core import is_shutdown, xmlrpcapi, logfatal, logwarn, loginfo, logerr, logdebug
from rospy.names import get_caller_id, get_namespace, resolve_name
"""Internal use: handles maintaining registrations with master via internal listener APIs"""
# topic manager and service manager singletons
_topic_manager = None

View File

@ -32,7 +32,7 @@
#
# Revision $Id: client.py 2258 2008-09-30 23:03:06Z sfkwc $
## Internal use: support for /rosout logging in rospy
"""Internal use: support for /rosout logging in rospy"""
import logging
import sys

View File

@ -32,7 +32,7 @@
#
# Revision $Id: client.py 2258 2008-09-30 23:03:06Z sfkwc $
## ROS time and duration representations and API
"""ROS time and duration representations, as well as internal routines for managing wallclock versus simulated time."""
import threading
import time

View File

@ -33,24 +33,14 @@
# Revision $Id$
"""
TCPROS connection protocol.
Implements: U{http://ros.org/wiki/ROS/TCPROS}
The rospy tcpros implementation is split into three areas:
- L{rospy.tcpros_base}: common TCPROS routines, including header and connection processing
- L{rospy.tcpros_pubsub}: Topic-specific capabilities for publishing and subscribing
- L{rospy.tcpros_service}: Service-specific capabilities
"""
## TCPROS connection protocol.
# http://pr.willowgarage.com/wiki/ROS/TCPROS
#
# TCPROS sets up a TCP/IP server socket on the source machine that
# waits for incoming connections. When it receives an incoming
# connection, it parses the header sent along the connection to
# determine which topic it corresponds to. The format of this header
# is:
#
# HEADER-LENGTH (32-bit int)
# TOPIC-NAME (string identifier)
# '\n' (newline character)
# MSG-MD5SUM
#
# The MSG-MD5SUM is an md5sum of the full text of the .msg file. As
# such, it will signal a version change even for compatible changes to
# the file.
import rospy.tcpros_base
import rospy.tcpros_pubsub

View File

@ -32,6 +32,8 @@
#
# Revision $Id: transport.py 2941 2008-11-26 03:19:48Z sfkwc $
"""Internal use: Topic-specific extensions for TCPROS support"""
import logging
import socket
import thread
@ -46,26 +48,36 @@ from rospy.tcpros_base import TCPROSTransport, TCPROSTransportProtocol, \
get_tcpros_server_address, start_tcpros_server,\
DEFAULT_BUFF_SIZE, TCPROS
## Subscription transport implementation for receiving topic data via
## peer-to-peer TCP/IP sockets
class TCPROSSub(TCPROSTransportProtocol):
"""Receive topic data via peer-to-peer TCP/IP sockets"""
"""
Subscription transport implementation for receiving topic data via
peer-to-peer TCP/IP sockets
"""
## ctor.
## @param recv_data_class Message: class to instantiate to receive
## messages
## @param queue_size int: maximum number of messages to
## deserialize from newly read data off socket
## @param buff_size int: recv buffer size
## @param tcp_nodelay bool: If True, request TCP_NODELAY from publisher
def __init__(self, name, recv_data_class, queue_size=None, \
buff_size=DEFAULT_BUFF_SIZE, tcp_nodelay=False):
"""
ctor.
@param recv_data_class: class to instantiate to receive
messages
@type recv_data_class: L{rospy.Message}
@param queue_size: maximum number of messages to
deserialize from newly read data off socket
@type queue_size: int
@param buff_size: recv buffer size
@type buff_size: int
@param tcp_nodelay: If True, request TCP_NODELAY from publisher
@type tcp_nodelay: bool
"""
super(TCPROSSub, self).__init__(name, recv_data_class, queue_size, buff_size)
self.direction = rospy.transport.INBOUND
self.tcp_nodelay = tcp_nodelay
## @return d dictionary of subscriber fields
def get_header_fields(self):
"""
@return: dictionary of subscriber fields
@rtype: dict
"""
return {'topic': self.name,
'message_definition': self.recv_data_class._full_text,
'tcp_nodelay': '1' if self.tcp_nodelay else '0',
@ -74,10 +86,14 @@ class TCPROSSub(TCPROSTransportProtocol):
'callerid': rospy.names.get_caller_id()}
# Separate method for easier testing
## Configure socket options on a new publisher socket.
## @param sock socket.socket
## @param is_tcp_nodelay bool: if True, TCP_NODELAY will be set on outgoing socket if available
def _configure_pub_socket(sock, is_tcp_nodelay):
"""
Configure socket options on a new publisher socket.
@param sock: socket.socket
@type sock: socket.socket
@param is_tcp_nodelay: if True, TCP_NODELAY will be set on outgoing socket if available
@param is_tcp_nodelay: bool
"""
# #956: low latency, TCP_NODELAY support
if is_tcp_nodelay:
if hasattr(socket, 'TCP_NODELAY'):
@ -87,15 +103,22 @@ def _configure_pub_socket(sock, is_tcp_nodelay):
#TODO:POLLING: TCPROSPub currently doesn't actually do anything -- not until polling is implemented
## Publisher transport implementation for publishing topic data via
## peer-to-peer TCP/IP sockets.
class TCPROSPub(TCPROSTransportProtocol):
"""Receive topic data via peer-to-peer TCP/IP sockets"""
"""
Publisher transport implementation for publishing topic data via
peer-to-peer TCP/IP sockets.
"""
## @param name str: topic name
## @param pub_data_class Message class: class to instance to receive messages
## @param is_latch bool: If True, Publisher is latching
def __init__(self, name, pub_data_class, is_latch=False, headers=None):
"""
ctor.
@param name: topic name
@type name: str
@param pub_data_class: class to instance to receive messages
@type pub_data_class: L{rospy.Message} class
@param is_latch: If True, Publisher is latching
@type is_latch: bool
"""
# very small buffer size for publishers as the messages they receive are very small
super(TCPROSPub, self).__init__(name, None, queue_size=None, buff_size=128)
self.pub_data_class = pub_data_class
@ -118,33 +141,45 @@ class TCPROSPub(TCPROSTransportProtocol):
base.update(self.headers)
return base
## ROS Protocol handler for TCPROS. Accepts both TCPROS topic
## connections as well as ROS service connections over TCP. TCP server
## socket is run once start_server() is called -- this is implicitly
## called during init_publisher().
class TCPROSHandler(rospy.transport.ProtocolHandler):
## @param self
"""
ROS Protocol handler for TCPROS. Accepts both TCPROS topic
connections as well as ROS service connections over TCP. TCP server
socket is run once start_server() is called -- this is implicitly
called during init_publisher().
"""
def __init__(self):
"""ctor"""
self.tcp_nodelay_map = {} # { topic : tcp_nodelay}
## @param tcp_nodelay bool: If True, sets TCP_NODELAY on
## publisher's socket (disables Nagle algorithm). This results
## in lower latency publishing at the cost of efficiency.
def set_tcp_nodelay(self, topic, tcp_nodelay):
"""
@param tcp_nodelay: If True, sets TCP_NODELAY on publisher's
socket (disables Nagle algorithm). This results in lower
latency publishing at the cost of efficiency.
@type tcp_nodelay: bool
"""
self.tcp_nodelay_map[rospy.names.resolve_name(topic)] = tcp_nodelay
## stops the TCP/IP server responsible for receiving inbound connections
## @param self
def shutdown(self):
"""
stops the TCP/IP server responsible for receiving inbound connections
"""
pass
## Connect to topic \a topic_name on Publisher \a pub_uri using TCPROS.
## @param self
## @param topic_name str: topic name
## @param pub_uri str: XML-RPC URI of publisher
## @param protocol_params [XmlRpcLegal]: protocol parameters to use for connecting
## @return int, str, int: code, message, debug
def create_transport(self, topic_name, pub_uri, protocol_params):
"""
Connect to topic topic_name on Publisher pub_uri using TCPROS.
@param topic_name str: topic name
@type topic_name: str
@param pub_uri: XML-RPC URI of publisher
@type pub_uri: str
@param protocol_params: protocol parameters to use for connecting
@type protocol_params: [XmlRpcLegal]
@return: code, message, debug
@rtype: (int, str, int)
"""
#Validate protocol params = [TCPROS, address, port]
if type(protocol_params) != list or len(protocol_params) != 3:
@ -175,38 +210,53 @@ class TCPROSHandler(rospy.transport.ProtocolHandler):
conn.close()
return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(topic_name), 0
## @param self
## @param protocol str: name of protocol
## @return bool: True if protocol is supported
def supports(self, protocol):
"""
@param protocol: name of protocol
@type protocol: str
@return: True if protocol is supported
@rtype: bool
"""
return protocol == TCPROS
## @param self
def get_supported(self):
"""
Get supported protocols
"""
return [[TCPROS]]
## Initialize this node to receive an inbound TCP connection,
## i.e. startup a TCP server if one is not already running.
## @param self
## @param topic_name str
## @param protocol [str, value*]: negotiated protocol
## parameters. protocol[0] must be the string 'TCPROS'
## @return (int, str, list): (code, msg, [TCPROS, addr, port])
def init_publisher(self, topic_name, protocol):
"""
Initialize this node to receive an inbound TCP connection,
i.e. startup a TCP server if one is not already running.
@param topic_name: topic name
@type topic_name: str
@param protocol: negotiated protocol
parameters. protocol[0] must be the string 'TCPROS'
@type protocol: [str, value*]
@return: (code, msg, [TCPROS, addr, port])
@rtype: (int, str, list)
"""
if protocol[0] != TCPROS:
return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
start_tcpros_server()
addr, port = get_tcpros_server_address()
return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]
## Process incoming topic connection. Reads in topic name from
## handshake and creates the appropriate TCPROSPub handler for the
## connection.
## @param sock socket: socket connection
## @param client_addr (str, int): client address
## @param header dict: key/value pairs from handshake header
## @return str: error string or None
def topic_connection_handler(self, sock, client_addr, header):
"""
Process incoming topic connection. Reads in topic name from
handshake and creates the appropriate L{TCPROSPub} handler for the
connection.
@param sock: socket connection
@type sock: socket.socket
@param client_addr: client address
@type client_addr: (str, int)
@param header: key/value pairs from handshake header
@type header: dict
@return: error string or None
@rtype: str
"""
for required in ['topic', 'md5sum', 'callerid']:
if not required in header:
return "Missing required '%s' field"%required

View File

@ -32,6 +32,8 @@
#
# Revision $Id: service.py 2945 2008-11-26 03:48:27Z sfkwc $
"""Internal use: Service-specific extensions for TCPROS support"""
import cStringIO
import socket
import struct
@ -92,8 +94,10 @@ def service_connection_handler(sock, client_addr, header):
thread.start_new_thread(service.handle, (transport, header))
## Protocol implementation for Services over TCPROS
class TCPService(TCPROSTransportProtocol):
"""
Protocol implementation for Services over TCPROS
"""
## ctor.
## @param self
@ -110,24 +114,30 @@ class TCPService(TCPROSTransportProtocol):
return {'service': self.name, 'type': self.service_class._type,
'md5sum': self.service_class._md5sum, 'callerid': rospy.names.get_caller_id() }
## Protocol Implementation for Service clients over TCPROS
class TCPROSServiceClient(TCPROSTransportProtocol):
"""Protocol Implementation for Service clients over TCPROS"""
## ctor.
## @param self
## @param name str: name of service
## @param service_class Service: Service data type class
## @param headers dict: identifier for Service session
## @param buff_size int: size of buffer (bytes) for reading responses from Service.
def __init__(self, name, service_class, headers=None, buff_size=DEFAULT_BUFF_SIZE):
"""
ctor.
@param name: name of service
@type name: str
@param service_class: Service data type class
@type service_class: Service
@param headers: identifier for Service session
@type headers: dict
@param buff_size: size of buffer (bytes) for reading responses from Service.
@type buff_size: int
"""
super(TCPROSServiceClient, self).__init__(name, service_class._response_class)
self.service_class = service_class
self.headers = headers or {}
self.buff_size = buff_size
## TCPROSTransportProtocol API
## @param self
def get_header_fields(self):
"""
TCPROSTransportProtocol API
"""
headers = {'service': self.name, 'md5sum': self.service_class._md5sum,
'callerid': rospy.names.get_caller_id()}
# The current implementation allows user-supplied headers to
@ -137,12 +147,15 @@ class TCPROSServiceClient(TCPROSTransportProtocol):
headers[k] = v
return headers
## utility for reading the OK-byte/error-message header preceding each message
## @param self
## @param sock: socket connection. Will be read from if OK byte is
## false and error message needs to be read
## @param b StringIO: buffer to read from
def _read_ok_byte(self, b, sock):
"""
Utility for reading the OK-byte/error-message header preceding each message.
@param sock: socket connection. Will be read from if OK byte is
false and error message needs to be read
@type sock: socket.socket
@param b: buffer to read from
@type b: StringIO
"""
if b.tell() == 0:
return
pos = b.tell()
@ -153,9 +166,18 @@ class TCPROSServiceClient(TCPROSTransportProtocol):
str = self._read_service_error(sock, b)
raise ServiceException("service [%s] responded with an error: %s"%(self.name, str))
## In service implementation, reads in OK byte that preceeds each response. The OK byte allows
## for the passing of error messages instead of a response message
def read_messages(self, b, msg_queue, sock):
"""
In service implementation, reads in OK byte that preceeds each
response. The OK byte allows for the passing of error messages
instead of a response message
@param b: buffer
@type b: StringIO
@param msg_queue: Message queue to append to
@type msg_queue: [Message]
@param sock: socket to read from
@type sock: socket.socket
"""
self._read_ok_byte(b, sock)
rospy.msg.deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size, max_msgs=1, start=1) #rospy.msg
#deserialize_messages only resets the buffer to the start

View File

@ -32,12 +32,16 @@
#
# Revision $Id$
## Adapted from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
##
## Added a 'marker' to tasks so that multiple tasks with the same
## marker are not executed. As we are using the thread pool for i/o
## tasks, the marker is set to the i/o name. This prevents a slow i/o
## for gobbling up all of our threads
"""
Internal threadpool library for zenmaster.
Adapted from U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871}
Added a 'marker' to tasks so that multiple tasks with the same
marker are not executed. As we are using the thread pool for i/o
tasks, the marker is set to the i/o name. This prevents a slow i/o
for gobbling up all of our threads
"""
import threading, logging, traceback
from time import sleep

View File

@ -32,13 +32,13 @@
#
# Revision $Id$
"""rospy implementation of topics.
"""
rospy implementation of topics.
Client API
==========
L{Publisher} and L{Subscriber} are the client-API facing instantiation
of topics.
L{Publisher} and L{Subscriber} are the client API for topics.
Internal Implementation
=======================
@ -54,20 +54,13 @@ L{_TopicManager}
The L{_TopicManager} does the backend topic bookkeeping for the local
node. Use L{get_topic_manager()} to access singleton. Actual topic
implementations are done through the
L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl}
hierarchy. Client code generates instances of type
L{Publisher}/L{Subscriber}, which enable to client to create
multiple publishers/subscribers of that topic that get controlled
access to the underlying share connections.
L{_TopicImpl}/L{_PublisherImpl}/L{_SubscriberImpl} hierarchy. Client
code generates instances of type L{Publisher}/L{Subscriber}, which
enable to client to create multiple publishers/subscribers of that
topic that get controlled access to the underlying share connections.
Common parent classes for all rospy topics. The rospy topic autogenerators
create classes that are children of these implementations.
TopicListener
=============
Subscribe to new topics created by the local node. This is mainly
a hook for creating registration calls to the master.
"""
from __future__ import with_statement
@ -92,15 +85,20 @@ logger = logging.getLogger('rospy.topics')
# for interfacing with topics, while _TopicImpl implements the
# underlying connection details.
##Base class of client API topic references
class Topic(object):
"""Base class of L{Publisher} and L{Subscriber}"""
## @param self
## @param name str: graph resource name of topic, e.g. 'laser'.
## @param data_class Message: message class for serialization
## @param reg_type Registration.PUB or Registration.SUB
## @raise ValueError if parameters are invalid
def __init__(self, name, data_class, reg_type):
"""
@param name: graph resource name of topic, e.g. 'laser'.
@type name: str
@param data_class: message class for serialization
@type data_class: Message
@param reg_type Registration.PUB or Registration.SUB
@type reg_type: str
@raise ValueError: if parameters are invalid
"""
if not name or not isinstance(name, basestring):
raise ValueError("topic parameter 'name' is not a non-empty string")
if data_class is None:
@ -117,29 +115,39 @@ class Topic(object):
self.reg_type = reg_type
self.impl = get_topic_manager().acquire_impl(reg_type, self.name, data_class)
## get the number of connections to other ROS nodes for this topic. For a Publisher,
## this corresponds to the number of nodes subscribing. For a Subscriber, the number
## of publishers.
def get_num_connections(self):
"""
get the number of connections to other ROS nodes for this topic. For a Publisher,
this corresponds to the number of nodes subscribing. For a Subscriber, the number
of publishers.
@return: number of connections
@rtype: int
"""
return self.impl.get_num_connections()
## unpublish/unsubscribe from topic. Topic instance is no longer
## valid after this call.
## @param self
def unregister(self):
"""
unpublish/unsubscribe from topic. Topic instance is no longer
valid after this call.
"""
get_topic_manager().release_impl(self.reg_type, self.name)
self.impl = self.name = self.type = self.md5sum = self.data_class = None
## Base class of internal topic implementations. Each topic has a
## singleton _TopicImpl implementation for managing the underlying
## connections.
class _TopicImpl(object):
"""
Base class of internal topic implementations. Each topic has a
singleton _TopicImpl implementation for managing the underlying
connections.
"""
## Base constructor
## @param self
## @param name str: graph resource name of topic, e.g. 'laser'.
## @param data_class Message: message data class
def __init__(self, name, data_class):
"""
Base constructor
@param name: graph resource name of topic, e.g. 'laser'.
@type name: str
@param data_class Message: message data class
@type data_class: Message
"""
self.name = resolve_name(name) #NOTE: remapping occurs here!
self.data_class = data_class
self.type = data_class._type
@ -160,9 +168,8 @@ class _TopicImpl(object):
#STATS
self.dead_connections = [] #for retaining stats on old conns
## close I/O
## @param self
def close(self):
"""close I/O"""
self.closed = True
with self.c_lock:
for c in self.connections:
@ -177,11 +184,13 @@ class _TopicImpl(object):
with self.c_lock:
return len(self.connections)
## Query whether or not a connection with the associated \a
## endpoint has been added to this object.
## @param self
## @param endpoint_id str: endpoint ID associated with connection.
def has_connection(self, endpoint_id):
"""
Query whether or not a connection with the associated \a
endpoint has been added to this object.
@param endpoint_id: endpoint ID associated with connection.
@type endpoint_id: str
"""
# save reference to avoid lock
conn = self.connections
for c in conn:
@ -189,18 +198,24 @@ class _TopicImpl(object):
return True
return False
## Check to see if this topic is connected to other publishers/subscribers
## @param self
def has_connections(self):
"""
Check to see if this topic is connected to other publishers/subscribers
@return: True if topic is connected
@rtype: bool
"""
if self.connections:
return True
return False
## Add a connection to this topic.
## @param self
## @param c Transport: connection instance
## @return bool: True if connection was added
def add_connection(self, c):
"""
Add a connection to this topic.
@param c: connection instance
@type c: Transport
@return: True if connection was added
@rtype: bool
"""
with self.c_lock:
# c_lock is to make add_connection thread-safe, but we
# still make a copy of self.connections so that the rest of the
@ -214,10 +229,12 @@ class _TopicImpl(object):
return True
## Remove connection from topic.
## @param self
## @param c Transport: connection instance to remove
def remove_connection(self, c):
"""
Remove connection from topic.
@param c: connection instance to remove
@type c: Transport
"""
try:
# c_lock is to make remove_connection thread-safe, but we
# still make a copy of self.connections so that the rest of the
@ -233,20 +250,21 @@ class _TopicImpl(object):
finally:
self.c_lock.release()
## Get the stats for this topic
## @param self
## @return list: stats for topic in getBusInfo() format
## ((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
def get_stats_info(self): # STATS
"""
Get the stats for this topic
@return: stats for topic in getBusInfo() format::
((connection_id, destination_caller_id, direction, transport, topic_name, connected)*)
@rtype: list
"""
# save referenceto avoid locking
connections = self.connections
dead_connections = self.dead_connections
return [(c.id, c.endpoint_id, c.direction, c.transport_type, self.name, True) for c in connections] + \
[(c.id, c.endpoint_id, c.direction, c.transport_type, self.name, False) for c in dead_connections]
## Get the stats for this topic (API stub)
## @param self
def get_stats(self): # STATS
"""Get the stats for this topic (API stub)"""
raise Exception("subclasses must override")
# Implementation note: Subscriber attaches to a _SubscriberImpl
@ -322,16 +340,19 @@ class Subscriber(Topic):
if tcp_nodelay:
self.impl.set_tcp_nodelay(tcp_nodelay)
## _Topic*Impl classes manage the underlying connections for a given topic. The
# separation of the _Topic*Impl classes and the Topic* classes that a client
# instantiates allows the client to generate multiple Topic* instances for a given
# topic. It also hides the underlying API for managing the connections.
class _SubscriberImpl(_TopicImpl):
"""
Underyling L{_TopicImpl} implementation for subscriptions.
"""
## @param self
## @param name str: graph resource name of topic, e.g. 'laser'.
# @param data_class Message: Message data class
def __init__(self, name, data_class):
"""
ctor.
@param name: graph resource name of topic, e.g. 'laser'.
@type name: str
@param data_class: Message data class
@type data_class: Message
"""
super(_SubscriberImpl, self).__init__(name, data_class)
# client-methods to invoke on new messages. should only modify
# under lock. This is a list of 2-tuples (fn, args), where
@ -341,17 +362,22 @@ class _SubscriberImpl(_TopicImpl):
self.buff_size = DEFAULT_BUFF_SIZE
self.tcp_nodelay = False
## Set the value of TCP_NODELAY, which causes the Nagle algorithm
## to be disabled for future topic connections, if the publisher
## supports it.
def set_tcp_nodelay(self, tcp_nodelay):
"""
Set the value of TCP_NODELAY, which causes the Nagle algorithm
to be disabled for future topic connections, if the publisher
supports it.
"""
self.tcp_nodelay = tcp_nodelay
## Set the receive queue size. If more than queue_size messages are waiting to be deserialized,
## they are discarded.
## @param self
## @param queue_size int: incoming queue size. Must be positive integer or None.
def set_queue_size(self, queue_size):
"""
Set the receive queue size. If more than queue_size messages
are waiting to be deserialized, they are discarded.
@param queue_size int: incoming queue size. Must be positive integer or None.
@type queue_size: int
"""
if queue_size == -1:
self.queue_size = None
elif queue_size == 0:
@ -361,22 +387,28 @@ class _SubscriberImpl(_TopicImpl):
else:
self.queue_size = queue_size
## Set the receive buffer size. The exact meaning of this is
## transport dependent.
## @param self
## @param buff_size int: receive buffer size
def set_buff_size(self, buff_size):
"""
Set the receive buffer size. The exact meaning of this is
transport dependent.
@param buff_size: receive buffer size
@type buff_size: int
"""
if type(buff_size) != int:
raise ROSException("buffer size must be an integer")
elif buff_size <= 0:
raise ROSException("buffer size must be a positive integer")
self.buff_size = buff_size
## Get the stats for this topic subscriber
## @param self
## @return list: stats for topic in getBusStats() publisher format (topicName, connStats),
## where connStats = [connectionId, bytesReceived, numSent, dropEstimate, connected]*
def get_stats(self): # STATS
"""
Get the stats for this topic subscriber
@return: stats for topic in getBusStats() publisher format::
(topicName, connStats)
where connStats is::
[connectionId, bytesReceived, numSent, dropEstimate, connected]*
@rtype: list
"""
# save reference to avoid locking
conn = self.connections
dead_conn = self.dead_connections
@ -386,13 +418,16 @@ class _SubscriberImpl(_TopicImpl):
for c in chain(conn, dead_conn)] )
return stats
## Register a callback to be invoked whenever a new message is received
## @param self
## @param cb fn(msg): callback function to invoke with message data
## instance, i.e. fn(data). If callback args is set, they will be passed
## in as the second argument.
## @param cb_cargs Any: additional arguments to pass to callback
def add_callback(self, cb, cb_args):
"""
Register a callback to be invoked whenever a new message is received
@param cb: callback function to invoke with message data
instance, i.e. fn(data). If callback args is set, they will
be passed in as the second argument.
@type cb: fn(msg)
@param cb_cargs: additional arguments to pass to callback
@type cb_cargs: Any
"""
with self.c_lock:
# we lock in order to serialize calls to add_callback, but
# we copy self.callbacks so we can it
@ -400,10 +435,12 @@ class _SubscriberImpl(_TopicImpl):
new_callbacks.append((cb, cb_args))
self.callbacks = new_callbacks
## Called by underlying connection transport for each new message received
## @param self
## @param msgs [Message]: message data
def receive_callback(self, msgs):
"""
Called by underlying connection transport for each new message received
@param msgs: message data
@type msgs: [Message]
"""
# save reference to avoid lock
callbacks = self.callbacks
for msg in msgs:
@ -535,10 +572,7 @@ def args_kwds_to_message(data_class, args, kwds):
class _PublisherImpl(_TopicImpl):
"""
_PublisherImpl classes manage the underlying connections for a given topic. The
separation of the _PublisherImpl classes and the L{Publisher} class that a client
instantiates allows the client to generate multiple L{Publisher} instances for a given
topic. It also hides the underlying API for managing the connections.
Underyling L{_TopicImpl} implementation for publishers.
"""
def __init__(self, name, data_class):

View File

@ -33,6 +33,8 @@
# Copyright (c) 2008, Willow Garage, Inc.
# Revision $Id$
"""Command-line handler for ROS zenmaster (Python Master)"""
import getopt
import logging
import os
@ -42,8 +44,6 @@ import time
import rospy
"""Command-line handler for ROS zenmaster (Python Master)"""
# Environment variables used to configure master/slave
from roslib.rosenv import ROS_ROOT, ROS_MASTER_URI, ROS_HOSTNAME, ROS_NAMESPACE, ROS_PACKAGE_PATH, ROS_LOG_DIR