From 2beedaf4f1965c3036b36c7108eb9b849666e259 Mon Sep 17 00:00:00 2001 From: nsubiron Date: Fri, 7 Sep 2018 17:28:33 +0200 Subject: [PATCH] Fix again #761, add more flexibility to the info that goes into a stream token --- LibCarla/source/carla/client/Client.cpp | 3 +- LibCarla/source/carla/streaming/Client.h | 9 +- LibCarla/source/carla/streaming/EndPoint.h | 111 ++++++++++++++++++ LibCarla/source/carla/streaming/Server.h | 17 ++- .../carla/streaming/detail/Dispatcher.h | 5 +- .../source/carla/streaming/detail/Token.h | 43 +++++-- .../carla/streaming/detail/tcp/Client.cpp | 16 +-- .../carla/streaming/detail/tcp/Client.h | 1 + .../carla/streaming/detail/tcp/Server.h | 2 +- .../source/carla/streaming/low_level/Client.h | 18 ++- .../source/carla/streaming/low_level/Server.h | 29 +++-- .../source/test/test_streaming_low_level.cpp | 3 +- .../test/test_streaming_low_level_tcp.cpp | 11 +- 13 files changed, 224 insertions(+), 44 deletions(-) create mode 100644 LibCarla/source/carla/streaming/EndPoint.h diff --git a/LibCarla/source/carla/client/Client.cpp b/LibCarla/source/carla/client/Client.cpp index cb7612572..789ccc595 100644 --- a/LibCarla/source/carla/client/Client.cpp +++ b/LibCarla/source/carla/client/Client.cpp @@ -19,7 +19,8 @@ namespace carla { namespace client { Client::Client(const std::string &host, uint16_t port, size_t worker_threads) - : _client(host, port) { + : _client(host, port), + _streaming_client(host) { _streaming_client.AsyncRun( worker_threads > 0u ? worker_threads : std::thread::hardware_concurrency()); } diff --git a/LibCarla/source/carla/streaming/Client.h b/LibCarla/source/carla/streaming/Client.h index 16c5cb4f8..4d3a99980 100644 --- a/LibCarla/source/carla/streaming/Client.h +++ b/LibCarla/source/carla/streaming/Client.h @@ -23,7 +23,14 @@ namespace streaming { class Client { public: - Client() : _io_service(), _work_to_do(_io_service) {} + explicit Client() + : _io_service(), + _work_to_do(_io_service) {} + + explicit Client(const std::string &fallback_address) + : _io_service(), + _work_to_do(_io_service), + _client(fallback_address) {} ~Client() { Stop(); diff --git a/LibCarla/source/carla/streaming/EndPoint.h b/LibCarla/source/carla/streaming/EndPoint.h new file mode 100644 index 000000000..14b6c3c0a --- /dev/null +++ b/LibCarla/source/carla/streaming/EndPoint.h @@ -0,0 +1,111 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include + +namespace carla { +namespace streaming { +namespace detail { + + // When in doubt, V4 addresses are returned. + + struct FullyDefinedEndPoint {}; + struct PartiallyDefinedEndPoint {}; + + template + class EndPoint; + + template + class EndPoint { + public: + + explicit EndPoint(boost::asio::ip::basic_endpoint ep) + : _endpoint(std::move(ep)) {} + + auto address() const { + return _endpoint.address(); + } + + uint16_t port() const { + return _endpoint.port(); + } + + operator boost::asio::ip::basic_endpoint() const { + return _endpoint; + } + + private: + + boost::asio::ip::basic_endpoint _endpoint; + }; + + template + class EndPoint { + public: + + explicit EndPoint(uint16_t port) : _port(port) {} + + uint16_t port() const { + return _port; + } + + operator boost::asio::ip::basic_endpoint() const { + return {Protocol::v4(), _port}; + } + + private: + + uint16_t _port; + }; + +} // namespace detail + + static inline auto make_localhost_address() { + return boost::asio::ip::make_address("127.0.0.1"); + } + + static inline auto make_address(const char *address) { + return std::strcmp("localhost", address) == 0 ? + make_localhost_address() : + boost::asio::ip::make_address(address); + } + + static inline auto make_address(const std::string &address) { + return make_address(address.c_str()); + } + + template + static inline auto make_endpoint(boost::asio::ip::basic_endpoint ep) { + return detail::EndPoint{std::move(ep)}; + } + + template + static inline auto make_endpoint(const char *address, uint16_t port) { + return make_endpoint({make_address(address), port}); + } + + template + static inline auto make_endpoint(const std::string &address, uint16_t port) { + return make_endpoint(address.c_str(), port); + } + + template + static inline auto make_endpoint(uint16_t port) { + return detail::EndPoint{port}; + } + +} // namespace streaming +} // namespace carla diff --git a/LibCarla/source/carla/streaming/Server.h b/LibCarla/source/carla/streaming/Server.h index 91ea98ea7..740838bb1 100644 --- a/LibCarla/source/carla/streaming/Server.h +++ b/LibCarla/source/carla/streaming/Server.h @@ -19,20 +19,29 @@ namespace streaming { /// be used by a client to subscribe to the stream. class Server { using underlying_server = low_level::Server; + using protocol_type = low_level::Server::protocol_type; public: explicit Server(uint16_t port) - : _server(_io_service, port) {} + : _server(_io_service, make_endpoint(port)) {} explicit Server(const std::string &address, uint16_t port) - : _server(_io_service, address, port) {} + : _server(_io_service, make_endpoint(address, port)) {} + + explicit Server( + const std::string &address, uint16_t port, + const std::string &external_address, uint16_t external_port) + : _server( + _io_service, + make_endpoint(address, port), + make_endpoint(external_address, external_port)) {} ~Server() { Stop(); } - void set_timeout(time_duration timeout) { - _server.set_timeout(timeout); + void SetTimeout(time_duration timeout) { + _server.SetTimeout(timeout); } Stream MakeStream() { diff --git a/LibCarla/source/carla/streaming/detail/Dispatcher.h b/LibCarla/source/carla/streaming/detail/Dispatcher.h index 0f485603f..fae308bac 100644 --- a/LibCarla/source/carla/streaming/detail/Dispatcher.h +++ b/LibCarla/source/carla/streaming/detail/Dispatcher.h @@ -6,6 +6,7 @@ #pragma once +#include "carla/streaming/EndPoint.h" #include "carla/streaming/Stream.h" #include "carla/streaming/detail/Session.h" #include "carla/streaming/detail/StreamState.h" @@ -23,8 +24,8 @@ namespace detail { class Dispatcher { public: - template - explicit Dispatcher(const boost::asio::ip::basic_endpoint

