diff --git a/core/roscpp/include/ros/connection.h b/core/roscpp/include/ros/connection.h index 2b0231d5..d1d3c874 100644 --- a/core/roscpp/include/ros/connection.h +++ b/core/roscpp/include/ros/connection.h @@ -215,13 +215,6 @@ private: /// Function that handles the incoming header HeaderReceivedFunc header_func_; - /** - * If there is data available to read, we always try to read into our fixed read buffer, even if a read request - * has not been made. - */ - uint8_t fixed_read_buffer_[READ_BUFFER_SIZE]; - uint32_t fixed_read_filled_; - /// Read buffer that ends up being passed to the read callback boost::shared_array read_buffer_; /// Amount of data currently in the read buffer, in bytes diff --git a/core/roscpp/include/ros/transport/transport.h b/core/roscpp/include/ros/transport/transport.h index 91cdb6ec..c53d16d1 100644 --- a/core/roscpp/include/ros/transport/transport.h +++ b/core/roscpp/include/ros/transport/transport.h @@ -81,6 +81,15 @@ public: */ virtual void disableWrite() = 0; + /** + * \brief Enable reading on this transport. Allows derived classes to, for example, enable read polling for asynchronous sockets + */ + virtual void enableRead() = 0; + /** + * \brief Disable reading on this transport. Allows derived classes to, for example, disable read polling for asynchronous sockets + */ + virtual void disableRead() = 0; + /** * \brief Close this transport. Once this call has returned, writing on this transport should always return an error. */ diff --git a/core/roscpp/include/ros/transport/transport_tcp.h b/core/roscpp/include/ros/transport/transport_tcp.h index 6e76cfa7..07b06480 100644 --- a/core/roscpp/include/ros/transport/transport_tcp.h +++ b/core/roscpp/include/ros/transport/transport_tcp.h @@ -109,6 +109,8 @@ public: virtual void enableWrite(); virtual void disableWrite(); + virtual void enableRead(); + virtual void disableRead(); virtual void close(); @@ -133,11 +135,6 @@ private: */ bool setSocket(int sock); - /** - * \brief Enables reading on our socket - */ - void enableRead(); - void socketUpdate(int events); int sock_; diff --git a/core/roscpp/include/ros/transport/transport_udp.h b/core/roscpp/include/ros/transport/transport_udp.h index bb094faa..f84a9a50 100644 --- a/core/roscpp/include/ros/transport/transport_udp.h +++ b/core/roscpp/include/ros/transport/transport_udp.h @@ -39,7 +39,6 @@ #include #include -#include #include @@ -103,17 +102,14 @@ public: */ int getServerPort() const {return server_port_;} - /** - * \brief Get a unique connection ID - */ - int generateConnectionId() {return gen_();} - // overrides from Transport virtual int32_t read(uint8_t* buffer, uint32_t size); virtual int32_t write(uint8_t* buffer, uint32_t size); virtual void enableWrite(); virtual void disableWrite(); + virtual void enableRead(); + virtual void disableRead(); virtual void close(); @@ -138,11 +134,6 @@ private: */ bool setSocket(int sock); - /** - * \brief Enables reading on our socket - */ - void enableRead(); - void socketUpdate(int events); int sock_; @@ -161,8 +152,6 @@ private: PollSet* poll_set_; int flags_; - boost::rand48 gen_; - uint32_t connection_id_; uint8_t current_message_id_; uint16_t total_blocks_; @@ -170,6 +159,10 @@ private: uint32_t max_datagram_size_; + uint8_t* data_buffer_; + uint8_t* data_start_; + uint32_t data_filled_; + uint8_t *reorder_buffer_; TransportUDPHeader reorder_header_; uint32_t reorder_bytes_; diff --git a/core/roscpp/src/libros/connection.cpp b/core/roscpp/src/libros/connection.cpp index 538a0455..1c3f2ede 100644 --- a/core/roscpp/src/libros/connection.cpp +++ b/core/roscpp/src/libros/connection.cpp @@ -47,7 +47,6 @@ namespace ros Connection::Connection() : is_server_(false) , dropped_(false) -, fixed_read_filled_(0) , read_filled_(0) , read_size_(0) , reading_(false) @@ -115,56 +114,41 @@ void Connection::readTransport() reading_ = true; - if (fixed_read_filled_ < READ_BUFFER_SIZE) + while (!dropped_ && has_read_callback_) { - int32_t bytes_read = transport_->read(fixed_read_buffer_ + fixed_read_filled_, READ_BUFFER_SIZE - fixed_read_filled_); - ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read); - if (dropped_) + ROS_ASSERT(read_buffer_); + uint32_t to_read = read_size_ - read_filled_; + if (to_read > 0) { - return; - } - else if (bytes_read < 0) - { - reading_ = false; + int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read); + ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read); + if (dropped_) + { + return; + } + else if (bytes_read < 0) + { + // Bad read, throw away results and report error + ReadFinishedFunc callback; + callback = read_callback_; + read_callback_.clear(); + read_buffer_.reset(); + uint32_t size = read_size_; + read_size_ = 0; + read_filled_ = 0; + has_read_callback_ = 0; - // Bad read, throw away results and report error - ReadFinishedFunc callback; - callback = read_callback_; - read_callback_ = ReadFinishedFunc(); - read_buffer_ = boost::shared_array(); - uint32_t size = read_size_; - read_size_ = 0; - read_filled_ = 0; - has_read_callback_ = 0; - fixed_read_filled_ = 0; + if (callback) + { + callback(shared_from_this(), read_buffer_, size, false); + } + } - if (callback) - callback(shared_from_this(), read_buffer_, size, false); - return; + read_filled_ += bytes_read; } - fixed_read_filled_ += bytes_read; - } - else - { - if (has_read_callback_) - { - ROS_WARN("Connection read buffer filled with no read callback set"); - } - } - - while (has_read_callback_ && (fixed_read_filled_ > 0 || read_size_ == 0) && !dropped_) - { ROS_ASSERT((int)read_size_ >= 0); ROS_ASSERT((int)read_filled_ >= 0); - - uint32_t write_amount = std::min(read_size_ - read_filled_, fixed_read_filled_); - ROS_DEBUG_NAMED("superdebug", "Copying %d bytes into read buffer", write_amount); - memcpy(read_buffer_.get() + read_filled_, fixed_read_buffer_, write_amount); - memmove(fixed_read_buffer_, fixed_read_buffer_ + write_amount, fixed_read_filled_ - write_amount); - fixed_read_filled_ -= write_amount; - read_filled_ += write_amount; - ROS_ASSERT(read_filled_ <= read_size_); if (read_filled_ == read_size_ && !dropped_) @@ -179,8 +163,8 @@ void Connection::readTransport() callback = read_callback_; size = read_size_; buffer = read_buffer_; - read_callback_ = ReadFinishedFunc(); - read_buffer_ = boost::shared_array(); + read_callback_.clear(); + read_buffer_.reset(); read_size_ = 0; read_filled_ = 0; has_read_callback_ = 0; @@ -188,6 +172,15 @@ void Connection::readTransport() ROS_DEBUG_NAMED("superdebug", "Calling read callback"); callback(shared_from_this(), buffer, size, true); } + else + { + break; + } + } + + if (!has_read_callback_) + { + transport_->disableRead(); } reading_ = false; @@ -283,6 +276,8 @@ void Connection::read(uint32_t size, const ReadFinishedFunc& callback) has_read_callback_ = 1; } + transport_->enableRead(); + // read immediately if possible readTransport(); } diff --git a/core/roscpp/src/libros/service_server_link.cpp b/core/roscpp/src/libros/service_server_link.cpp index d54211a9..f4c75e9d 100644 --- a/core/roscpp/src/libros/service_server_link.cpp +++ b/core/roscpp/src/libros/service_server_link.cpp @@ -177,6 +177,7 @@ void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn) void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn) { + //ros::WallDuration(0.1).sleep(); connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4)); } @@ -322,6 +323,8 @@ bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& re info->call_finished_ = false; info->caller_thread_id_ = boost::this_thread::get_id(); + //ros::WallDuration(0.1).sleep(); + bool immediate = false; { boost::mutex::scoped_lock lock(call_queue_mutex_); diff --git a/core/roscpp/src/libros/topic_manager.cpp b/core/roscpp/src/libros/topic_manager.cpp index d2936b47..d20709cd 100644 --- a/core/roscpp/src/libros/topic_manager.cpp +++ b/core/roscpp/src/libros/topic_manager.cpp @@ -648,7 +648,7 @@ bool TopicManager::requestTopic(const string &topic, } int max_datagram_size = proto[4]; - int conn_id = connection_manager_->getUDPServerTransport()->generateConnectionId(); + int conn_id = connection_manager_->getNewConnectionID(); TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size); connection_manager_->udprosIncomingConnection(transport, h); diff --git a/core/roscpp/src/libros/transport/transport_tcp.cpp b/core/roscpp/src/libros/transport/transport_tcp.cpp index 7bea5814..4c4a223f 100644 --- a/core/roscpp/src/libros/transport/transport_tcp.cpp +++ b/core/roscpp/src/libros/transport/transport_tcp.cpp @@ -132,7 +132,7 @@ bool TransportTCP::initializeSocket() if (!(flags_ & SYNCHRONOUS)) { - enableRead(); + //enableRead(); } return true; @@ -314,6 +314,11 @@ bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb return false; } + if (!(flags_ & SYNCHRONOUS)) + { + enableRead(); + } + return true; } @@ -450,6 +455,26 @@ void TransportTCP::enableRead() } } +void TransportTCP::disableRead() +{ + ROS_ASSERT(!(flags_ & SYNCHRONOUS)); + + { + boost::recursive_mutex::scoped_lock lock(close_mutex_); + + if (closed_) + { + return; + } + } + + if (expecting_read_) + { + poll_set_->delEvents(sock_, POLLIN); + expecting_read_ = false; + } +} + void TransportTCP::enableWrite() { ROS_ASSERT(!(flags_ & SYNCHRONOUS)); diff --git a/core/roscpp/src/libros/transport/transport_udp.cpp b/core/roscpp/src/libros/transport/transport_udp.cpp index 7ac55e7b..02842e50 100644 --- a/core/roscpp/src/libros/transport/transport_udp.cpp +++ b/core/roscpp/src/libros/transport/transport_udp.cpp @@ -64,6 +64,7 @@ TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size) , total_blocks_(0) , last_block_(0) , max_datagram_size_(max_datagram_size) +, data_filled_(0) , reorder_buffer_(0) , reorder_bytes_(0) { @@ -71,12 +72,15 @@ TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size) if (max_datagram_size_ == 0) max_datagram_size_ = 1500; reorder_buffer_ = new uint8_t[max_datagram_size_]; + data_buffer_ = new uint8_t[max_datagram_size_]; + data_start_ = data_buffer_; } TransportUDP::~TransportUDP() { ROS_ASSERT_MSG(sock_ == -1, "TransportUDP socket [%d] was never closed", sock_); - delete[] reorder_buffer_; + delete [] reorder_buffer_; + delete [] data_buffer_; } bool TransportUDP::setSocket(int sock) @@ -205,7 +209,6 @@ bool TransportUDP::connect(const std::string& host, int port, int connection_id) bool TransportUDP::createIncoming(int port, bool is_server) { is_server_ = is_server; - gen_.seed(getpid()); sock_ = socket(AF_INET, SOCK_DGRAM, 0); @@ -326,52 +329,80 @@ int32_t TransportUDP::read(uint8_t* buffer, uint32_t size) while (bytes_read < size) { TransportUDPHeader header; - struct iovec iov[2]; - iov[0].iov_base = &header; - iov[0].iov_len = sizeof(header); - iov[1].iov_base = buffer + bytes_read; - iov[1].iov_len = (size - bytes_read) > max_datagram_size_ ? max_datagram_size_ : (size - bytes_read); - // Don't read a partial datagram when buffer gets full - if (iov[1].iov_len < max_datagram_size_ && bytes_read != 0) - break; - - // Read a datagram with header ssize_t num_bytes = 0; + bool from_previous = false; if (reorder_bytes_) { num_bytes = reorder_bytes_ + sizeof(header); header = reorder_header_; - memcpy(iov[1].iov_base, reorder_buffer_, reorder_bytes_); + memcpy(buffer + bytes_read, reorder_buffer_, reorder_bytes_); reorder_bytes_ = 0; } else { - num_bytes = readv(sock_, iov, 2); - } - if (num_bytes < 0) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (data_filled_ == 0) { - num_bytes = 0; - break; + struct iovec iov[2]; + iov[0].iov_base = &header; + iov[0].iov_len = sizeof(header); + iov[1].iov_base = data_buffer_; + iov[1].iov_len = max_datagram_size_ - sizeof(header); + + // Read a datagram with header + num_bytes = readv(sock_, iov, 2); + + if (num_bytes < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + num_bytes = 0; + break; + } + else + { + ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno)); + close(); + break; + } + } + else if (num_bytes == 0) + { + ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size); + close(); + return -1; + } + else if (num_bytes < (ssize_t)sizeof(header)) + { + ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), strerror(errno)); + close(); + return -1; + } + + num_bytes -= sizeof(header); + data_filled_ = num_bytes; + data_start_ = data_buffer_; } else { - ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno)); - close(); - break; + from_previous = true; } + + num_bytes = std::min(size, data_filled_); + // Copy from the data buffer, whether it has data left in it from a previous datagram or + // was just filled by readv() + memcpy(buffer, data_start_, num_bytes); + data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size); + data_start_ += num_bytes; } - else if (num_bytes == 0) + + + if (from_previous) { - ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size); - close(); - return -1; + bytes_read += num_bytes; } - else if (num_bytes >= ssize_t(sizeof(header))) + else { - num_bytes -= sizeof(header); // Process header switch (header.op_) { @@ -409,20 +440,17 @@ int32_t TransportUDP::read(uint8_t* buffer, uint32_t size) ROSCPP_LOG_DEBUG("Unexpected UDP header OP [%d]", header.op_); return -1; } + bytes_read += num_bytes; + if (last_block_ == (total_blocks_ - 1)) { current_message_id_ = 0; break; } } - else - { - ROS_ERROR("Socket [%d] received short header (%d bytes), closing", sock_, int(num_bytes)); - close(); - return -1; - } } + return bytes_read; } @@ -516,6 +544,26 @@ void TransportUDP::enableRead() } } +void TransportUDP::disableRead() +{ + ROS_ASSERT(!(flags_ & SYNCHRONOUS)); + + { + boost::mutex::scoped_lock lock(close_mutex_); + + if (closed_) + { + return; + } + } + + if (expecting_read_) + { + poll_set_->delEvents(sock_, POLLIN); + expecting_read_ = false; + } +} + void TransportUDP::enableWrite() { { diff --git a/core/roscpp/src/libros/transport_subscriber_link.cpp b/core/roscpp/src/libros/transport_subscriber_link.cpp index a92c5a96..fc5e85c8 100644 --- a/core/roscpp/src/libros/transport_subscriber_link.cpp +++ b/core/roscpp/src/libros/transport_subscriber_link.cpp @@ -147,7 +147,7 @@ void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn) void TransportSubscriberLink::startMessageWrite(bool immediate_write) { boost::shared_array dummy; - SerializedMessage m(dummy, (unsigned int)0); + SerializedMessage m(dummy, (uint32_t)0); { boost::mutex::scoped_lock lock(outbox_mutex_);