converted to epydoc

This commit is contained in:
Ken Conley 2009-09-17 23:42:16 +00:00
parent 0a4bd45a33
commit 311ca713f5
1 changed files with 186 additions and 120 deletions

View File

@ -32,6 +32,8 @@
#
# Revision $Id: transport.py 2941 2008-11-26 03:19:48Z sfkwc $
"""Internal use: common TCPROS libraries"""
import cStringIO
import select
import socket
@ -59,12 +61,18 @@ DEFAULT_BUFF_SIZE = 65536
## name of our customized TCP protocol for accepting flows over server socket
TCPROS = "TCPROS"
## read data from \a sock into buffer \a b.
## @param sock: socket to read from
## @param b StringIO: buffer to receive into
## @param buff_size int: recv read size
## @return int number of bytes read
def recv_buff(sock, b, buff_size):
"""
Read data from socket into buffer.
@param sock: socket to read from
@type sock: socket.socket
@param b: buffer to receive into
@type b: StringIO
@param buff_size: recv read size
@type buff_size: int
@return: number of bytes read
@rtype: int
"""
d = sock.recv(buff_size)
if d:
b.write(d)
@ -72,18 +80,23 @@ def recv_buff(sock, b, buff_size):
else: #bomb out
raise TransportTerminated("unable to receive data from sender, check sender's logs for details")
## Simple server that accepts inbound TCP/IP connections and hands
## them off to a handler function. TCPServer obeys the
## ROS_IP/ROS_HOSTNAME environment variables
class TCPServer:
"""
Simple server that accepts inbound TCP/IP connections and hands
them off to a handler function. TCPServer obeys the
ROS_IP/ROS_HOSTNAME environment variables
"""
## Setup a server socket listening on the specified port. If the
## port is omitted, will choose any open port.
## @param self
## @param inbound_handler fn(sock, addr): handler to invoke with
## new connection
## @param port int: port to bind to, omit/0 to bind to any
def __init__(self, inbound_handler, port=0):
def __init__(self, inbound_handler, port=0):
"""
Setup a server socket listening on the specified port. If the
port is omitted, will choose any open port.
@param inbound_handler: handler to invoke with
new connection
@type inbound_handler: fn(sock, addr)
@param port: port to bind to, omit/0 to bind to any
@type port: int
"""
self.port = port #will get overwritten if port=0
self.addr = None #set at socket bind
self.is_shutdown = False
@ -94,13 +107,15 @@ class TCPServer:
self.server_sock = None
raise
## Runs the run() loop in a separate thread
def start(self):
"""Runs the run() loop in a separate thread"""
thread.start_new_thread(self.run, ())
## Main TCP receive loop. Should be run in a separate thread -- use start()
## to do this automatically.
def run(self):
"""
Main TCP receive loop. Should be run in a separate thread -- use start()
to do this automatically.
"""
self.is_shutdown = False
if not self.server_sock:
raise ROSInternalException("%s did not connect"%self.__class__.__name__)
@ -114,17 +129,21 @@ class TCPServer:
logwarn("Failed to handle inbound connection due to socket error: %s"%e)
logdebug("TCPServer[%s] shutting down", self.port)
## @param self
## @return (str, int): (ip address, port) of server socket binding
def get_full_addr(self):
"""
@return: (ip address, port) of server socket binding
@rtype: (str, int)
"""
# return roslib.network.get_host_name() instead of address so that it
# obeys ROS_IP/ROS_HOSTNAME behavior
return (roslib.network.get_host_name(), self.port)
## binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
## binding to loopback interface.
## @param self
def _create_server_sock(self):
"""
binds the server socket. ROS_IP/ROS_HOSTNAME may restrict
binding to loopback interface.
"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind((roslib.network.get_bind_address(), self.port))
@ -132,66 +151,73 @@ class TCPServer:
server_sock.listen(5)
return server_sock
## shutdown I/O resources uses by this server
## @param self
def shutdown(self):
"""shutdown I/O resources uses by this server"""
if not self.is_shutdown:
self.is_shutdown = True
#self.server_sock.shutdown(socket.SHUT_RDWR)
self.server_sock.close()
## TCPROS Server
# base maintains a tcpros_server singleton that is shared between
# services and topics for inbound connections. This global is set in
# the tcprosserver constructor. Constructor is called by init_tcpros()
_tcpros_server = None
## starts the TCPROS server socket for inbound connections
def init_tcpros_server():
"""starts the TCPROS server socket for inbound connections"""
global _tcpros_server
if _tcpros_server is None:
_tcpros_server = TCPROSServer()
return _tcpros_server
## start the TCPROS server if it has not started already
## @throws Exception if tcpros server has not been created
def start_tcpros_server():
"""
start the TCPROS server if it has not started already
"""
if _tcpros_server is None:
init_tcpros_server()
return _tcpros_server.start_server()
# provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers
## get the address of the tcpros server.
## @throws Exception if tcpros server has not been started or created
def get_tcpros_server_address():
"""
get the address of the tcpros server.
@raise Exception: if tcpros server has not been started or created
"""
return _tcpros_server.get_address()
## utility handler that does nothing more than provide a rejection header
## @param sock: socket connection
## @param client_addr client address
## @param header dict: request header
def _error_connection_handler(sock, client_addr, header):
"""
utility handler that does nothing more than provide a rejection header
@param sock: socket connection
@type sock: socket.socket
@param client_addr: client address
@type client_addr: str
@param header: request header
@type header: dict
"""
return {'error': "unhandled connection"}
## ROS Protocol handler for TCPROS. Accepts both TCPROS topic
## connections as well as ROS service connections over TCP. TCP server
## socket is run once start_server() is called -- this is implicitly
## called during init_publisher().
class TCPROSServer(object):
## @param self
"""
ROS Protocol handler for TCPROS. Accepts both TCPROS topic
connections as well as ROS service connections over TCP. TCP server
socket is run once start_server() is called -- this is implicitly
called during init_publisher().
"""
def __init__(self):
"""ctor."""
self.tcp_ros_server = None #: server for receiving tcp conn
self.lock = threading.Lock()
## should be set to fn(sock, client_addr, header) for topic connections
# should be set to fn(sock, client_addr, header) for topic connections
self.topic_connection_handler = _error_connection_handler
## should be set to fn(sock, client_addr, header) for service connections
# should be set to fn(sock, client_addr, header) for service connections
self.service_connection_handler = _error_connection_handler
## Starts the TCP socket server if one is not already running
## @param self
def start_server(self):
"""Starts the TCP socket server if one is not already running"""
if self.tcp_ros_server:
return
try:
@ -207,26 +233,32 @@ class TCPROSServer(object):
finally:
self.lock.release()
## @param self
## @return str, int: address and port of TCP server socket for
## accepting inbound connections
def get_address(self):
"""
@return: address and port of TCP server socket for accepting
inbound connections
@rtype: str, int
"""
if self.tcp_ros_server is not None:
return self.tcp_ros_server.get_full_addr()
return None, None
## stops the TCP/IP server responsible for receiving inbound connections
## @param self
def shutdown(self):
"""stops the TCP/IP server responsible for receiving inbound connections"""
if self.tcp_ros_server:
self.tcp_ros_server.shutdown()
## TCPServer callback: detects incoming topic or service connection and passes connection accordingly
## @param self
## @param sock socket: socket connection
## @param client_addr (str, int): client address
## @throws TransportInitError If transport cannot be succesfully initialized
def _tcp_server_callback(self, sock, client_addr):
"""
TCPServer callback: detects incoming topic or service connection and passes connection accordingly
@param sock: socket connection
@type sock: socket.socket
@param client_addr: client address
@type client_addr: (str, int)
@raise TransportInitError: If transport cannot be succesfully initialized
"""
#TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol,
#and then use that to do the writing
try:
@ -252,18 +284,21 @@ class TCPROSServer(object):
if sock is not None:
sock.close()
## Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
## protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
## well as which fields to send when creating a new connection (get_header_fields()).
class TCPROSTransportProtocol(object):
"""
Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this
protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as
well as which fields to send when creating a new connection (get_header_fields()).
"""
## ctor
## @param self
## @param name str: service or topic name
## @param recv_data_class Class: message class for deserializing inbound messages
## @param queue_size int: maximum number of inbound messages to maintain
## @param buff_size int: recieve buffer size (in bytes) for reading data from the inbound connection.
def __init__(self, name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE):
"""
ctor
@param name str: service or topic name
@param recv_data_class Class: message class for deserializing inbound messages
@param queue_size int: maximum number of inbound messages to maintain
@param buff_size int: recieve buffer size (in bytes) for reading data from the inbound connection.
"""
if recv_data_class and not issubclass(recv_data_class, Message):
raise TransportInitError("Unable to initialize transport: data class is not a message data class")
self.name = name
@ -272,19 +307,26 @@ class TCPROSTransportProtocol(object):
self.buff_size = buff_size
self.direction = BIDIRECTIONAL
## @param self
## @param b StringIO: read buffer
## @param msg_queue [Message]: queue of deserialized messages
## @param sock socket: protocol can optionally read more data from
## the socket, but in most cases the required data will already be
## in \a b
def read_messages(self, b, msg_queue, sock):
"""
@param b StringIO: read buffer
@param msg_queue [Message]: queue of deserialized messages
@type msg_queue: [Message]
@param sock socket: protocol can optionally read more data from
the socket, but in most cases the required data will already be
in b
"""
# default implementation
deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size)
## @param self
## @return dict {str : str}: header fields to send when connecting to server
def get_header_fields(self):
"""
Header fields that should be sent over the connection. The header fields
are protocol specific (i.e. service vs. topic, publisher vs. subscriber).
@return: {str : str}: header fields to send over connection
@rtype: dict
"""
return {}
# TODO: this still isn't as clean and seamless as I want it to
@ -297,18 +339,21 @@ class TCPROSTransportProtocol(object):
# duplicative. I would also come up with a better name than
# protocol.
## Generic implementation of TCPROS exchange routines for both topics and services
class TCPROSTransport(Transport):
"""
Generic implementation of TCPROS exchange routines for both topics and services
"""
transport_type = 'TCPROS'
## ctor
## @param self
## @param name str: topic or service name
## @param protocol TCPROSTransportProtocol protocol implementation
## @param header dict: (optional) handshake header if transport handshake header was
## already read off of transport.
## @throws TransportInitError if transport cannot be initialized according to arguments
def __init__(self, protocol, name, header=None):
"""
ctor
@param name str: topic or service name
@param protocol TCPROSTransportProtocol protocol implementation
@param header dict: (optional) handshake header if transport handshake header was
already read off of transport.
@raise TransportInitError if transport cannot be initialized according to arguments
"""
super(TCPROSTransport, self).__init__(protocol.direction, name=name)
if not name:
raise TransportInitError("Unable to initialize transport: name is not set")
@ -328,26 +373,35 @@ class TCPROSTransport(Transport):
self.md5sum = None
self.type = None
## Set the socket for this transport
## @param self
## @param sock: socket
## @param endpoint_id str: identifier for connection endpoint
## @throws TransportInitError if socket has already been set
def set_socket(self, sock, endpoint_id):
"""
Set the socket for this transport
@param sock: socket
@type sock: socket.socket
@param endpoint_id: identifier for connection endpoint
@type endpoint_id: str
@raise TransportInitError: if socket has already been set
"""
if self.socket is not None:
raise TransportInitError("socket already initialized")
self.socket = sock
self.endpoint_id = endpoint_id
## Establish TCP connection to the specified
## address/port. connect() always calls write_header() and
## read_header() after the connection is made
## @param self
## @param dest_addr str
## @param dest_port int
## @param endpoint_id str: string identifier for connection (for statistics)
## @param timeout float: (optional keyword) timeout in seconds
def connect(self, dest_addr, dest_port, endpoint_id, timeout=None):
"""
Establish TCP connection to the specified
address/port. connect() always calls L{write_header()} and
L{read_header()} after the connection is made
@param dest_addr: destination IP address
@type dest_addr: str
@param dest_port: destination port
@type dest_port: int
@param endpoint_id: string identifier for connection (for statistics)
@type endpoint_id: str
@param timeout: (optional keyword) timeout in seconds
@type timeout: float
@raise TransportInitError: if unable to create connection
"""
try:
self.endpoint_id = endpoint_id
@ -371,11 +425,13 @@ class TCPROSTransport(Transport):
rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))
raise TransportInitError(str(e)) #re-raise i/o error
## Validate header and initialize fields accordingly
## @param self
## @param header dict: header fields from publisher
## @throws TransportInitError if header fails to validate
def _validate_header(self, header):
"""
Validate header and initialize fields accordingly
@param header: header fields from publisher
@type header: dict
@raise TransportInitError: if header fails to validate
"""
self.header = header
if 'error' in header:
raise TransportInitError("remote error reported: %s"%header['error'])
@ -385,9 +441,8 @@ class TCPROSTransport(Transport):
self.md5sum = header['md5sum']
self.type = header['type']
## Writes the TCPROS header to the active connection.
## @param self
def write_header(self):
"""Writes the TCPROS header to the active connection."""
sock = self.socket
# socket may still be getting spun up, so wait for it to be writable
fileno = sock.fileno()
@ -398,31 +453,37 @@ class TCPROSTransport(Transport):
sock.setblocking(1)
self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields())
## Read TCPROS header from socket
## @param self
## @throws TransportInitError if header fails to validate
def read_header(self):
"""
Read TCPROS header from active socket
@raise TransportInitError if header fails to validate
"""
self.socket.setblocking(1)
self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size))
## Convenience routine for services to send a message across a
## particular connection. NOTE: write_data is much more efficient
## if same message is being sent to multiple connections. Not
## threadsafe.
## @param self
## @param msg Msg: message to send
## @param seq int: sequence number for message
## @throws TransportException if error occurred sending message
def send_message(self, msg, seq):
"""
Convenience routine for services to send a message across a
particular connection. NOTE: write_data is much more efficient
if same message is being sent to multiple connections. Not
threadsafe.
@param msg: message to send
@type msg: Msg
@param seq: sequence number for message
@type seq: int
@raise TransportException: if error occurred sending message
"""
# this will call write_data(), so no need to keep track of stats
serialize_message(self.write_buff, seq, msg)
self.write_data(self.write_buff.getvalue())
self.write_buff.truncate(0)
## Write raw data to transport
## @throws TransportInitialiationError could not be initialized
## @throws TransportTerminated no longer open for publishing
def write_data(self, data):
"""
Write raw data to transport
@raise TransportInitialiationError: could not be initialized
@raise TransportTerminated: no longer open for publishing
"""
if not self.socket:
raise TransportInitError("TCPROS transport was not successfully initialized")
if self.done:
@ -459,10 +520,13 @@ class TCPROSTransport(Transport):
raise
return True
## block until messages are read off of socket
## @return [Msg] : list of newly received messages
## @throws TransportException
def receive_once(self):
"""
block until messages are read off of socket
@return: list of newly received messages
@rtype: [Msg]
@raise TransportException: if unable to receive message due to error
"""
sock = self.socket
if sock is None:
raise TransportException("connection not initialized")
@ -492,10 +556,12 @@ class TCPROSTransport(Transport):
raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e)))
return retval
## Receive messages until shutdown
## @param self
## @param msgs_callback fn([msg]): callback to invoke for new messages received
def receive_loop(self, msgs_callback):
"""
Receive messages until shutdown
@param msgs_callback: callback to invoke for new messages received
@type msgs_callback: fn([msg])
"""
# - use assert here as this would be an internal error, aka bug
logger.debug("receive_loop for [%s]", self.name)
try:
@ -522,8 +588,8 @@ class TCPROSTransport(Transport):
self.close()
self.done = True
## close i/o and release resources
def close(self):
"""close i/o and release resources"""
super(TCPROSTransport, self).close()
self.done = True
if self.socket is not None: