The main purpose of this checkin is to fix a deadlock in roscpp (ticket 4195 in ros-pkg) when the connection has already read all the data from a service response but the ServiceServerLink has not actually asked for the response data yet.

The fix for this both:
 1) Removed an extra copy happening in the Connection class, where we were reading data into a fixed-size 65k receive buffer before copying data into the read buffer (this was an optimization, but it turns out that due to other decisions it shouldn't actually help).
 2) Exposed a bug in the UDP implementation, where it assumed that the # of writes on one end always matched the # of reads on the other.

The majority of this change in terms of how dangerous it is is due to #2 -- I believe I know exactly what was going wrong, and that my fix is correct, but I may have introduced subtle bugs.  All tests pass on my 4-core Karmic machine.
This commit is contained in:
Josh Faust 2010-06-23 03:27:18 +00:00
parent e429af2ff6
commit e7959474d0
10 changed files with 171 additions and 108 deletions

View File

@ -215,13 +215,6 @@ private:
/// Function that handles the incoming header
HeaderReceivedFunc header_func_;
/**
* If there is data available to read, we always try to read into our fixed read buffer, even if a read request
* has not been made.
*/
uint8_t fixed_read_buffer_[READ_BUFFER_SIZE];
uint32_t fixed_read_filled_;
/// Read buffer that ends up being passed to the read callback
boost::shared_array<uint8_t> read_buffer_;
/// Amount of data currently in the read buffer, in bytes

View File

@ -81,6 +81,15 @@ public:
*/
virtual void disableWrite() = 0;
/**
* \brief Enable reading on this transport. Allows derived classes to, for example, enable read polling for asynchronous sockets
*/
virtual void enableRead() = 0;
/**
* \brief Disable reading on this transport. Allows derived classes to, for example, disable read polling for asynchronous sockets
*/
virtual void disableRead() = 0;
/**
* \brief Close this transport. Once this call has returned, writing on this transport should always return an error.
*/

View File

@ -109,6 +109,8 @@ public:
virtual void enableWrite();
virtual void disableWrite();
virtual void enableRead();
virtual void disableRead();
virtual void close();
@ -133,11 +135,6 @@ private:
*/
bool setSocket(int sock);
/**
* \brief Enables reading on our socket
*/
void enableRead();
void socketUpdate(int events);
int sock_;

View File

@ -39,7 +39,6 @@
#include <ros/transport/transport.h>
#include <boost/thread/mutex.hpp>
#include <boost/random.hpp>
#include <netinet/in.h>
@ -103,17 +102,14 @@ public:
*/
int getServerPort() const {return server_port_;}
/**
* \brief Get a unique connection ID
*/
int generateConnectionId() {return gen_();}
// overrides from Transport
virtual int32_t read(uint8_t* buffer, uint32_t size);
virtual int32_t write(uint8_t* buffer, uint32_t size);
virtual void enableWrite();
virtual void disableWrite();
virtual void enableRead();
virtual void disableRead();
virtual void close();
@ -138,11 +134,6 @@ private:
*/
bool setSocket(int sock);
/**
* \brief Enables reading on our socket
*/
void enableRead();
void socketUpdate(int events);
int sock_;
@ -161,8 +152,6 @@ private:
PollSet* poll_set_;
int flags_;
boost::rand48 gen_;
uint32_t connection_id_;
uint8_t current_message_id_;
uint16_t total_blocks_;
@ -170,6 +159,10 @@ private:
uint32_t max_datagram_size_;
uint8_t* data_buffer_;
uint8_t* data_start_;
uint32_t data_filled_;
uint8_t *reorder_buffer_;
TransportUDPHeader reorder_header_;
uint32_t reorder_bytes_;

View File

