diff --git a/core/rospy/src/rospy/tcpros_base.py b/core/rospy/src/rospy/tcpros_base.py index ef0a582e..346c0634 100644 --- a/core/rospy/src/rospy/tcpros_base.py +++ b/core/rospy/src/rospy/tcpros_base.py @@ -32,6 +32,8 @@ # # Revision $Id: transport.py 2941 2008-11-26 03:19:48Z sfkwc $ +"""Internal use: common TCPROS libraries""" + import cStringIO import select import socket @@ -59,12 +61,18 @@ DEFAULT_BUFF_SIZE = 65536 ## name of our customized TCP protocol for accepting flows over server socket TCPROS = "TCPROS" -## read data from \a sock into buffer \a b. -## @param sock: socket to read from -## @param b StringIO: buffer to receive into -## @param buff_size int: recv read size -## @return int number of bytes read def recv_buff(sock, b, buff_size): + """ + Read data from socket into buffer. + @param sock: socket to read from + @type sock: socket.socket + @param b: buffer to receive into + @type b: StringIO + @param buff_size: recv read size + @type buff_size: int + @return: number of bytes read + @rtype: int + """ d = sock.recv(buff_size) if d: b.write(d) @@ -72,18 +80,23 @@ def recv_buff(sock, b, buff_size): else: #bomb out raise TransportTerminated("unable to receive data from sender, check sender's logs for details") -## Simple server that accepts inbound TCP/IP connections and hands -## them off to a handler function. TCPServer obeys the -## ROS_IP/ROS_HOSTNAME environment variables class TCPServer: + """ + Simple server that accepts inbound TCP/IP connections and hands + them off to a handler function. TCPServer obeys the + ROS_IP/ROS_HOSTNAME environment variables + """ - ## Setup a server socket listening on the specified port. If the - ## port is omitted, will choose any open port. - ## @param self - ## @param inbound_handler fn(sock, addr): handler to invoke with - ## new connection - ## @param port int: port to bind to, omit/0 to bind to any - def __init__(self, inbound_handler, port=0): + def __init__(self, inbound_handler, port=0): + """ + Setup a server socket listening on the specified port. If the + port is omitted, will choose any open port. + @param inbound_handler: handler to invoke with + new connection + @type inbound_handler: fn(sock, addr) + @param port: port to bind to, omit/0 to bind to any + @type port: int + """ self.port = port #will get overwritten if port=0 self.addr = None #set at socket bind self.is_shutdown = False @@ -94,13 +107,15 @@ class TCPServer: self.server_sock = None raise - ## Runs the run() loop in a separate thread def start(self): + """Runs the run() loop in a separate thread""" thread.start_new_thread(self.run, ()) - ## Main TCP receive loop. Should be run in a separate thread -- use start() - ## to do this automatically. def run(self): + """ + Main TCP receive loop. Should be run in a separate thread -- use start() + to do this automatically. + """ self.is_shutdown = False if not self.server_sock: raise ROSInternalException("%s did not connect"%self.__class__.__name__) @@ -114,17 +129,21 @@ class TCPServer: logwarn("Failed to handle inbound connection due to socket error: %s"%e) logdebug("TCPServer[%s] shutting down", self.port) - ## @param self - ## @return (str, int): (ip address, port) of server socket binding + def get_full_addr(self): + """ + @return: (ip address, port) of server socket binding + @rtype: (str, int) + """ # return roslib.network.get_host_name() instead of address so that it # obeys ROS_IP/ROS_HOSTNAME behavior return (roslib.network.get_host_name(), self.port) - ## binds the server socket. ROS_IP/ROS_HOSTNAME may restrict - ## binding to loopback interface. - ## @param self def _create_server_sock(self): + """ + binds the server socket. ROS_IP/ROS_HOSTNAME may restrict + binding to loopback interface. + """ server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.bind((roslib.network.get_bind_address(), self.port)) @@ -132,66 +151,73 @@ class TCPServer: server_sock.listen(5) return server_sock - ## shutdown I/O resources uses by this server - ## @param self def shutdown(self): + """shutdown I/O resources uses by this server""" if not self.is_shutdown: self.is_shutdown = True #self.server_sock.shutdown(socket.SHUT_RDWR) self.server_sock.close() -## TCPROS Server - # base maintains a tcpros_server singleton that is shared between # services and topics for inbound connections. This global is set in # the tcprosserver constructor. Constructor is called by init_tcpros() _tcpros_server = None -## starts the TCPROS server socket for inbound connections def init_tcpros_server(): + """starts the TCPROS server socket for inbound connections""" global _tcpros_server if _tcpros_server is None: _tcpros_server = TCPROSServer() return _tcpros_server -## start the TCPROS server if it has not started already -## @throws Exception if tcpros server has not been created def start_tcpros_server(): + """ + start the TCPROS server if it has not started already + """ if _tcpros_server is None: init_tcpros_server() return _tcpros_server.start_server() # provide an accessor of this so that the TCPROS Server is entirely hidden from upper layers -## get the address of the tcpros server. -## @throws Exception if tcpros server has not been started or created def get_tcpros_server_address(): + """ + get the address of the tcpros server. + @raise Exception: if tcpros server has not been started or created + """ return _tcpros_server.get_address() -## utility handler that does nothing more than provide a rejection header -## @param sock: socket connection -## @param client_addr client address -## @param header dict: request header def _error_connection_handler(sock, client_addr, header): + """ + utility handler that does nothing more than provide a rejection header + @param sock: socket connection + @type sock: socket.socket + @param client_addr: client address + @type client_addr: str + @param header: request header + @type header: dict + """ return {'error': "unhandled connection"} -## ROS Protocol handler for TCPROS. Accepts both TCPROS topic -## connections as well as ROS service connections over TCP. TCP server -## socket is run once start_server() is called -- this is implicitly -## called during init_publisher(). class TCPROSServer(object): - ## @param self + """ + ROS Protocol handler for TCPROS. Accepts both TCPROS topic + connections as well as ROS service connections over TCP. TCP server + socket is run once start_server() is called -- this is implicitly + called during init_publisher(). + """ + def __init__(self): + """ctor.""" self.tcp_ros_server = None #: server for receiving tcp conn self.lock = threading.Lock() - ## should be set to fn(sock, client_addr, header) for topic connections + # should be set to fn(sock, client_addr, header) for topic connections self.topic_connection_handler = _error_connection_handler - ## should be set to fn(sock, client_addr, header) for service connections + # should be set to fn(sock, client_addr, header) for service connections self.service_connection_handler = _error_connection_handler - ## Starts the TCP socket server if one is not already running - ## @param self def start_server(self): + """Starts the TCP socket server if one is not already running""" if self.tcp_ros_server: return try: @@ -207,26 +233,32 @@ class TCPROSServer(object): finally: self.lock.release() - ## @param self - ## @return str, int: address and port of TCP server socket for - ## accepting inbound connections + def get_address(self): + """ + @return: address and port of TCP server socket for accepting + inbound connections + @rtype: str, int + """ if self.tcp_ros_server is not None: return self.tcp_ros_server.get_full_addr() return None, None - ## stops the TCP/IP server responsible for receiving inbound connections - ## @param self def shutdown(self): + """stops the TCP/IP server responsible for receiving inbound connections""" if self.tcp_ros_server: self.tcp_ros_server.shutdown() - ## TCPServer callback: detects incoming topic or service connection and passes connection accordingly - ## @param self - ## @param sock socket: socket connection - ## @param client_addr (str, int): client address - ## @throws TransportInitError If transport cannot be succesfully initialized def _tcp_server_callback(self, sock, client_addr): + """ + TCPServer callback: detects incoming topic or service connection and passes connection accordingly + + @param sock: socket connection + @type sock: socket.socket + @param client_addr: client address + @type client_addr: (str, int) + @raise TransportInitError: If transport cannot be succesfully initialized + """ #TODOXXX:rewrite this logic so it is possible to create TCPROSTransport object first, set its protocol, #and then use that to do the writing try: @@ -252,18 +284,21 @@ class TCPROSServer(object): if sock is not None: sock.close() -## Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this -## protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as -## well as which fields to send when creating a new connection (get_header_fields()). class TCPROSTransportProtocol(object): + """ + Abstraction of TCPROS connections. Implementations Services/Publishers/Subscribers must implement this + protocol, which defines how messages are deserialized from an inbound connection (read_messages()) as + well as which fields to send when creating a new connection (get_header_fields()). + """ - ## ctor - ## @param self - ## @param name str: service or topic name - ## @param recv_data_class Class: message class for deserializing inbound messages - ## @param queue_size int: maximum number of inbound messages to maintain - ## @param buff_size int: recieve buffer size (in bytes) for reading data from the inbound connection. def __init__(self, name, recv_data_class, queue_size=None, buff_size=DEFAULT_BUFF_SIZE): + """ + ctor + @param name str: service or topic name + @param recv_data_class Class: message class for deserializing inbound messages + @param queue_size int: maximum number of inbound messages to maintain + @param buff_size int: recieve buffer size (in bytes) for reading data from the inbound connection. + """ if recv_data_class and not issubclass(recv_data_class, Message): raise TransportInitError("Unable to initialize transport: data class is not a message data class") self.name = name @@ -272,19 +307,26 @@ class TCPROSTransportProtocol(object): self.buff_size = buff_size self.direction = BIDIRECTIONAL - ## @param self - ## @param b StringIO: read buffer - ## @param msg_queue [Message]: queue of deserialized messages - ## @param sock socket: protocol can optionally read more data from - ## the socket, but in most cases the required data will already be - ## in \a b + def read_messages(self, b, msg_queue, sock): + """ + @param b StringIO: read buffer + @param msg_queue [Message]: queue of deserialized messages + @type msg_queue: [Message] + @param sock socket: protocol can optionally read more data from + the socket, but in most cases the required data will already be + in b + """ # default implementation deserialize_messages(b, msg_queue, self.recv_data_class, queue_size=self.queue_size) - ## @param self - ## @return dict {str : str}: header fields to send when connecting to server def get_header_fields(self): + """ + Header fields that should be sent over the connection. The header fields + are protocol specific (i.e. service vs. topic, publisher vs. subscriber). + @return: {str : str}: header fields to send over connection + @rtype: dict + """ return {} # TODO: this still isn't as clean and seamless as I want it to @@ -297,18 +339,21 @@ class TCPROSTransportProtocol(object): # duplicative. I would also come up with a better name than # protocol. -## Generic implementation of TCPROS exchange routines for both topics and services class TCPROSTransport(Transport): + """ + Generic implementation of TCPROS exchange routines for both topics and services + """ transport_type = 'TCPROS' - ## ctor - ## @param self - ## @param name str: topic or service name - ## @param protocol TCPROSTransportProtocol protocol implementation - ## @param header dict: (optional) handshake header if transport handshake header was - ## already read off of transport. - ## @throws TransportInitError if transport cannot be initialized according to arguments def __init__(self, protocol, name, header=None): + """ + ctor + @param name str: topic or service name + @param protocol TCPROSTransportProtocol protocol implementation + @param header dict: (optional) handshake header if transport handshake header was + already read off of transport. + @raise TransportInitError if transport cannot be initialized according to arguments + """ super(TCPROSTransport, self).__init__(protocol.direction, name=name) if not name: raise TransportInitError("Unable to initialize transport: name is not set") @@ -328,26 +373,35 @@ class TCPROSTransport(Transport): self.md5sum = None self.type = None - ## Set the socket for this transport - ## @param self - ## @param sock: socket - ## @param endpoint_id str: identifier for connection endpoint - ## @throws TransportInitError if socket has already been set def set_socket(self, sock, endpoint_id): + """ + Set the socket for this transport + @param sock: socket + @type sock: socket.socket + @param endpoint_id: identifier for connection endpoint + @type endpoint_id: str + @raise TransportInitError: if socket has already been set + """ if self.socket is not None: raise TransportInitError("socket already initialized") self.socket = sock self.endpoint_id = endpoint_id - ## Establish TCP connection to the specified - ## address/port. connect() always calls write_header() and - ## read_header() after the connection is made - ## @param self - ## @param dest_addr str - ## @param dest_port int - ## @param endpoint_id str: string identifier for connection (for statistics) - ## @param timeout float: (optional keyword) timeout in seconds def connect(self, dest_addr, dest_port, endpoint_id, timeout=None): + """ + Establish TCP connection to the specified + address/port. connect() always calls L{write_header()} and + L{read_header()} after the connection is made + @param dest_addr: destination IP address + @type dest_addr: str + @param dest_port: destination port + @type dest_port: int + @param endpoint_id: string identifier for connection (for statistics) + @type endpoint_id: str + @param timeout: (optional keyword) timeout in seconds + @type timeout: float + @raise TransportInitError: if unable to create connection + """ try: self.endpoint_id = endpoint_id @@ -371,11 +425,13 @@ class TCPROSTransport(Transport): rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc())) raise TransportInitError(str(e)) #re-raise i/o error - ## Validate header and initialize fields accordingly - ## @param self - ## @param header dict: header fields from publisher - ## @throws TransportInitError if header fails to validate def _validate_header(self, header): + """ + Validate header and initialize fields accordingly + @param header: header fields from publisher + @type header: dict + @raise TransportInitError: if header fails to validate + """ self.header = header if 'error' in header: raise TransportInitError("remote error reported: %s"%header['error']) @@ -385,9 +441,8 @@ class TCPROSTransport(Transport): self.md5sum = header['md5sum'] self.type = header['type'] - ## Writes the TCPROS header to the active connection. - ## @param self def write_header(self): + """Writes the TCPROS header to the active connection.""" sock = self.socket # socket may still be getting spun up, so wait for it to be writable fileno = sock.fileno() @@ -398,31 +453,37 @@ class TCPROSTransport(Transport): sock.setblocking(1) self.stat_bytes += write_ros_handshake_header(sock, self.protocol.get_header_fields()) - ## Read TCPROS header from socket - ## @param self - ## @throws TransportInitError if header fails to validate def read_header(self): + """ + Read TCPROS header from active socket + @raise TransportInitError if header fails to validate + """ self.socket.setblocking(1) self._validate_header(read_ros_handshake_header(self.socket, self.read_buff, self.protocol.buff_size)) - ## Convenience routine for services to send a message across a - ## particular connection. NOTE: write_data is much more efficient - ## if same message is being sent to multiple connections. Not - ## threadsafe. - ## @param self - ## @param msg Msg: message to send - ## @param seq int: sequence number for message - ## @throws TransportException if error occurred sending message def send_message(self, msg, seq): + """ + Convenience routine for services to send a message across a + particular connection. NOTE: write_data is much more efficient + if same message is being sent to multiple connections. Not + threadsafe. + @param msg: message to send + @type msg: Msg + @param seq: sequence number for message + @type seq: int + @raise TransportException: if error occurred sending message + """ # this will call write_data(), so no need to keep track of stats serialize_message(self.write_buff, seq, msg) self.write_data(self.write_buff.getvalue()) self.write_buff.truncate(0) - ## Write raw data to transport - ## @throws TransportInitialiationError could not be initialized - ## @throws TransportTerminated no longer open for publishing def write_data(self, data): + """ + Write raw data to transport + @raise TransportInitialiationError: could not be initialized + @raise TransportTerminated: no longer open for publishing + """ if not self.socket: raise TransportInitError("TCPROS transport was not successfully initialized") if self.done: @@ -459,10 +520,13 @@ class TCPROSTransport(Transport): raise return True - ## block until messages are read off of socket - ## @return [Msg] : list of newly received messages - ## @throws TransportException def receive_once(self): + """ + block until messages are read off of socket + @return: list of newly received messages + @rtype: [Msg] + @raise TransportException: if unable to receive message due to error + """ sock = self.socket if sock is None: raise TransportException("connection not initialized") @@ -492,10 +556,12 @@ class TCPROSTransport(Transport): raise TransportException("receive_once[%s]: unexpected error %s"%(self.name, str(e))) return retval - ## Receive messages until shutdown - ## @param self - ## @param msgs_callback fn([msg]): callback to invoke for new messages received def receive_loop(self, msgs_callback): + """ + Receive messages until shutdown + @param msgs_callback: callback to invoke for new messages received + @type msgs_callback: fn([msg]) + """ # - use assert here as this would be an internal error, aka bug logger.debug("receive_loop for [%s]", self.name) try: @@ -522,8 +588,8 @@ class TCPROSTransport(Transport): self.close() self.done = True - ## close i/o and release resources def close(self): + """close i/o and release resources""" super(TCPROSTransport, self).close() self.done = True if self.socket is not None: