Fix #761 streaming client fails to connect in Windows

This commit is contained in:
nsubiron 2018-09-06 14:48:23 +02:00
parent a1684ad0ae
commit 1f1f24dad6
6 changed files with 48 additions and 45 deletions

View File

@ -25,13 +25,6 @@ namespace detail {
}
}
boost::asio::ip::address token_type::get_address() const {
if (_token.address_type == token_data::address::ip_v4) {
return boost::asio::ip::address_v4(_token.address.v4);
}
return boost::asio::ip::address_v6(_token.address.v6);
}
token_type::token_type(const Token &rhs) {
std::memcpy(&_token, &rhs.data[0u], sizeof(_token));
}
@ -42,6 +35,13 @@ namespace detail {
return token;
}
boost::asio::ip::address token_type::get_address() const {
if (_token.address_type == token_data::address::ip_v4) {
return boost::asio::ip::address_v4(_token.address.v4);
}
return boost::asio::ip::address_v6(_token.address.v6);
}
} // namespace detail
} // namespace streaming
} // namespace carla

View File

@ -69,8 +69,6 @@ namespace detail {
void set_address(const boost::asio::ip::address &addr);
boost::asio::ip::address get_address() const;
template <typename P>
boost::asio::ip::basic_endpoint<P> get_endpoint() const {
DEBUG_ASSERT(is_valid());
@ -78,6 +76,8 @@ namespace detail {
return {get_address(), _token.port};
}
public:
template <typename P>
explicit token_type(
stream_id_type stream_id,
@ -88,8 +88,6 @@ namespace detail {
set_address(ep.address());
}
public:
token_type() = default;
token_type(const token_type &) = default;
@ -97,10 +95,14 @@ namespace detail {
operator Token() const;
auto get_stream_id() const {
// We need to return a reference here so we can use the address of the
// stream id to send it as buffer.
const auto &get_stream_id() const {
return _token.stream_id;
}
boost::asio::ip::address get_address() const;
auto get_port() const {
return _token.port;
}

View File

@ -14,6 +14,8 @@
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <exception>
namespace carla {
namespace streaming {
namespace detail {
@ -58,15 +60,16 @@ namespace tcp {
Client::Client(
boost::asio::io_service &io_service,
endpoint ep,
stream_id_type stream_id,
const token_type &token,
callback_function_type callback)
: _endpoint(std::move(ep)),
_stream_id(stream_id),
: _token(token),
_callback(std::move(callback)),
_socket(io_service),
_strand(io_service),
_connection_timer(io_service) {
if (!_token.protocol_is_tcp()) {
throw std::invalid_argument("invalid token, only TCP tokens supported");
}
Connect();
}
@ -96,32 +99,38 @@ namespace tcp {
_socket.close();
}
auto handle_connect = [=](error_code ec) {
auto handle_connect = [=](
error_code ec,
const boost::asio::ip::tcp::endpoint & DEBUG_ONLY(ep)) {
DEBUG_ONLY(log_debug("streaming client: connected to", ep));
if (!ec) {
// Send the stream id to subscribe to the stream.
log_debug("streaming client: sending stream id", _stream_id);
const auto &stream_id = _token.get_stream_id();
log_debug("streaming client: sending stream id", stream_id);
boost::asio::async_write(
_socket,
boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
_strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) {
_socket,
boost::asio::buffer(&stream_id, sizeof(stream_id)),
_strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) {
if (!ec) {
DEBUG_ASSERT_EQ(bytes, sizeof(_stream_id));
DEBUG_ASSERT_EQ(bytes, sizeof(stream_id));
// If succeeded start reading data.
ReadData();
} else {
// Else try again.
log_debug("streaming client: failed to send stream id:", ec.message());
log_warning("streaming client: failed to send stream id:", ec.message());
Connect();
}
}));
} else {
log_debug("streaming client: connection failed:", ec.message());
log_warning("streaming client: connection failed:", ec.message());
Reconnect();
}
};
log_debug("streaming client: connecting to", _endpoint);
_socket.async_connect(_endpoint, _strand.wrap(handle_connect));
log_debug("streaming client: connecting to", _token.get_address(), "at port", _token.get_port());
boost::asio::ip::tcp::resolver resolver(_socket.get_io_service());
auto endpoint_it = resolver.resolve({_token.get_address(), _token.get_port()});
boost::asio::async_connect(_socket, endpoint_it, _strand.wrap(handle_connect));
});
}
@ -149,15 +158,14 @@ namespace tcp {
if (!ec) {
DEBUG_ASSERT_EQ(bytes, encoder->size());
DEBUG_ASSERT_NE(bytes, 0u);
// Move the buffer to the callback function and start reading
// the next
// Move the buffer to the callback function and start reading the next
// piece of data.
log_debug("streaming client: success reading data, calling the callback");
_socket.get_io_service().post([this, encoder]() { _callback(encoder->pop()); });
ReadData();
} else {
// As usual, if anything fails start over from the very top.
log_debug("streaming client: failed to read data:", ec.message());
log_warning("streaming client: failed to read data:", ec.message());
Connect();
}
};
@ -166,15 +174,14 @@ namespace tcp {
DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
if (!ec && (encoder->size() > 0u)) {
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
// Now that we know the size of the coming buffer, we can
// allocate
// our buffer and start putting data into it.
// Now that we know the size of the coming buffer, we can allocate our
// buffer and start putting data into it.
boost::asio::async_read(
_socket,
encoder->body(),
_strand.wrap(handle_read_data));
} else {
log_debug("streaming client: failed to read header:", ec.message());
log_warning("streaming client: failed to read header:", ec.message());
DEBUG_ONLY(log_debug("size = ", encoder->size()));
DEBUG_ONLY(log_debug("bytes = ", bytes));
Connect();

View File

@ -8,6 +8,7 @@
#include "carla/NonCopyable.h"
#include "carla/streaming/Message.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/Types.h"
#include <boost/asio/deadline_timer.hpp>
@ -35,14 +36,13 @@ namespace tcp {
Client(
boost::asio::io_service &io_service,
endpoint ep,
stream_id_type stream_id,
const token_type &token,
callback_function_type callback);
~Client();
stream_id_type GetStreamId() const {
return _stream_id;
return _token.get_stream_id();
}
void Stop();
@ -55,9 +55,7 @@ namespace tcp {
void ReadData();
const endpoint _endpoint;
const stream_id_type _stream_id;
const token_type _token;
callback_function_type _callback;

View File

@ -36,13 +36,9 @@ namespace low_level {
boost::asio::io_service &io_service,
const token_type &token,
Functor &&callback) {
if (!token.protocol_is_tcp()) { /// @todo
throw std::invalid_argument("invalid token, only TCP tokens supported");
}
_clients.emplace(
io_service,
token.to_tcp_endpoint(),
token.get_stream_id(),
token,
std::forward<Functor>(callback));
}

View File

@ -35,7 +35,7 @@ TEST(streaming_detail_tcp, small_message) {
std::cout << "done!\n";
});
tcp::Client c(io_service, ep, 42u, [&](std::shared_ptr<Message> message) {
tcp::Client c(io_service, token_type{42u, ep}, [&](std::shared_ptr<Message> message) {
++message_count;
ASSERT_NE(message, nullptr);
ASSERT_EQ(message->size(), 5u);