@ -47,7 +47,6 @@ namespace ros
Connection::Connection()
: is_server_(false)
, dropped_(false)
, fixed_read_filled_(0)
, read_filled_(0)
, read_size_(0)
, reading_(false)
@ -115,56 +114,41 @@ void Connection::readTransport()
reading_ = true;
if (fixed_read_filled_ < READ_BUFFER_SIZE)
while (!dropped_ && has_read_callback_)
{
int32_t bytes_read = transport_->read(fixed_read_buffer_ + fixed_read_filled_, READ_BUFFER_SIZE - fixed_read_filled_);
ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
if (dropped_)
ROS_ASSERT(read_buffer_);
uint32_t to_read = read_size_ - read_filled_;
if (to_read > 0)
{
return;
}
else if (bytes_read < 0)
{
reading_ = false;
int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
if (dropped_)
{
return;
}
else if (bytes_read < 0)
{
// Bad read, throw away results and report error
ReadFinishedFunc callback;
callback = read_callback_;
read_callback_.clear();
read_buffer_.reset();
uint32_t size = read_size_;
read_size_ = 0;
read_filled_ = 0;
has_read_callback_ = 0;
// Bad read, throw away results and report error
ReadFinishedFunc callback;
callback = read_callback_;
read_callback_ = ReadFinishedFunc();
read_buffer_ = boost::shared_array<uint8_t>();
uint32_t size = read_size_;
read_size_ = 0;
read_filled_ = 0;
has_read_callback_ = 0;
fixed_read_filled_ = 0;
if (callback)
{
callback(shared_from_this(), read_buffer_, size, false);
}
}
if (callback)
callback(shared_from_this(), read_buffer_, size, false);
return;
read_filled_ += bytes_read;
}
fixed_read_filled_ += bytes_read;
}
else
{
if (has_read_callback_)
{
ROS_WARN("Connection read buffer filled with no read callback set");
}
}
while (has_read_callback_ && (fixed_read_filled_ > 0 || read_size_ == 0) && !dropped_)
{
ROS_ASSERT((int)read_size_ >= 0);
ROS_ASSERT((int)read_filled_ >= 0);
uint32_t write_amount = std::min(read_size_ - read_filled_, fixed_read_filled_);
ROS_DEBUG_NAMED("superdebug", "Copying %d bytes into read buffer", write_amount);
memcpy(read_buffer_.get() + read_filled_, fixed_read_buffer_, write_amount);
memmove(fixed_read_buffer_, fixed_read_buffer_ + write_amount, fixed_read_filled_ - write_amount);
fixed_read_filled_ -= write_amount;
read_filled_ += write_amount;
ROS_ASSERT(read_filled_ <= read_size_);
if (read_filled_ == read_size_ && !dropped_)
@ -179,8 +163,8 @@ void Connection::readTransport()
callback = read_callback_;
size = read_size_;
buffer = read_buffer_;
read_callback_ = ReadFinishedFunc();
read_buffer_ = boost::shared_array<uint8_t>();
read_callback_.clear();
read_buffer_.reset();
read_size_ = 0;
read_filled_ = 0;
has_read_callback_ = 0;
@ -188,6 +172,15 @@ void Connection::readTransport()
ROS_DEBUG_NAMED("superdebug", "Calling read callback");
callback(shared_from_this(), buffer, size, true);
}
else
{
break;
}
}
if (!has_read_callback_)
{
transport_->disableRead();
}
reading_ = false;
@ -283,6 +276,8 @@ void Connection::read(uint32_t size, const ReadFinishedFunc& callback)
has_read_callback_ = 1;
}
transport_->enableRead();
// read immediately if possible
readTransport();
}

View File

