diff --git a/LibCarla/source/carla/streaming/detail/Token.cpp b/LibCarla/source/carla/streaming/detail/Token.cpp index e5b6ec0da..b582854ee 100644 --- a/LibCarla/source/carla/streaming/detail/Token.cpp +++ b/LibCarla/source/carla/streaming/detail/Token.cpp @@ -25,13 +25,6 @@ namespace detail { } } - boost::asio::ip::address token_type::get_address() const { - if (_token.address_type == token_data::address::ip_v4) { - return boost::asio::ip::address_v4(_token.address.v4); - } - return boost::asio::ip::address_v6(_token.address.v6); - } - token_type::token_type(const Token &rhs) { std::memcpy(&_token, &rhs.data[0u], sizeof(_token)); } @@ -42,6 +35,13 @@ namespace detail { return token; } + boost::asio::ip::address token_type::get_address() const { + if (_token.address_type == token_data::address::ip_v4) { + return boost::asio::ip::address_v4(_token.address.v4); + } + return boost::asio::ip::address_v6(_token.address.v6); + } + } // namespace detail } // namespace streaming } // namespace carla diff --git a/LibCarla/source/carla/streaming/detail/Token.h b/LibCarla/source/carla/streaming/detail/Token.h index fe74918d8..e54c63ebe 100644 --- a/LibCarla/source/carla/streaming/detail/Token.h +++ b/LibCarla/source/carla/streaming/detail/Token.h @@ -69,8 +69,6 @@ namespace detail { void set_address(const boost::asio::ip::address &addr); - boost::asio::ip::address get_address() const; - template boost::asio::ip::basic_endpoint