&ep) + template + explicit Dispatcher(const EndPoint &ep) : _cached_token(0u, ep) {} Stream MakeStream(); diff --git a/LibCarla/source/carla/streaming/detail/Token.h b/LibCarla/source/carla/streaming/detail/Token.h index e54c63ebe..8cc174e19 100644 --- a/LibCarla/source/carla/streaming/detail/Token.h +++ b/LibCarla/source/carla/streaming/detail/Token.h @@ -7,6 +7,7 @@ #pragma once #include "carla/Debug.h" +#include "carla/streaming/EndPoint.h" #include "carla/streaming/Token.h" #include "carla/streaming/detail/Types.h" @@ -21,9 +22,9 @@ namespace detail { #pragma pack(push, 1) struct token_data { - stream_id_type stream_id; + stream_id_type stream_id = 0u; - uint16_t port; + uint16_t port = 0u; enum class protocol : uint8_t { not_set, @@ -62,13 +63,14 @@ namespace detail { template static constexpr auto get_protocol() { + static_assert( + std::is_same::value || + std::is_same::value, "Invalid protocol."); return std::is_same::value ? token_data::protocol::tcp : token_data::protocol::udp; } - void set_address(const boost::asio::ip::address &addr); - template boost::asio::ip::basic_endpoint

get_endpoint() const { DEBUG_ASSERT(is_valid()); @@ -76,18 +78,27 @@ namespace detail { return {get_address(), _token.port}; } - public: - - template + template explicit token_type( stream_id_type stream_id, - const boost::asio::ip::basic_endpoint

&ep) { + const EndPoint &ep) { _token.stream_id = stream_id; _token.port = ep.port(); - _token.protocol = get_protocol

(); + _token.protocol = get_protocol(); set_address(ep.address()); } + template + explicit token_type( + stream_id_type stream_id, + EndPoint ep) { + _token.stream_id = stream_id; + _token.port = ep.port(); + _token.protocol = get_protocol(); + } + + public: + token_type() = default; token_type(const token_type &) = default; @@ -101,6 +112,12 @@ namespace detail { return _token.stream_id; } + bool has_address() const { + return _token.address_type != token_data::address::not_set; + } + + void set_address(const boost::asio::ip::address &addr); + boost::asio::ip::address get_address() const; auto get_port() const { @@ -108,7 +125,8 @@ namespace detail { } bool is_valid() const { - return ((_token.protocol != token_data::protocol::not_set) && + return has_address() && + ((_token.protocol != token_data::protocol::not_set) && (_token.address_type != token_data::address::not_set)); } @@ -128,6 +146,11 @@ namespace detail { return _token.protocol == token_data::protocol::tcp; } + template + bool has_same_protocol(const boost::asio::ip::basic_endpoint &) const { + return _token.protocol == get_protocol(); + } + boost::asio::ip::udp::endpoint to_udp_endpoint() const { return get_endpoint(); } diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp index b4d7ea786..68eaa7bf4 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp @@ -99,11 +99,13 @@ namespace tcp { _socket.close(); } - auto handle_connect = [=]( - error_code ec, - const boost::asio::ip::tcp::endpoint & DEBUG_ONLY(ep)) { - DEBUG_ONLY(log_debug("streaming client: connected to", ep)); + DEBUG_ASSERT(_token.is_valid()); + DEBUG_ASSERT(_token.protocol_is_tcp()); + const auto ep = _token.to_tcp_endpoint(); + + auto handle_connect = [=](error_code ec) { if (!ec) { + log_debug("streaming client: connected to", ep); // Send the stream id to subscribe to the stream. const auto &stream_id = _token.get_stream_id(); log_debug("streaming client: sending stream id", stream_id); @@ -127,10 +129,8 @@ namespace tcp { } }; - log_debug("streaming client: connecting to", _token.get_address(), "at port", _token.get_port()); - boost::asio::ip::tcp::resolver resolver(_socket.get_io_service()); - auto endpoint_it = resolver.resolve({_token.get_address(), _token.get_port()}); - boost::asio::async_connect(_socket, endpoint_it, _strand.wrap(handle_connect)); + log_debug("streaming client: connecting to", ep); + _socket.async_connect(ep, _strand.wrap(handle_connect)); }); } diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.h b/LibCarla/source/carla/streaming/detail/tcp/Client.h index 3aa2cf6ff..3a2befa85 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.h +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.h @@ -32,6 +32,7 @@ namespace tcp { public: using endpoint = boost::asio::ip::tcp::endpoint; + using protocol_type = endpoint::protocol_type; using callback_function_type = std::function)>; Client( diff --git a/LibCarla/source/carla/streaming/detail/tcp/Server.h b/LibCarla/source/carla/streaming/detail/tcp/Server.h index 693ce80d2..fc331a4bf 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Server.h +++ b/LibCarla/source/carla/streaming/detail/tcp/Server.h @@ -30,7 +30,7 @@ namespace tcp { /// Set session time-out. Applies only to newly created sessions. By default /// the time-out is set to 10 seconds. - void set_timeout(time_duration timeout) { + void SetTimeout(time_duration timeout) { _timeout = timeout; } diff --git a/LibCarla/source/carla/streaming/low_level/Client.h b/LibCarla/source/carla/streaming/low_level/Client.h index f55e7aed7..0a29d1a87 100644 --- a/LibCarla/source/carla/streaming/low_level/Client.h +++ b/LibCarla/source/carla/streaming/low_level/Client.h @@ -28,14 +28,26 @@ namespace low_level { public: using underlying_client = detail::HashableClient; - + using protocol_type = typename underlying_client::protocol_type; using token_type = carla::streaming::detail::token_type; + explicit Client(boost::asio::ip::address fallback_address) + : _fallback_address(std::move(fallback_address)) {} + + explicit Client(const std::string &fallback_address) + : Client(carla::streaming::make_address(fallback_address)) {} + + explicit Client() + : Client(carla::streaming::make_localhost_address()) {} + template void Subscribe( boost::asio::io_service &io_service, - const token_type &token, + token_type token, Functor &&callback) { + if (!token.has_address()) { + token.set_address(_fallback_address); + } _clients.emplace( io_service, token, @@ -44,6 +56,8 @@ namespace low_level { private: + boost::asio::ip::address _fallback_address; + std::unordered_set _clients; }; diff --git a/LibCarla/source/carla/streaming/low_level/Server.h b/LibCarla/source/carla/streaming/low_level/Server.h index d93916868..4626b96d3 100644 --- a/LibCarla/source/carla/streaming/low_level/Server.h +++ b/LibCarla/source/carla/streaming/low_level/Server.h @@ -24,25 +24,32 @@ namespace low_level { using underlying_server = T; - using endpoint = typename underlying_server::endpoint; using protocol_type = typename underlying_server::protocol_type; - explicit Server(boost::asio::io_service &io_service, const endpoint &ep) - : _server(io_service, ep), - _dispatcher(ep) { - _server.Listen([this](auto session){ + template + explicit Server( + boost::asio::io_service &io_service, + detail::EndPoint internal_ep, + detail::EndPoint external_ep) + : _server(io_service, std::move(internal_ep)), + _dispatcher(std::move(external_ep)) { + _server.Listen([this](auto session) { _dispatcher.RegisterSession(session); }); } - explicit Server(boost::asio::io_service &io_service, uint16_t port) - : Server(io_service, endpoint(protocol_type::v4(), port)) {} + template + explicit Server( + boost::asio::io_service &io_service, + detail::EndPoint internal_ep) + : Server(io_service, internal_ep, make_endpoint(internal_ep.port())) {} - explicit Server(boost::asio::io_service &io_service, const std::string &address, uint16_t port) - : Server(io_service, endpoint(boost::asio::ip::address::from_string(address), port)) {} + template + explicit Server(boost::asio::io_service &io_service, EPArgs &&... args) + : Server(io_service, make_endpoint(std::forward(args)...)) {} - void set_timeout(time_duration timeout) { - _server.set_timeout(timeout); + void SetTimeout(time_duration timeout) { + _server.SetTimeout(timeout); } Stream MakeStream() { diff --git a/LibCarla/source/test/test_streaming_low_level.cpp b/LibCarla/source/test/test_streaming_low_level.cpp index 36b0d4094..1e727376b 100644 --- a/LibCarla/source/test/test_streaming_low_level.cpp +++ b/LibCarla/source/test/test_streaming_low_level.cpp @@ -16,6 +16,7 @@ TEST(streaming_low_level, sending_strings) { using namespace util::message; + using namespace carla::streaming; using namespace carla::streaming::detail; using namespace carla::streaming::low_level; @@ -27,7 +28,7 @@ TEST(streaming_low_level, sending_strings) { boost::asio::io_service io_service; Server srv(io_service, TESTING_PORT); - srv.set_timeout(1s); + srv.SetTimeout(1s); auto stream = srv.MakeStream(); diff --git a/LibCarla/source/test/test_streaming_low_level_tcp.cpp b/LibCarla/source/test/test_streaming_low_level_tcp.cpp index c6e8bbeaa..7ef01fd95 100644 --- a/LibCarla/source/test/test_streaming_low_level_tcp.cpp +++ b/LibCarla/source/test/test_streaming_low_level_tcp.cpp @@ -7,6 +7,7 @@ #include "test.h" #include +#include #include #include @@ -14,18 +15,19 @@ TEST(streaming_detail_tcp, small_message) { using namespace util::message; + using namespace carla::streaming; using namespace carla::streaming::detail; boost::asio::io_service io_service; tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT); tcp::Server srv(io_service, ep); - srv.set_timeout(1s); + srv.SetTimeout(1s); std::atomic_bool done{false}; std::atomic_size_t message_count{0u}; srv.Listen([&](std::shared_ptr session) { - ASSERT_EQ(session->get_stream_id(), 42u); + ASSERT_EQ(session->get_stream_id(), 1u); const std::string msg = "Hola!"; auto message = std::make_shared(boost::asio::buffer(msg)); while (!done) { @@ -35,7 +37,9 @@ TEST(streaming_detail_tcp, small_message) { std::cout << "done!\n"; }); - tcp::Client c(io_service, token_type{42u, ep}, [&](std::shared_ptr message) { + Dispatcher dispatcher{make_endpoint(ep)}; + auto stream = dispatcher.MakeStream(); + tcp::Client c(io_service, stream.token(), [&](std::shared_ptr message) { ++message_count; ASSERT_NE(message, nullptr); ASSERT_EQ(message->size(), 5u); @@ -53,4 +57,5 @@ TEST(streaming_detail_tcp, small_message) { io_service.stop(); done = true; std::cout << "client received " << message_count << " messages\n"; + ASSERT_GT(message_count, 10u); }