@ -177,6 +177,7 @@ void ServiceServerLink::onConnectionDropped(const ConnectionPtr& conn)
void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
{
//ros::WallDuration(0.1).sleep();
connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, _1, _2, _3, _4));
}
@ -322,6 +323,8 @@ bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& re
info->call_finished_ = false;
info->caller_thread_id_ = boost::this_thread::get_id();
//ros::WallDuration(0.1).sleep();
bool immediate = false;
{
boost::mutex::scoped_lock lock(call_queue_mutex_);

View File

@ -648,7 +648,7 @@ bool TopicManager::requestTopic(const string &topic,
}
int max_datagram_size = proto[4];
int conn_id = connection_manager_->getUDPServerTransport()->generateConnectionId();
int conn_id = connection_manager_->getNewConnectionID();
TransportUDPPtr transport = connection_manager_->getUDPServerTransport()->createOutgoing(host, port, conn_id, max_datagram_size);
connection_manager_->udprosIncomingConnection(transport, h);

View File

@ -132,7 +132,7 @@ bool TransportTCP::initializeSocket()
if (!(flags_ & SYNCHRONOUS))
{
enableRead();
//enableRead();
}
return true;
@ -314,6 +314,11 @@ bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb
return false;
}
if (!(flags_ & SYNCHRONOUS))
{
enableRead();
}
return true;
}
@ -450,6 +455,26 @@ void TransportTCP::enableRead()
}
}
void TransportTCP::disableRead()
{
ROS_ASSERT(!(flags_ & SYNCHRONOUS));
{
boost::recursive_mutex::scoped_lock lock(close_mutex_);
if (closed_)
{
return;
}
}
if (expecting_read_)
{
poll_set_->delEvents(sock_, POLLIN);
expecting_read_ = false;
}
}
void TransportTCP::enableWrite()
{
ROS_ASSERT(!(flags_ & SYNCHRONOUS));

View File

@ -64,6 +64,7 @@ TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
, total_blocks_(0)
, last_block_(0)
, max_datagram_size_(max_datagram_size)
, data_filled_(0)
, reorder_buffer_(0)
, reorder_bytes_(0)
{
@ -71,12 +72,15 @@ TransportUDP::TransportUDP(PollSet* poll_set, int flags, int max_datagram_size)
if (max_datagram_size_ == 0)
max_datagram_size_ = 1500;
reorder_buffer_ = new uint8_t[max_datagram_size_];
data_buffer_ = new uint8_t[max_datagram_size_];
data_start_ = data_buffer_;
}
TransportUDP::~TransportUDP()
{
ROS_ASSERT_MSG(sock_ == -1, "TransportUDP socket [%d] was never closed", sock_);
delete[] reorder_buffer_;
delete [] reorder_buffer_;
delete [] data_buffer_;
}
bool TransportUDP::setSocket(int sock)
@ -205,7 +209,6 @@ bool TransportUDP::connect(const std::string& host, int port, int connection_id)
bool TransportUDP::createIncoming(int port, bool is_server)
{
is_server_ = is_server;
gen_.seed(getpid());
sock_ = socket(AF_INET, SOCK_DGRAM, 0);
@ -326,52 +329,80 @@ int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
while (bytes_read < size)
{
TransportUDPHeader header;
struct iovec iov[2];
iov[0].iov_base = &header;
iov[0].iov_len = sizeof(header);
iov[1].iov_base = buffer + bytes_read;
iov[1].iov_len = (size - bytes_read) > max_datagram_size_ ? max_datagram_size_ : (size - bytes_read);
// Don't read a partial datagram when buffer gets full
if (iov[1].iov_len < max_datagram_size_ && bytes_read != 0)
break;
// Read a datagram with header
ssize_t num_bytes = 0;
bool from_previous = false;
if (reorder_bytes_)
{
num_bytes = reorder_bytes_ + sizeof(header);
header = reorder_header_;
memcpy(iov[1].iov_base, reorder_buffer_, reorder_bytes_);
memcpy(buffer + bytes_read, reorder_buffer_, reorder_bytes_);
reorder_bytes_ = 0;
}
else
{
num_bytes = readv(sock_, iov, 2);
}
if (num_bytes < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
if (data_filled_ == 0)
{
num_bytes = 0;
break;
struct iovec iov[2];
iov[0].iov_base = &header;
iov[0].iov_len = sizeof(header);
iov[1].iov_base = data_buffer_;
iov[1].iov_len = max_datagram_size_ - sizeof(header);
// Read a datagram with header
num_bytes = readv(sock_, iov, 2);
if (num_bytes < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
num_bytes = 0;
break;
}
else
{
ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno));
close();
break;
}
}
else if (num_bytes == 0)
{
ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
close();
return -1;
}
else if (num_bytes < (ssize_t)sizeof(header))
{
ROS_ERROR("Socket [%d] received short header (%d bytes): %s", sock_, int(num_bytes), strerror(errno));
close();
return -1;
}
num_bytes -= sizeof(header);
data_filled_ = num_bytes;
data_start_ = data_buffer_;
}
else
{
ROSCPP_LOG_DEBUG("readv() failed with error [%s]", strerror(errno));
close();
break;
from_previous = true;
}
num_bytes = std::min(size, data_filled_);
// Copy from the data buffer, whether it has data left in it from a previous datagram or
// was just filled by readv()
memcpy(buffer, data_start_, num_bytes);
data_filled_ = std::max((int64_t)0, (int64_t)data_filled_ - (int64_t)size);
data_start_ += num_bytes;
}
else if (num_bytes == 0)
if (from_previous)
{
ROSCPP_LOG_DEBUG("Socket [%d] received 0/%d bytes, closing", sock_, size);
close();
return -1;
bytes_read += num_bytes;
}
else if (num_bytes >= ssize_t(sizeof(header)))
else
{
num_bytes -= sizeof(header);
// Process header
switch (header.op_)
{
@ -409,20 +440,17 @@ int32_t TransportUDP::read(uint8_t* buffer, uint32_t size)
ROSCPP_LOG_DEBUG("Unexpected UDP header OP [%d]", header.op_);
return -1;
}
bytes_read += num_bytes;
if (last_block_ == (total_blocks_ - 1))
{
current_message_id_ = 0;
break;
}
}
else
{
ROS_ERROR("Socket [%d] received short header (%d bytes), closing", sock_, int(num_bytes));
close();
return -1;
}
}
return bytes_read;
}
@ -516,6 +544,26 @@ void TransportUDP::enableRead()
}
}
void TransportUDP::disableRead()
{
ROS_ASSERT(!(flags_ & SYNCHRONOUS));
{
boost::mutex::scoped_lock lock(close_mutex_);
if (closed_)
{
return;
}
}
if (expecting_read_)
{
poll_set_->delEvents(sock_, POLLIN);
expecting_read_ = false;
}
}
void TransportUDP::enableWrite()
{
{

View File

@ -147,7 +147,7 @@ void TransportSubscriberLink::onMessageWritten(const ConnectionPtr& conn)
void TransportSubscriberLink::startMessageWrite(bool immediate_write)
{
boost::shared_array<uint8_t> dummy;
SerializedMessage m(dummy, (unsigned int)0);
SerializedMessage m(dummy, (uint32_t)0);
{
boost::mutex::scoped_lock lock(outbox_mutex_);