get_endpoint() const { DEBUG_ASSERT(is_valid()); @@ -78,6 +76,8 @@ namespace detail { return {get_address(), _token.port}; } + public: + template explicit token_type( stream_id_type stream_id, @@ -88,8 +88,6 @@ namespace detail { set_address(ep.address()); } - public: - token_type() = default; token_type(const token_type &) = default; @@ -97,10 +95,14 @@ namespace detail { operator Token() const; - auto get_stream_id() const { + // We need to return a reference here so we can use the address of the + // stream id to send it as buffer. + const auto &get_stream_id() const { return _token.stream_id; } + boost::asio::ip::address get_address() const; + auto get_port() const { return _token.port; } diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp index 454acfecd..b4d7ea786 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp @@ -14,6 +14,8 @@ #include #include +#include + namespace carla { namespace streaming { namespace detail { @@ -58,15 +60,16 @@ namespace tcp { Client::Client( boost::asio::io_service &io_service, - endpoint ep, - stream_id_type stream_id, + const token_type &token, callback_function_type callback) - : _endpoint(std::move(ep)), - _stream_id(stream_id), + : _token(token), _callback(std::move(callback)), _socket(io_service), _strand(io_service), _connection_timer(io_service) { + if (!_token.protocol_is_tcp()) { + throw std::invalid_argument("invalid token, only TCP tokens supported"); + } Connect(); } @@ -96,32 +99,38 @@ namespace tcp { _socket.close(); } - auto handle_connect = [=](error_code ec) { + auto handle_connect = [=]( + error_code ec, + const boost::asio::ip::tcp::endpoint & DEBUG_ONLY(ep)) { + DEBUG_ONLY(log_debug("streaming client: connected to", ep)); if (!ec) { // Send the stream id to subscribe to the stream. - log_debug("streaming client: sending stream id", _stream_id); + const auto &stream_id = _token.get_stream_id(); + log_debug("streaming client: sending stream id", stream_id); boost::asio::async_write( - _socket, - boost::asio::buffer(&_stream_id, sizeof(_stream_id)), - _strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) { + _socket, + boost::asio::buffer(&stream_id, sizeof(stream_id)), + _strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) { if (!ec) { - DEBUG_ASSERT_EQ(bytes, sizeof(_stream_id)); + DEBUG_ASSERT_EQ(bytes, sizeof(stream_id)); // If succeeded start reading data. ReadData(); } else { // Else try again. - log_debug("streaming client: failed to send stream id:", ec.message()); + log_warning("streaming client: failed to send stream id:", ec.message()); Connect(); } })); } else { - log_debug("streaming client: connection failed:", ec.message()); + log_warning("streaming client: connection failed:", ec.message()); Reconnect(); } }; - log_debug("streaming client: connecting to", _endpoint); - _socket.async_connect(_endpoint, _strand.wrap(handle_connect)); + log_debug("streaming client: connecting to", _token.get_address(), "at port", _token.get_port()); + boost::asio::ip::tcp::resolver resolver(_socket.get_io_service()); + auto endpoint_it = resolver.resolve({_token.get_address(), _token.get_port()}); + boost::asio::async_connect(_socket, endpoint_it, _strand.wrap(handle_connect)); }); } @@ -149,15 +158,14 @@ namespace tcp { if (!ec) { DEBUG_ASSERT_EQ(bytes, encoder->size()); DEBUG_ASSERT_NE(bytes, 0u); - // Move the buffer to the callback function and start reading - // the next + // Move the buffer to the callback function and start reading the next // piece of data. log_debug("streaming client: success reading data, calling the callback"); _socket.get_io_service().post([this, encoder]() { _callback(encoder->pop()); }); ReadData(); } else { // As usual, if anything fails start over from the very top. - log_debug("streaming client: failed to read data:", ec.message()); + log_warning("streaming client: failed to read data:", ec.message()); Connect(); } }; @@ -166,15 +174,14 @@ namespace tcp { DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes")); if (!ec && (encoder->size() > 0u)) { DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type)); - // Now that we know the size of the coming buffer, we can - // allocate - // our buffer and start putting data into it. + // Now that we know the size of the coming buffer, we can allocate our + // buffer and start putting data into it. boost::asio::async_read( _socket, encoder->body(), _strand.wrap(handle_read_data)); } else { - log_debug("streaming client: failed to read header:", ec.message()); + log_warning("streaming client: failed to read header:", ec.message()); DEBUG_ONLY(log_debug("size = ", encoder->size())); DEBUG_ONLY(log_debug("bytes = ", bytes)); Connect(); diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.h b/LibCarla/source/carla/streaming/detail/tcp/Client.h index 625e69725..3aa2cf6ff 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.h +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.h @@ -8,6 +8,7 @@ #include "carla/NonCopyable.h" #include "carla/streaming/Message.h" +#include "carla/streaming/detail/Token.h" #include "carla/streaming/detail/Types.h" #include @@ -35,14 +36,13 @@ namespace tcp { Client( boost::asio::io_service &io_service, - endpoint ep, - stream_id_type stream_id, + const token_type &token, callback_function_type callback); ~Client(); stream_id_type GetStreamId() const { - return _stream_id; + return _token.get_stream_id(); } void Stop(); @@ -55,9 +55,7 @@ namespace tcp { void ReadData(); - const endpoint _endpoint; - - const stream_id_type _stream_id; + const token_type _token; callback_function_type _callback; diff --git a/LibCarla/source/carla/streaming/low_level/Client.h b/LibCarla/source/carla/streaming/low_level/Client.h index deb9b4da1..f55e7aed7 100644 --- a/LibCarla/source/carla/streaming/low_level/Client.h +++ b/LibCarla/source/carla/streaming/low_level/Client.h @@ -36,13 +36,9 @@ namespace low_level { boost::asio::io_service &io_service, const token_type &token, Functor &&callback) { - if (!token.protocol_is_tcp()) { /// @todo - throw std::invalid_argument("invalid token, only TCP tokens supported"); - } _clients.emplace( io_service, - token.to_tcp_endpoint(), - token.get_stream_id(), + token, std::forward(callback)); } diff --git a/LibCarla/source/test/test_streaming_low_level_tcp.cpp b/LibCarla/source/test/test_streaming_low_level_tcp.cpp index 31d8fd153..c6e8bbeaa 100644 --- a/LibCarla/source/test/test_streaming_low_level_tcp.cpp +++ b/LibCarla/source/test/test_streaming_low_level_tcp.cpp @@ -35,7 +35,7 @@ TEST(streaming_detail_tcp, small_message) { std::cout << "done!\n"; }); - tcp::Client c(io_service, ep, 42u, [&](std::shared_ptr message) { + tcp::Client c(io_service, token_type{42u, ep}, [&](std::shared_ptr message) { ++message_count; ASSERT_NE(message, nullptr); ASSERT_EQ(message->size(), 5u);