Fix again #761, add more flexibility to the info that goes into a stream token

This commit is contained in:
nsubiron 2018-09-07 17:28:33 +02:00
parent 1f1f24dad6
commit 2beedaf4f1
13 changed files with 224 additions and 44 deletions

View File

@ -19,7 +19,8 @@ namespace carla {
namespace client {
Client::Client(const std::string &host, uint16_t port, size_t worker_threads)
: _client(host, port) {
: _client(host, port),
_streaming_client(host) {
_streaming_client.AsyncRun(
worker_threads > 0u ? worker_threads : std::thread::hardware_concurrency());
}

View File

@ -23,7 +23,14 @@ namespace streaming {
class Client {
public:
Client() : _io_service(), _work_to_do(_io_service) {}
explicit Client()
: _io_service(),
_work_to_do(_io_service) {}
explicit Client(const std::string &fallback_address)
: _io_service(),
_work_to_do(_io_service),
_client(fallback_address) {}
~Client() {
Stop();

View File

@ -0,0 +1,111 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <experimental/optional>
namespace carla {
namespace streaming {
namespace detail {
// When in doubt, V4 addresses are returned.
struct FullyDefinedEndPoint {};
struct PartiallyDefinedEndPoint {};
template <typename Protocol, typename EndPointType>
class EndPoint;
template <typename Protocol>
class EndPoint<Protocol, FullyDefinedEndPoint> {
public:
explicit EndPoint(boost::asio::ip::basic_endpoint<Protocol> ep)
: _endpoint(std::move(ep)) {}
auto address() const {
return _endpoint.address();
}
uint16_t port() const {
return _endpoint.port();
}
operator boost::asio::ip::basic_endpoint<Protocol>() const {
return _endpoint;
}
private:
boost::asio::ip::basic_endpoint<Protocol> _endpoint;
};
template <typename Protocol>
class EndPoint<Protocol, PartiallyDefinedEndPoint> {
public:
explicit EndPoint(uint16_t port) : _port(port) {}
uint16_t port() const {
return _port;
}
operator boost::asio::ip::basic_endpoint<Protocol>() const {
return {Protocol::v4(), _port};
}
private:
uint16_t _port;
};
} // namespace detail
static inline auto make_localhost_address() {
return boost::asio::ip::make_address("127.0.0.1");
}
static inline auto make_address(const char *address) {
return std::strcmp("localhost", address) == 0 ?
make_localhost_address() :
boost::asio::ip::make_address(address);
}
static inline auto make_address(const std::string &address) {
return make_address(address.c_str());
}
template <typename Protocol>
static inline auto make_endpoint(boost::asio::ip::basic_endpoint<Protocol> ep) {
return detail::EndPoint<Protocol, detail::FullyDefinedEndPoint>{std::move(ep)};
}
template <typename Protocol>
static inline auto make_endpoint(const char *address, uint16_t port) {
return make_endpoint<Protocol>({make_address(address), port});
}
template <typename Protocol>
static inline auto make_endpoint(const std::string &address, uint16_t port) {
return make_endpoint<Protocol>(address.c_str(), port);
}
template <typename Protocol>
static inline auto make_endpoint(uint16_t port) {
return detail::EndPoint<Protocol, detail::PartiallyDefinedEndPoint>{port};
}
} // namespace streaming
} // namespace carla

View File

@ -19,20 +19,29 @@ namespace streaming {
/// be used by a client to subscribe to the stream.
class Server {
using underlying_server = low_level::Server<detail::tcp::Server>;
using protocol_type = low_level::Server<detail::tcp::Server>::protocol_type;
public:
explicit Server(uint16_t port)
: _server(_io_service, port) {}
: _server(_io_service, make_endpoint<protocol_type>(port)) {}
explicit Server(const std::string &address, uint16_t port)
: _server(_io_service, address, port) {}
: _server(_io_service, make_endpoint<protocol_type>(address, port)) {}
explicit Server(
const std::string &address, uint16_t port,
const std::string &external_address, uint16_t external_port)
: _server(
_io_service,
make_endpoint<protocol_type>(address, port),
make_endpoint<protocol_type>(external_address, external_port)) {}
~Server() {
Stop();
}
void set_timeout(time_duration timeout) {
_server.set_timeout(timeout);
void SetTimeout(time_duration timeout) {
_server.SetTimeout(timeout);
}
Stream MakeStream() {

View File

@ -6,6 +6,7 @@
#pragma once
#include "carla/streaming/EndPoint.h"
#include "carla/streaming/Stream.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/StreamState.h"
@ -23,8 +24,8 @@ namespace detail {
class Dispatcher {
public:
template <typename P>
explicit Dispatcher(const boost::asio::ip::basic_endpoint<P> &ep)
template <typename Protocol, typename EndPointType>
explicit Dispatcher(const EndPoint<Protocol, EndPointType> &ep)
: _cached_token(0u, ep) {}
Stream MakeStream();

View File

@ -7,6 +7,7 @@
#pragma once
#include "carla/Debug.h"
#include "carla/streaming/EndPoint.h"
#include "carla/streaming/Token.h"
#include "carla/streaming/detail/Types.h"
@ -21,9 +22,9 @@ namespace detail {
#pragma pack(push, 1)
struct token_data {
stream_id_type stream_id;
stream_id_type stream_id = 0u;
uint16_t port;
uint16_t port = 0u;
enum class protocol : uint8_t {
not_set,
@ -62,13 +63,14 @@ namespace detail {
template <typename P>
static constexpr auto get_protocol() {
static_assert(
std::is_same<P, boost::asio::ip::tcp>::value ||
std::is_same<P, boost::asio::ip::udp>::value, "Invalid protocol.");
return std::is_same<P, boost::asio::ip::tcp>::value ?
token_data::protocol::tcp :
token_data::protocol::udp;
}
void set_address(const boost::asio::ip::address &addr);
template <typename P>
boost::asio::ip::basic_endpoint<P> get_endpoint() const {
DEBUG_ASSERT(is_valid());
@ -76,18 +78,27 @@ namespace detail {
return {get_address(), _token.port};
}
public:
template <typename P>
template <typename Protocol>
explicit token_type(
stream_id_type stream_id,
const boost::asio::ip::basic_endpoint<P> &ep) {
const EndPoint<Protocol, FullyDefinedEndPoint> &ep) {
_token.stream_id = stream_id;
_token.port = ep.port();
_token.protocol = get_protocol<P>();
_token.protocol = get_protocol<Protocol>();
set_address(ep.address());
}
template <typename Protocol>
explicit token_type(
stream_id_type stream_id,
EndPoint<Protocol, PartiallyDefinedEndPoint> ep) {
_token.stream_id = stream_id;
_token.port = ep.port();
_token.protocol = get_protocol<Protocol>();
}
public:
token_type() = default;
token_type(const token_type &) = default;
@ -101,6 +112,12 @@ namespace detail {
return _token.stream_id;
}
bool has_address() const {
return _token.address_type != token_data::address::not_set;
}
void set_address(const boost::asio::ip::address &addr);
boost::asio::ip::address get_address() const;
auto get_port() const {
@ -108,7 +125,8 @@ namespace detail {
}
bool is_valid() const {
return ((_token.protocol != token_data::protocol::not_set) &&
return has_address() &&
((_token.protocol != token_data::protocol::not_set) &&
(_token.address_type != token_data::address::not_set));
}
@ -128,6 +146,11 @@ namespace detail {
return _token.protocol == token_data::protocol::tcp;
}
template <typename Protocol>
bool has_same_protocol(const boost::asio::ip::basic_endpoint<Protocol> &) const {
return _token.protocol == get_protocol<Protocol>();
}
boost::asio::ip::udp::endpoint to_udp_endpoint() const {
return get_endpoint<boost::asio::ip::udp>();
}

View File

@ -99,11 +99,13 @@ namespace tcp {
_socket.close();
}
auto handle_connect = [=](
error_code ec,
const boost::asio::ip::tcp::endpoint & DEBUG_ONLY(ep)) {
DEBUG_ONLY(log_debug("streaming client: connected to", ep));
DEBUG_ASSERT(_token.is_valid());
DEBUG_ASSERT(_token.protocol_is_tcp());
const auto ep = _token.to_tcp_endpoint();
auto handle_connect = [=](error_code ec) {
if (!ec) {
log_debug("streaming client: connected to", ep);
// Send the stream id to subscribe to the stream.
const auto &stream_id = _token.get_stream_id();
log_debug("streaming client: sending stream id", stream_id);
@ -127,10 +129,8 @@ namespace tcp {
}
};
log_debug("streaming client: connecting to", _token.get_address(), "at port", _token.get_port());
boost::asio::ip::tcp::resolver resolver(_socket.get_io_service());
auto endpoint_it = resolver.resolve({_token.get_address(), _token.get_port()});
boost::asio::async_connect(_socket, endpoint_it, _strand.wrap(handle_connect));
log_debug("streaming client: connecting to", ep);
_socket.async_connect(ep, _strand.wrap(handle_connect));
});
}

View File

@ -32,6 +32,7 @@ namespace tcp {
public:
using endpoint = boost::asio::ip::tcp::endpoint;
using protocol_type = endpoint::protocol_type;
using callback_function_type = std::function<void (std::shared_ptr<Message>)>;
Client(

View File

@ -30,7 +30,7 @@ namespace tcp {
/// Set session time-out. Applies only to newly created sessions. By default
/// the time-out is set to 10 seconds.
void set_timeout(time_duration timeout) {
void SetTimeout(time_duration timeout) {
_timeout = timeout;
}

View File

@ -28,14 +28,26 @@ namespace low_level {
public:
using underlying_client = detail::HashableClient<T>;
using protocol_type = typename underlying_client::protocol_type;
using token_type = carla::streaming::detail::token_type;
explicit Client(boost::asio::ip::address fallback_address)
: _fallback_address(std::move(fallback_address)) {}
explicit Client(const std::string &fallback_address)
: Client(carla::streaming::make_address(fallback_address)) {}
explicit Client()
: Client(carla::streaming::make_localhost_address()) {}
template <typename Functor>
void Subscribe(
boost::asio::io_service &io_service,
const token_type &token,
token_type token,
Functor &&callback) {
if (!token.has_address()) {
token.set_address(_fallback_address);
}
_clients.emplace(
io_service,
token,
@ -44,6 +56,8 @@ namespace low_level {
private:
boost::asio::ip::address _fallback_address;
std::unordered_set<underlying_client> _clients;
};

View File

@ -24,25 +24,32 @@ namespace low_level {
using underlying_server = T;
using endpoint = typename underlying_server::endpoint;
using protocol_type = typename underlying_server::protocol_type;
explicit Server(boost::asio::io_service &io_service, const endpoint &ep)
: _server(io_service, ep),
_dispatcher(ep) {
_server.Listen([this](auto session){
template <typename InternalEPType, typename ExternalEPType>
explicit Server(
boost::asio::io_service &io_service,
detail::EndPoint<protocol_type, InternalEPType> internal_ep,
detail::EndPoint<protocol_type, ExternalEPType> external_ep)
: _server(io_service, std::move(internal_ep)),
_dispatcher(std::move(external_ep)) {
_server.Listen([this](auto session) {
_dispatcher.RegisterSession(session);
});
}
explicit Server(boost::asio::io_service &io_service, uint16_t port)
: Server(io_service, endpoint(protocol_type::v4(), port)) {}
template <typename InternalEPType>
explicit Server(
boost::asio::io_service &io_service,
detail::EndPoint<protocol_type, InternalEPType> internal_ep)
: Server(io_service, internal_ep, make_endpoint<protocol_type>(internal_ep.port())) {}
explicit Server(boost::asio::io_service &io_service, const std::string &address, uint16_t port)
: Server(io_service, endpoint(boost::asio::ip::address::from_string(address), port)) {}
template <typename... EPArgs>
explicit Server(boost::asio::io_service &io_service, EPArgs &&... args)
: Server(io_service, make_endpoint<protocol_type>(std::forward<EPArgs>(args)...)) {}
void set_timeout(time_duration timeout) {
_server.set_timeout(timeout);
void SetTimeout(time_duration timeout) {
_server.SetTimeout(timeout);
}
Stream MakeStream() {

View File

@ -16,6 +16,7 @@
TEST(streaming_low_level, sending_strings) {
using namespace util::message;
using namespace carla::streaming;
using namespace carla::streaming::detail;
using namespace carla::streaming::low_level;
@ -27,7 +28,7 @@ TEST(streaming_low_level, sending_strings) {
boost::asio::io_service io_service;
Server<tcp::Server> srv(io_service, TESTING_PORT);
srv.set_timeout(1s);
srv.SetTimeout(1s);
auto stream = srv.MakeStream();

View File

@ -7,6 +7,7 @@
#include "test.h"
#include <carla/ThreadGroup.h>
#include <carla/streaming/detail/Dispatcher.h>
#include <carla/streaming/detail/tcp/Client.h>
#include <carla/streaming/detail/tcp/Server.h>
@ -14,18 +15,19 @@
TEST(streaming_detail_tcp, small_message) {
using namespace util::message;
using namespace carla::streaming;
using namespace carla::streaming::detail;
boost::asio::io_service io_service;
tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
tcp::Server srv(io_service, ep);
srv.set_timeout(1s);
srv.SetTimeout(1s);
std::atomic_bool done{false};
std::atomic_size_t message_count{0u};
srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
ASSERT_EQ(session->get_stream_id(), 42u);
ASSERT_EQ(session->get_stream_id(), 1u);
const std::string msg = "Hola!";
auto message = std::make_shared<Message>(boost::asio::buffer(msg));
while (!done) {
@ -35,7 +37,9 @@ TEST(streaming_detail_tcp, small_message) {
std::cout << "done!\n";
});
tcp::Client c(io_service, token_type{42u, ep}, [&](std::shared_ptr<Message> message) {
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(ep)};
auto stream = dispatcher.MakeStream();
tcp::Client c(io_service, stream.token(), [&](std::shared_ptr<Message> message) {
++message_count;
ASSERT_NE(message, nullptr);
ASSERT_EQ(message->size(), 5u);
@ -53,4 +57,5 @@ TEST(streaming_detail_tcp, small_message) {
io_service.stop();
done = true;
std::cout << "client received " << message_count << " messages\n";
ASSERT_GT(message_count, 10u);
}