Refactor streaming library
This commit is contained in:
parent
83e16d065f
commit
b7a13d08d6
|
@ -23,3 +23,29 @@ file(GLOB libcarla_carla_streaming_headers "${libcarla_source_path}/carla/stream
|
|||
install(FILES ${libcarla_carla_streaming_headers} DESTINATION include/carla/streaming)
|
||||
|
||||
install(DIRECTORY "${BOOST_INCLUDE_PATH}/boost" DESTINATION include)
|
||||
|
||||
# carla_server library.
|
||||
|
||||
file(GLOB_RECURSE libcarla_server_sources
|
||||
"${libcarla_source_path}/carla/rpc/*.h"
|
||||
"${libcarla_source_path}/carla/rpc/*.cpp"
|
||||
"${libcarla_source_path}/carla/streaming/*.h"
|
||||
"${libcarla_source_path}/carla/streaming/*.cpp")
|
||||
|
||||
# Create targets for debug and release in the same build type.
|
||||
foreach(target carla_server_debug carla_server)
|
||||
add_library(${target} STATIC ${libcarla_server_sources})
|
||||
|
||||
target_include_directories(${target} PRIVATE
|
||||
"${BOOST_INCLUDE_PATH}"
|
||||
"${RPCLIB_INCLUDE_PATH}")
|
||||
|
||||
install(TARGETS ${target} DESTINATION lib)
|
||||
endforeach(target)
|
||||
|
||||
# Specific options for debug.
|
||||
set_target_properties(carla_server_debug PROPERTIES COMPILE_FLAGS ${CMAKE_CXX_FLAGS_DEBUG})
|
||||
target_compile_definitions(carla_server_debug PUBLIC -DBOOST_ASIO_ENABLE_BUFFER_DEBUGGING)
|
||||
|
||||
# Specific options for release.
|
||||
set_target_properties(carla_server PROPERTIES COMPILE_FLAGS ${CMAKE_CXX_FLAGS_RELEASE})
|
||||
|
|
|
@ -30,7 +30,9 @@ endforeach(target)
|
|||
|
||||
# Specific options for debug.
|
||||
set_target_properties(libcarla_test_debug PROPERTIES COMPILE_FLAGS ${CMAKE_CXX_FLAGS_DEBUG})
|
||||
target_link_libraries(libcarla_test_debug "carla_server_debug")
|
||||
target_compile_definitions(libcarla_test_debug PUBLIC -DBOOST_ASIO_ENABLE_BUFFER_DEBUGGING)
|
||||
|
||||
# Specific options for release.
|
||||
set_target_properties(libcarla_test_release PROPERTIES COMPILE_FLAGS ${CMAKE_CXX_FLAGS_RELEASE})
|
||||
target_link_libraries(libcarla_test_release "carla_server")
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
namespace carla {
|
||||
|
||||
/// Inherit (privately) to suppress copy-construction and copy-assignment.
|
||||
class NonCopyable {
|
||||
public:
|
||||
|
||||
|
@ -15,7 +16,7 @@ namespace carla {
|
|||
|
||||
NonCopyable(const NonCopyable &) = delete;
|
||||
|
||||
void operator=(const NonCopyable &x) = delete;
|
||||
void operator=(const NonCopyable &) = delete;
|
||||
};
|
||||
|
||||
} // namespace carla
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <chrono>
|
||||
|
||||
namespace carla {
|
||||
namespace detail {
|
||||
|
||||
template <typename CLOCK>
|
||||
class StopWatchTmpl {
|
||||
|
@ -51,6 +52,8 @@ namespace carla {
|
|||
bool _is_running;
|
||||
};
|
||||
|
||||
using StopWatch = StopWatchTmpl<std::chrono::steady_clock>;
|
||||
} // namespace detail
|
||||
|
||||
using StopWatch = detail::StopWatchTmpl<std::chrono::steady_clock>;
|
||||
|
||||
} // namespace carla
|
||||
|
|
|
@ -11,33 +11,31 @@
|
|||
#include <chrono>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace tcp {
|
||||
|
||||
/// Positive time-out up to milliseconds resolution.
|
||||
class timeout_type {
|
||||
/// Positive time duration up to milliseconds resolution. Automatically casts
|
||||
/// between std::chrono::duration and boost::posix_time::time_duration.
|
||||
class time_duration {
|
||||
public:
|
||||
|
||||
static inline timeout_type seconds(size_t timeout) {
|
||||
static inline time_duration seconds(size_t timeout) {
|
||||
return std::chrono::seconds(timeout);
|
||||
}
|
||||
|
||||
static inline timeout_type milliseconds(size_t timeout) {
|
||||
static inline time_duration milliseconds(size_t timeout) {
|
||||
return std::chrono::milliseconds(timeout);
|
||||
}
|
||||
|
||||
constexpr timeout_type() : _milliseconds(0u) {}
|
||||
constexpr time_duration() : _milliseconds(0u) {}
|
||||
|
||||
template <typename Rep, typename Period>
|
||||
timeout_type(std::chrono::duration<Rep, Period> duration)
|
||||
time_duration(std::chrono::duration<Rep, Period> duration)
|
||||
: _milliseconds(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()) {}
|
||||
|
||||
timeout_type(boost::posix_time::time_duration timeout)
|
||||
: timeout_type(std::chrono::milliseconds(timeout.total_milliseconds())) {}
|
||||
time_duration(boost::posix_time::time_duration timeout)
|
||||
: time_duration(std::chrono::milliseconds(timeout.total_milliseconds())) {}
|
||||
|
||||
timeout_type(const timeout_type &) = default;
|
||||
timeout_type &operator=(const timeout_type &) = default;
|
||||
time_duration(const time_duration &) = default;
|
||||
time_duration &operator=(const time_duration &) = default;
|
||||
|
||||
boost::posix_time::time_duration to_posix_time() const {
|
||||
return boost::posix_time::milliseconds(_milliseconds);
|
||||
|
@ -56,7 +54,4 @@ namespace tcp {
|
|||
size_t _milliseconds;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace low_level
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -8,16 +8,16 @@
|
|||
|
||||
#include "carla/ThreadGroup.h"
|
||||
#include "carla/streaming/low_level/Client.h"
|
||||
#include "carla/streaming/low_level/tcp/Client.h"
|
||||
#include "carla/streaming/detail/tcp/Client.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
|
||||
using stream_token = low_level::token_type;
|
||||
using stream_token = detail::token_type;
|
||||
|
||||
/// With this client you can subscribe to multiple streams.
|
||||
/// A client able to subscribe to multiple streams.
|
||||
class Client {
|
||||
public:
|
||||
|
||||
|
@ -45,7 +45,7 @@ namespace streaming {
|
|||
|
||||
private:
|
||||
|
||||
using underlying_client = low_level::Client<low_level::tcp::Client>;
|
||||
using underlying_client = low_level::Client<detail::tcp::Client>;
|
||||
|
||||
boost::asio::io_service _io_service;
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/streaming/detail/Types.h"
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
|
||||
|
@ -19,14 +20,6 @@
|
|||
namespace carla {
|
||||
namespace streaming {
|
||||
|
||||
namespace low_level {
|
||||
namespace tcp {
|
||||
|
||||
class Client; /// @todo
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace tcp
|
||||
|
||||
/// A message owns a buffer with raw data.
|
||||
class Message {
|
||||
|
||||
|
@ -38,7 +31,7 @@ namespace tcp {
|
|||
|
||||
using value_type = unsigned char;
|
||||
|
||||
using size_type = uint32_t;
|
||||
using size_type = detail::message_size_type;
|
||||
|
||||
// =========================================================================
|
||||
// -- Construction and assignment ------------------------------------------
|
||||
|
@ -131,8 +124,6 @@ namespace tcp {
|
|||
|
||||
private:
|
||||
|
||||
friend class low_level::tcp::Client; /// @todo
|
||||
|
||||
size_type _size = 0u;
|
||||
|
||||
std::unique_ptr<value_type[]> _data = nullptr;
|
||||
|
|
|
@ -8,19 +8,19 @@
|
|||
|
||||
#include "carla/ThreadGroup.h"
|
||||
#include "carla/streaming/low_level/Server.h"
|
||||
#include "carla/streaming/low_level/tcp/Server.h"
|
||||
#include "carla/streaming/detail/tcp/Server.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
|
||||
/// A streaming server. Each new stream has a token associated, this token can
|
||||
/// be used by a client to subscribe to the stream.
|
||||
class Server {
|
||||
using underlying_server = low_level::Server<low_level::tcp::Server>;
|
||||
using underlying_server = low_level::Server<detail::tcp::Server>;
|
||||
public:
|
||||
|
||||
using duration_type = underlying_server::duration_type;
|
||||
|
||||
explicit Server(uint16_t port)
|
||||
: _server(_io_service, port) {}
|
||||
|
||||
|
@ -31,7 +31,7 @@ namespace streaming {
|
|||
Stop();
|
||||
}
|
||||
|
||||
void set_timeout(duration_type timeout) {
|
||||
void set_timeout(time_duration timeout) {
|
||||
_server.set_timeout(timeout);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,8 @@
|
|||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/low_level/StreamState.h"
|
||||
#include "carla/streaming/low_level/Token.h"
|
||||
#include "carla/streaming/detail/StreamState.h"
|
||||
#include "carla/streaming/detail/Token.h"
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
|
||||
|
@ -18,13 +18,13 @@
|
|||
namespace carla {
|
||||
namespace streaming {
|
||||
|
||||
namespace low_level {
|
||||
namespace detail {
|
||||
|
||||
class Dispatcher;
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
|
||||
using stream_token = low_level::token_type;
|
||||
using stream_token = detail::token_type;
|
||||
|
||||
class Stream {
|
||||
public:
|
||||
|
@ -54,14 +54,14 @@ namespace low_level {
|
|||
|
||||
private:
|
||||
|
||||
friend class low_level::Dispatcher;
|
||||
friend class detail::Dispatcher;
|
||||
|
||||
Stream(std::shared_ptr<low_level::StreamState> state)
|
||||
Stream(std::shared_ptr<detail::StreamState> state)
|
||||
: _shared_state(std::move(state)) {
|
||||
DEBUG_ASSERT(_shared_state != nullptr);
|
||||
}
|
||||
|
||||
std::shared_ptr<low_level::StreamState> _shared_state;
|
||||
std::shared_ptr<detail::StreamState> _shared_state;
|
||||
};
|
||||
|
||||
} // namespace streaming
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "carla/streaming/detail/Dispatcher.h"
|
||||
|
||||
#include "carla/Logging.h"
|
||||
|
||||
#include <exception>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
|
||||
Stream Dispatcher::MakeStream() {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
++_cached_token._token.stream_id; // id zero only happens in overflow.
|
||||
auto ptr = std::make_shared<StreamState>(_cached_token);
|
||||
auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
|
||||
if (!result.second) {
|
||||
throw std::runtime_error("failed to create stream!");
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
void Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
|
||||
DEBUG_ASSERT(session != nullptr);
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
auto search = _stream_map.find(session->get_stream_id());
|
||||
if (search != _stream_map.end()) {
|
||||
DEBUG_ASSERT(search->second != nullptr);
|
||||
search->second->set_session(std::move(session));
|
||||
} else {
|
||||
log_error("Invalid session: no stream available with id", session->get_stream_id());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,49 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/Stream.h"
|
||||
#include "carla/streaming/detail/Session.h"
|
||||
#include "carla/streaming/detail/StreamState.h"
|
||||
#include "carla/streaming/detail/Token.h"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
|
||||
/// Keeps the mapping between streams and sessions.
|
||||
class Dispatcher {
|
||||
public:
|
||||
|
||||
template <typename P>
|
||||
explicit Dispatcher(const boost::asio::ip::basic_endpoint<P> &ep)
|
||||
: _cached_token(0u, ep) {}
|
||||
|
||||
Stream MakeStream();
|
||||
|
||||
void RegisterSession(std::shared_ptr<Session> session);
|
||||
|
||||
private:
|
||||
|
||||
// We use a mutex here, but we assume that sessions and streams won't be
|
||||
// created too often.
|
||||
std::mutex _mutex;
|
||||
|
||||
token_type _cached_token;
|
||||
|
||||
std::unordered_map<
|
||||
stream_id_type,
|
||||
std::shared_ptr<StreamState>> _stream_map;
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,48 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/detail/Types.h"
|
||||
|
||||
#include <functional>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
|
||||
/// A wrapper to put clients into std::unordered_set.
|
||||
template <typename T>
|
||||
class HashableClient : public T {
|
||||
public:
|
||||
|
||||
template <typename ... Args>
|
||||
HashableClient(Args && ... args)
|
||||
: T(std::forward<Args>(args) ...) {}
|
||||
|
||||
bool operator==(const HashableClient &rhs) const {
|
||||
return T::GetStreamId() == rhs.GetStreamId();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
||||
|
||||
namespace std {
|
||||
|
||||
// Injecting a hash function for our clients into std namespace so we can
|
||||
// directly insert them into std::unordered_set.
|
||||
template <typename T>
|
||||
struct hash<carla::streaming::detail::HashableClient<T>> {
|
||||
using argument_type = carla::streaming::detail::HashableClient<T>;
|
||||
using result_type = std::size_t;
|
||||
result_type operator()(const argument_type &client) const noexcept {
|
||||
return std::hash<carla::streaming::detail::stream_id_type>()(client.GetStreamId());
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace std
|
|
@ -6,14 +6,14 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/low_level/tcp/ServerSession.h"
|
||||
#include "carla/streaming/detail/tcp/ServerSession.h"
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace detail {
|
||||
|
||||
using Session = tcp::ServerSession;
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -6,17 +6,16 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/low_level/Session.h"
|
||||
#include "carla/streaming/low_level/Token.h"
|
||||
#include "carla/streaming/detail/Session.h"
|
||||
#include "carla/streaming/detail/Token.h"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
|
||||
namespace detail {
|
||||
|
||||
/// Handles the synchronization of the shared session.
|
||||
|
@ -37,18 +36,16 @@ namespace detail {
|
|||
|
||||
private:
|
||||
|
||||
mutable std::mutex _mutex; /// @todo it can be atomic
|
||||
mutable std::mutex _mutex; /// @todo it can be atomic.
|
||||
|
||||
std::shared_ptr<Session> _session;
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
|
||||
/// Shared state among all the copies of a stream. Provides access to the
|
||||
/// underlying UDP session if active.
|
||||
/// underlying server session if active.
|
||||
class StreamState
|
||||
: public detail::SessionHolder,
|
||||
private boost::noncopyable {
|
||||
: public SessionHolder,
|
||||
private NonCopyable {
|
||||
public:
|
||||
|
||||
explicit StreamState(const token_type &token) : _token(token) {}
|
||||
|
@ -69,6 +66,6 @@ namespace detail {
|
|||
const token_type _token;
|
||||
};
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,36 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "carla/streaming/detail/Token.h"
|
||||
|
||||
#include <exception>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
|
||||
void token_type::set_address(const boost::asio::ip::address &addr) {
|
||||
if (addr.is_v4()) {
|
||||
_token.address_type = token_data::address::ip_v4;
|
||||
_token.address.v4 = addr.to_v4().to_bytes();
|
||||
} else if (addr.is_v6()) {
|
||||
_token.address_type = token_data::address::ip_v6;
|
||||
_token.address.v6 = addr.to_v6().to_bytes();
|
||||
} else {
|
||||
throw std::invalid_argument("invalid ip address!");
|
||||
}
|
||||
}
|
||||
|
||||
boost::asio::ip::address token_type::get_address() const {
|
||||
if (_token.address_type == token_data::address::ip_v4) {
|
||||
return boost::asio::ip::address_v4(_token.address.v4);
|
||||
}
|
||||
return boost::asio::ip::address_v6(_token.address.v6);
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -7,7 +7,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/streaming/low_level/Types.h"
|
||||
#include "carla/streaming/detail/Types.h"
|
||||
|
||||
#include <boost/asio/ip/address.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
@ -15,13 +15,11 @@
|
|||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
|
||||
namespace detail {
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
struct token {
|
||||
struct token_data {
|
||||
stream_id_type stream_id;
|
||||
|
||||
uint16_t port;
|
||||
|
@ -47,7 +45,7 @@ namespace detail {
|
|||
#pragma pack(pop)
|
||||
|
||||
static_assert(
|
||||
sizeof(token) == 24u,
|
||||
sizeof(token_data) == 24u,
|
||||
"Size shouldn't be more than"
|
||||
" v6 address : 128"
|
||||
" + state : 16"
|
||||
|
@ -56,8 +54,6 @@ namespace detail {
|
|||
" -----------------"
|
||||
" 192");
|
||||
|
||||
} // namespace detail
|
||||
|
||||
/// Serializes a stream endpoint. Contains all the necessary information for a
|
||||
/// client to subscribe to a stream.
|
||||
class token_type {
|
||||
|
@ -66,28 +62,13 @@ namespace detail {
|
|||
template <typename P>
|
||||
static constexpr auto get_protocol() {
|
||||
return std::is_same<P, boost::asio::ip::tcp>::value ?
|
||||
detail::token::protocol::tcp :
|
||||
detail::token::protocol::udp;
|
||||
token_data::protocol::tcp :
|
||||
token_data::protocol::udp;
|
||||
}
|
||||
|
||||
void set_address(const boost::asio::ip::address &addr) {
|
||||
if (addr.is_v4()) {
|
||||
_token.address_type = detail::token::address::ip_v4;
|
||||
_token.address.v4 = addr.to_v4().to_bytes();
|
||||
} else if (addr.is_v6()) {
|
||||
_token.address_type = detail::token::address::ip_v6;
|
||||
_token.address.v6 = addr.to_v6().to_bytes();
|
||||
} else {
|
||||
throw std::invalid_argument("invalid ip address!");
|
||||
}
|
||||
}
|
||||
void set_address(const boost::asio::ip::address &addr);
|
||||
|
||||
boost::asio::ip::address get_address() const {
|
||||
if (_token.address_type == detail::token::address::ip_v4) {
|
||||
return boost::asio::ip::address_v4(_token.address.v4);
|
||||
}
|
||||
return boost::asio::ip::address_v6(_token.address.v6);
|
||||
}
|
||||
boost::asio::ip::address get_address() const;
|
||||
|
||||
template <typename P>
|
||||
boost::asio::ip::basic_endpoint<P> get_endpoint() const {
|
||||
|
@ -120,24 +101,24 @@ namespace detail {
|
|||
}
|
||||
|
||||
bool is_valid() const {
|
||||
return ((_token.protocol != detail::token::protocol::not_set) &&
|
||||
(_token.address_type != detail::token::address::not_set));
|
||||
return ((_token.protocol != token_data::protocol::not_set) &&
|
||||
(_token.address_type != token_data::address::not_set));
|
||||
}
|
||||
|
||||
bool address_is_v4() const {
|
||||
return _token.address_type == detail::token::address::ip_v4;
|
||||
return _token.address_type == token_data::address::ip_v4;
|
||||
}
|
||||
|
||||
bool address_is_v6() const {
|
||||
return _token.address_type == detail::token::address::ip_v6;
|
||||
return _token.address_type == token_data::address::ip_v6;
|
||||
}
|
||||
|
||||
bool protocol_is_udp() const {
|
||||
return _token.protocol == detail::token::protocol::udp;
|
||||
return _token.protocol == token_data::protocol::udp;
|
||||
}
|
||||
|
||||
bool protocol_is_tcp() const {
|
||||
return _token.protocol == detail::token::protocol::tcp;
|
||||
return _token.protocol == token_data::protocol::tcp;
|
||||
}
|
||||
|
||||
boost::asio::ip::udp::endpoint to_udp_endpoint() const {
|
||||
|
@ -156,9 +137,9 @@ namespace detail {
|
|||
|
||||
friend class Dispatcher;
|
||||
|
||||
detail::token _token;
|
||||
token_data _token;
|
||||
};
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -10,12 +10,12 @@
|
|||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace detail {
|
||||
|
||||
using stream_id_type = uint32_t;
|
||||
|
||||
using message_size_type = uint32_t;
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -4,36 +4,30 @@
|
|||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
#include "carla/streaming/detail/tcp/Client.h"
|
||||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/Logging.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/low_level/Types.h"
|
||||
#include "carla/streaming/low_level/tcp/Timeout.h"
|
||||
#include "carla/Time.h"
|
||||
|
||||
#include <boost/asio/connect.hpp>
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/read.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
class Encoder {
|
||||
// ===========================================================================
|
||||
// -- Decoder ----------------------------------------------------------------
|
||||
// ===========================================================================
|
||||
|
||||
class Decoder {
|
||||
public:
|
||||
|
||||
boost::asio::mutable_buffer header() {
|
||||
return boost::asio::buffer(reinterpret_cast<unsigned char *>(&_size), sizeof(_size));
|
||||
return boost::asio::buffer(&_size, sizeof(_size));
|
||||
}
|
||||
|
||||
boost::asio::mutable_buffer body() {
|
||||
|
@ -58,81 +52,37 @@ namespace tcp {
|
|||
std::shared_ptr<Message> _message;
|
||||
};
|
||||
|
||||
/// @warning The client should not be destroyed before the @a io_service is
|
||||
/// stopped.
|
||||
class Client : private boost::noncopyable {
|
||||
public:
|
||||
// ===========================================================================
|
||||
// -- Client -----------------------------------------------------------------
|
||||
// ===========================================================================
|
||||
|
||||
using endpoint = boost::asio::ip::tcp::endpoint;
|
||||
Client::Client(
|
||||
boost::asio::io_service &io_service,
|
||||
endpoint ep,
|
||||
stream_id_type stream_id,
|
||||
callback_function_type callback)
|
||||
: _endpoint(std::move(ep)),
|
||||
_stream_id(stream_id),
|
||||
_callback(std::move(callback)),
|
||||
_socket(io_service),
|
||||
_strand(io_service),
|
||||
_connection_timer(io_service) {
|
||||
Connect();
|
||||
}
|
||||
|
||||
template <typename Functor>
|
||||
Client(
|
||||
boost::asio::io_service &io_service,
|
||||
endpoint ep,
|
||||
stream_id_type stream_id,
|
||||
Functor &&callback)
|
||||
: _endpoint(std::move(ep)),
|
||||
_stream_id(stream_id),
|
||||
_callback(std::forward<Functor>(callback)),
|
||||
_socket(io_service),
|
||||
_strand(io_service),
|
||||
_connection_timer(io_service) {
|
||||
Connect();
|
||||
}
|
||||
Client::~Client() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
~Client() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
stream_id_type get_id() const {
|
||||
return _stream_id;
|
||||
}
|
||||
|
||||
bool operator==(const Client &rhs) const {
|
||||
return get_id() == rhs.get_id();
|
||||
}
|
||||
|
||||
void Stop() {
|
||||
_connection_timer.cancel();
|
||||
_strand.post([this]() {
|
||||
_done = true;
|
||||
if (_socket.is_open()) {
|
||||
_socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
/// @todo Stop inlining and make cpp files.
|
||||
|
||||
inline void Connect();
|
||||
|
||||
inline void Reconnect() {
|
||||
_connection_timer.expires_from_now(timeout_type::seconds(1u));
|
||||
_connection_timer.async_wait([this](boost::system::error_code ec) {
|
||||
if (!ec) {
|
||||
Connect();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
inline void ReadData();
|
||||
|
||||
const endpoint _endpoint;
|
||||
|
||||
const stream_id_type _stream_id;
|
||||
|
||||
std::function<void(std::shared_ptr<Message>)> _callback;
|
||||
|
||||
boost::asio::ip::tcp::socket _socket;
|
||||
|
||||
boost::asio::io_service::strand _strand;
|
||||
|
||||
boost::asio::deadline_timer _connection_timer;
|
||||
|
||||
bool _done = false;
|
||||
};
|
||||
void Client::Stop() {
|
||||
_connection_timer.cancel();
|
||||
_strand.post([this]() {
|
||||
_done = true;
|
||||
if (_socket.is_open()) {
|
||||
_socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void Client::Connect() {
|
||||
_strand.post([this]() {
|
||||
|
@ -151,9 +101,9 @@ namespace tcp {
|
|||
// Send the stream id to subscribe to the stream.
|
||||
log_debug("streaming client: sending stream id", _stream_id);
|
||||
boost::asio::async_write(
|
||||
_socket,
|
||||
boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
|
||||
_strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) {
|
||||
_socket,
|
||||
boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
|
||||
_strand.wrap([=](error_code ec, size_t DEBUG_ONLY(bytes)) {
|
||||
if (!ec) {
|
||||
DEBUG_ASSERT_EQ(bytes, sizeof(_stream_id));
|
||||
// If succeeded start reading data.
|
||||
|
@ -175,6 +125,15 @@ namespace tcp {
|
|||
});
|
||||
}
|
||||
|
||||
void Client::Reconnect() {
|
||||
_connection_timer.expires_from_now(time_duration::seconds(1u));
|
||||
_connection_timer.async_wait([this](boost::system::error_code ec) {
|
||||
if (!ec) {
|
||||
Connect();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void Client::ReadData() {
|
||||
_strand.post([this]() {
|
||||
if (_done) {
|
||||
|
@ -183,14 +142,15 @@ namespace tcp {
|
|||
|
||||
log_debug("streaming client: Client::ReadData");
|
||||
|
||||
auto encoder = std::make_shared<Encoder>();
|
||||
auto encoder = std::make_shared<Decoder>();
|
||||
|
||||
auto handle_read_data = [=](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
|
||||
DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_data", bytes, "bytes"));
|
||||
if (!ec) {
|
||||
DEBUG_ASSERT_EQ(bytes, encoder->size());
|
||||
DEBUG_ASSERT_NE(bytes, 0u);
|
||||
// Move the buffer to the callback function and start reading the next
|
||||
// Move the buffer to the callback function and start reading
|
||||
// the next
|
||||
// piece of data.
|
||||
log_debug("streaming client: success reading data, calling the callback");
|
||||
_socket.get_io_service().post([this, encoder]() { _callback(encoder->pop()); });
|
||||
|
@ -206,7 +166,8 @@ namespace tcp {
|
|||
DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
|
||||
if (!ec && (encoder->size() > 0u)) {
|
||||
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
|
||||
// Now that we know the size of the coming buffer, we can allocate
|
||||
// Now that we know the size of the coming buffer, we can
|
||||
// allocate
|
||||
// our buffer and start putting data into it.
|
||||
boost::asio::async_read(
|
||||
_socket,
|
||||
|
@ -229,21 +190,6 @@ namespace tcp {
|
|||
}
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace low_level
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
||||
|
||||
namespace std {
|
||||
|
||||
// Injecting a hash function for our clients into std namespace so we can
|
||||
// directly insert them into std::unordered_set.
|
||||
template <>
|
||||
struct hash<carla::streaming::low_level::tcp::Client> {
|
||||
using argument_type = carla::streaming::low_level::tcp::Client;
|
||||
using result_type = std::size_t;
|
||||
result_type operator()(const argument_type &client) const noexcept {
|
||||
return std::hash<carla::streaming::low_level::stream_id_type>()(client.get_id());
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace std
|
|
@ -0,0 +1,76 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/detail/Types.h"
|
||||
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
/// A client that connects to a single stream.
|
||||
///
|
||||
/// @warning The client should not be destroyed before the @a io_service is
|
||||
/// stopped.
|
||||
class Client : private NonCopyable {
|
||||
public:
|
||||
|
||||
using endpoint = boost::asio::ip::tcp::endpoint;
|
||||
using callback_function_type = std::function<void (std::shared_ptr<Message>)>;
|
||||
|
||||
Client(
|
||||
boost::asio::io_service &io_service,
|
||||
endpoint ep,
|
||||
stream_id_type stream_id,
|
||||
callback_function_type callback);
|
||||
|
||||
~Client();
|
||||
|
||||
stream_id_type GetStreamId() const {
|
||||
return _stream_id;
|
||||
}
|
||||
|
||||
void Stop();
|
||||
|
||||
private:
|
||||
|
||||
void Connect();
|
||||
|
||||
void Reconnect();
|
||||
|
||||
void ReadData();
|
||||
|
||||
const endpoint _endpoint;
|
||||
|
||||
const stream_id_type _stream_id;
|
||||
|
||||
callback_function_type _callback;
|
||||
|
||||
boost::asio::ip::tcp::socket _socket;
|
||||
|
||||
boost::asio::io_service::strand _strand;
|
||||
|
||||
boost::asio::deadline_timer _connection_timer;
|
||||
|
||||
bool _done = false;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,47 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "carla/streaming/detail/tcp/Server.h"
|
||||
|
||||
#include "carla/Logging.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
Server::Server(boost::asio::io_service &io_service, endpoint ep)
|
||||
: _acceptor(io_service, std::move(ep)),
|
||||
_timeout(time_duration::seconds(10u)) {}
|
||||
|
||||
void Server::OpenSession(
|
||||
const time_duration timeout,
|
||||
ServerSession::callback_function_type callback) {
|
||||
using boost::system::error_code;
|
||||
|
||||
auto session = std::make_shared<ServerSession>(_acceptor.get_io_service(), timeout);
|
||||
|
||||
auto handle_query = [=](const error_code &ec) {
|
||||
if (!ec) {
|
||||
session->Open(callback);
|
||||
} else {
|
||||
log_error("tcp accept error:", ec.message());
|
||||
}
|
||||
};
|
||||
|
||||
_acceptor.async_accept(session->_socket, [=](error_code ec) {
|
||||
// Handle query and open a new session immediately.
|
||||
_acceptor.get_io_service().post([=]() { handle_query(ec); });
|
||||
OpenSession(timeout, callback);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,56 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/Time.h"
|
||||
#include "carla/streaming/detail/tcp/ServerSession.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
class Server : private NonCopyable {
|
||||
public:
|
||||
|
||||
using endpoint = boost::asio::ip::tcp::endpoint;
|
||||
using protocol_type = endpoint::protocol_type;
|
||||
|
||||
explicit Server(boost::asio::io_service &io_service, endpoint ep);
|
||||
|
||||
/// Set session time-out. Applies only to newly created sessions. By default
|
||||
/// the time-out is set to 10 seconds.
|
||||
void set_timeout(time_duration timeout) {
|
||||
_timeout = timeout;
|
||||
}
|
||||
|
||||
/// Start listening for connections, on each new connection @a callback is
|
||||
/// called.
|
||||
template <typename Functor>
|
||||
void Listen(Functor callback) {
|
||||
_acceptor.get_io_service().post([=]() { OpenSession(_timeout, callback); });
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
void OpenSession(time_duration timeout, ServerSession::callback_function_type callback);
|
||||
|
||||
boost::asio::ip::tcp::acceptor _acceptor;
|
||||
|
||||
std::atomic<time_duration> _timeout;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,119 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "carla/streaming/detail/tcp/ServerSession.h"
|
||||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/Logging.h"
|
||||
|
||||
#include <boost/asio/read.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
static std::atomic_size_t SESSION_COUNTER{0u};
|
||||
|
||||
ServerSession::ServerSession(
|
||||
boost::asio::io_service &io_service,
|
||||
const time_duration timeout)
|
||||
: _session_id(SESSION_COUNTER++),
|
||||
_socket(io_service),
|
||||
_timeout(timeout),
|
||||
_deadline(io_service),
|
||||
_strand(io_service) {}
|
||||
|
||||
ServerSession::~ServerSession() {
|
||||
_deadline.cancel();
|
||||
}
|
||||
|
||||
void ServerSession::Open(callback_function_type callback) {
|
||||
StartTimer();
|
||||
auto self = shared_from_this(); // To keep myself alive.
|
||||
_strand.post([=]() {
|
||||
|
||||
auto handle_query = [this, self, cb=std::move(callback)](
|
||||
const boost::system::error_code &ec,
|
||||
size_t DEBUG_ONLY(bytes_received)) {
|
||||
DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
|
||||
if (!ec) {
|
||||
log_debug("session", _session_id, "for stream", _stream_id, " started");
|
||||
_socket.get_io_service().post([=]() { cb(self); });
|
||||
} else {
|
||||
log_error("session", _session_id, ": error retrieving stream id :", ec.message());
|
||||
Close();
|
||||
}
|
||||
};
|
||||
|
||||
// Read the stream id.
|
||||
_deadline.expires_from_now(_timeout);
|
||||
boost::asio::async_read(
|
||||
_socket,
|
||||
boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
|
||||
_strand.wrap(handle_query));
|
||||
});
|
||||
}
|
||||
|
||||
void ServerSession::Write(std::shared_ptr<const Message> message) {
|
||||
auto self = shared_from_this();
|
||||
_strand.post([=]() {
|
||||
|
||||
/// @todo has to be a better way of doing this...
|
||||
if (_is_writing) {
|
||||
// Repost and return;
|
||||
Write(std::move(message));
|
||||
return;
|
||||
}
|
||||
_is_writing = true;
|
||||
|
||||
auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
|
||||
_is_writing = false;
|
||||
if (ec) {
|
||||
log_error("session", _session_id, ": error sending data :", ec.message());
|
||||
} else {
|
||||
DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
|
||||
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
|
||||
}
|
||||
};
|
||||
|
||||
log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
|
||||
|
||||
_deadline.expires_from_now(_timeout);
|
||||
boost::asio::async_write(
|
||||
_socket,
|
||||
message->encode(),
|
||||
_strand.wrap(handle_sent));
|
||||
});
|
||||
}
|
||||
|
||||
void ServerSession::Close() {
|
||||
_strand.post([this, self = shared_from_this()]() {
|
||||
if (_socket.is_open()) {
|
||||
_socket.close();
|
||||
}
|
||||
log_debug("session", _session_id, "closed");
|
||||
});
|
||||
}
|
||||
|
||||
void ServerSession::StartTimer() {
|
||||
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
|
||||
log_debug("session", _session_id, "timed out");
|
||||
Close();
|
||||
} else {
|
||||
_deadline.async_wait([self = shared_from_this()](boost::system::error_code) {
|
||||
self->StartTimer();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -0,0 +1,82 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/Time.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/detail/Types.h"
|
||||
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace detail {
|
||||
namespace tcp {
|
||||
|
||||
/// A TCP server session. When a session opens, it reads from the socket a
|
||||
/// stream id object and passes itself to the callback functor. The session
|
||||
/// closes itself after @a timeout of inactivity is met.
|
||||
class ServerSession
|
||||
: public std::enable_shared_from_this<ServerSession>,
|
||||
private NonCopyable {
|
||||
public:
|
||||
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using callback_function_type = std::function<void(std::shared_ptr<ServerSession>)>;
|
||||
|
||||
explicit ServerSession(boost::asio::io_service &io_service, time_duration timeout);
|
||||
|
||||
~ServerSession();
|
||||
|
||||
/// Starts the session and calls @a callback after successfully reading the
|
||||
/// stream id.
|
||||
void Open(callback_function_type callback);
|
||||
|
||||
/// @warning This function should only be called after the session is
|
||||
/// opened. It is safe to call this function from within the @a callback.
|
||||
stream_id_type get_stream_id() const {
|
||||
return _stream_id;
|
||||
}
|
||||
|
||||
/// Writes some data to the socket.
|
||||
void Write(std::shared_ptr<const Message> message);
|
||||
|
||||
/// Posts a job to close this session.
|
||||
void Close();
|
||||
|
||||
private:
|
||||
|
||||
void StartTimer();
|
||||
|
||||
friend class Server;
|
||||
|
||||
const size_t _session_id;
|
||||
|
||||
stream_id_type _stream_id = 0u;
|
||||
|
||||
socket_type _socket;
|
||||
|
||||
time_duration _timeout;
|
||||
|
||||
boost::asio::deadline_timer _deadline;
|
||||
|
||||
boost::asio::io_service::strand _strand;
|
||||
|
||||
bool _is_writing = false;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace detail
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -6,7 +6,9 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/low_level/Token.h"
|
||||
#include "carla/streaming/detail/HashableClient.h"
|
||||
#include "carla/streaming/detail/Token.h"
|
||||
#include "carla/streaming/detail/tcp/Client.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
|
@ -16,23 +18,25 @@ namespace carla {
|
|||
namespace streaming {
|
||||
namespace low_level {
|
||||
|
||||
/// Wrapper around low level clients. You can subscribe to multiple streams.
|
||||
/// A client able to subscribe to multiple streams. Accepts an external
|
||||
/// io_service.
|
||||
///
|
||||
/// @warning The client should not be destroyed before the @a io_service is
|
||||
/// stopped.
|
||||
/// @pre T has to be hashable.
|
||||
template <typename T>
|
||||
class Client {
|
||||
public:
|
||||
|
||||
using underlying_client = T;
|
||||
using underlying_client = detail::HashableClient<T>;
|
||||
|
||||
using token_type = carla::streaming::detail::token_type;
|
||||
|
||||
template <typename Functor>
|
||||
void Subscribe(
|
||||
boost::asio::io_service &io_service,
|
||||
const token_type &token,
|
||||
Functor &&callback) {
|
||||
if (!token.protocol_is_tcp()) {
|
||||
if (!token.protocol_is_tcp()) { /// @todo
|
||||
throw std::invalid_argument("invalid token, only TCP tokens supported");
|
||||
}
|
||||
_clients.emplace(
|
||||
|
|
|
@ -1,68 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/Stream.h"
|
||||
#include "carla/streaming/low_level/Session.h"
|
||||
#include "carla/streaming/low_level/StreamState.h"
|
||||
#include "carla/streaming/low_level/Token.h"
|
||||
|
||||
#include <exception>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
|
||||
class Dispatcher {
|
||||
public:
|
||||
|
||||
template <typename P>
|
||||
explicit Dispatcher(const boost::asio::ip::basic_endpoint<P> &ep)
|
||||
: _cached_token(0u, ep) {}
|
||||
|
||||
Stream MakeStream() {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
++_cached_token._token.stream_id; // id zero only happens in overflow.
|
||||
auto ptr = std::make_shared<StreamState>(_cached_token);
|
||||
auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
|
||||
if (!result.second) {
|
||||
throw std::runtime_error("failed to create stream!");
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
void RegisterSession(std::shared_ptr<Session> session) {
|
||||
DEBUG_ASSERT(session != nullptr);
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
auto search = _stream_map.find(session->get_stream_id());
|
||||
if (search != _stream_map.end()) {
|
||||
DEBUG_ASSERT(search->second != nullptr);
|
||||
search->second->set_session(std::move(session));
|
||||
} else {
|
||||
log_error("Invalid session: no stream available with id", session->get_stream_id());
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
// We use a mutex here, but we assume that sessions and streams won't be
|
||||
// created too often.
|
||||
std::mutex _mutex;
|
||||
|
||||
token_type _cached_token;
|
||||
|
||||
std::unordered_map<
|
||||
stream_id_type,
|
||||
std::shared_ptr<StreamState>> _stream_map;
|
||||
};
|
||||
|
||||
} // namespace low_level
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/low_level/Dispatcher.h"
|
||||
#include "carla/streaming/detail/Dispatcher.h"
|
||||
#include "carla/streaming/Stream.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
@ -15,14 +15,15 @@ namespace carla {
|
|||
namespace streaming {
|
||||
namespace low_level {
|
||||
|
||||
/// Wrapper around low level servers.
|
||||
/// A low-level streaming server. Each new stream has a token associated, this
|
||||
/// token can be used by a client to subscribe to the stream. This server
|
||||
/// requires an external io_service running.
|
||||
template <typename T>
|
||||
class Server {
|
||||
public:
|
||||
|
||||
using underlying_server = T;
|
||||
|
||||
using duration_type = typename underlying_server::duration_type;
|
||||
using endpoint = typename underlying_server::endpoint;
|
||||
using protocol_type = typename underlying_server::protocol_type;
|
||||
|
||||
|
@ -40,7 +41,7 @@ namespace low_level {
|
|||
explicit Server(boost::asio::io_service &io_service, const std::string &address, uint16_t port)
|
||||
: Server(io_service, endpoint(boost::asio::ip::address::from_string(address), port)) {}
|
||||
|
||||
void set_timeout(duration_type timeout) {
|
||||
void set_timeout(time_duration timeout) {
|
||||
_server.set_timeout(timeout);
|
||||
}
|
||||
|
||||
|
@ -52,7 +53,7 @@ namespace low_level {
|
|||
|
||||
underlying_server _server;
|
||||
|
||||
Dispatcher _dispatcher;
|
||||
detail::Dispatcher _dispatcher;
|
||||
};
|
||||
|
||||
} // namespace low_level
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/streaming/low_level/tcp/ServerSession.h"
|
||||
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace tcp {
|
||||
|
||||
class Server : private boost::noncopyable {
|
||||
public:
|
||||
|
||||
using endpoint = boost::asio::ip::tcp::endpoint;
|
||||
using protocol_type = endpoint::protocol_type;
|
||||
using duration_type = ServerSession::duration_type;
|
||||
|
||||
explicit Server(boost::asio::io_service &io_service, endpoint ep)
|
||||
: _acceptor(io_service, std::move(ep)),
|
||||
_timeout(duration_type::seconds(10u)) {}
|
||||
|
||||
/// Set session time-out. Applies only to newly created sessions.
|
||||
void set_timeout(duration_type timeout) {
|
||||
_timeout = timeout;
|
||||
}
|
||||
|
||||
template <typename Functor>
|
||||
void Listen(Functor callback) {
|
||||
log_info("starting streaming server at port", _acceptor.local_endpoint().port());
|
||||
_acceptor.get_io_service().post([=]() { OpenSession(_timeout, callback); });
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
template <typename Functor>
|
||||
void OpenSession(duration_type timeout, Functor callback) {
|
||||
using boost::system::error_code;
|
||||
|
||||
auto session = std::make_shared<ServerSession>(_acceptor.get_io_service(), timeout);
|
||||
|
||||
auto handle_query = [=](const error_code &ec) {
|
||||
if (!ec) {
|
||||
session->Open(callback);
|
||||
} else {
|
||||
log_error("tcp accept error:", ec.message());
|
||||
}
|
||||
};
|
||||
|
||||
_acceptor.async_accept(session->_socket, [=](error_code ec) {
|
||||
// Handle query and open a new session immediately.
|
||||
_acceptor.get_io_service().post([=]() { handle_query(ec); });
|
||||
OpenSession(timeout, callback);
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::ip::tcp::acceptor _acceptor;
|
||||
|
||||
std::atomic<timeout_type> _timeout;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace low_level
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -1,173 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/Debug.h"
|
||||
#include "carla/Logging.h"
|
||||
#include "carla/streaming/Message.h"
|
||||
#include "carla/streaming/low_level/Types.h"
|
||||
#include "carla/streaming/low_level/tcp/Timeout.h"
|
||||
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/read.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <memory>
|
||||
|
||||
namespace carla {
|
||||
namespace streaming {
|
||||
namespace low_level {
|
||||
namespace tcp {
|
||||
|
||||
namespace detail {
|
||||
|
||||
static std::atomic_size_t SESSION_COUNTER{0u};
|
||||
|
||||
} // namespace detail
|
||||
|
||||
/// A TCP server session. When a session opens, it reads from the socket a
|
||||
/// stream id object and passes itself to the callback functor. The session
|
||||
/// closes itself after @a timeout of inactivity is met.
|
||||
class ServerSession
|
||||
: public std::enable_shared_from_this<ServerSession>,
|
||||
private boost::noncopyable {
|
||||
public:
|
||||
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using duration_type = timeout_type;
|
||||
|
||||
explicit ServerSession(boost::asio::io_service &io_service, duration_type timeout)
|
||||
: _session_id(detail::SESSION_COUNTER++),
|
||||
_socket(io_service),
|
||||
_timeout(timeout),
|
||||
_deadline(io_service),
|
||||
_strand(io_service) {}
|
||||
|
||||
~ServerSession() {
|
||||
_deadline.cancel();
|
||||
}
|
||||
|
||||
/// Starts the session and calls @a callback after successfully reading the
|
||||
/// stream id.
|
||||
///
|
||||
/// @pre Callback function signature:
|
||||
/// `void(std::shared_ptr<ServerSession>)`.
|
||||
template <typename Functor>
|
||||
void Open(Functor callback) {
|
||||
StartTimer();
|
||||
auto self = shared_from_this(); // To keep myself alive.
|
||||
_strand.post([=]() {
|
||||
|
||||
auto handle_query = [this, self, callback](
|
||||
const boost::system::error_code &ec,
|
||||
size_t DEBUG_ONLY(bytes_received)) {
|
||||
DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
|
||||
if (!ec) {
|
||||
log_debug("session", _session_id, "for stream", _stream_id, " started");
|
||||
_socket.get_io_service().post([=]() { callback(self); });
|
||||
} else {
|
||||
log_error("session", _session_id, ": error retrieving stream id :", ec.message());
|
||||
Close();
|
||||
}
|
||||
};
|
||||
|
||||
// Read the stream id.
|
||||
_deadline.expires_from_now(_timeout);
|
||||
boost::asio::async_read(
|
||||
_socket,
|
||||
boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
|
||||
_strand.wrap(handle_query));
|
||||
});
|
||||
}
|
||||
|
||||
stream_id_type get_stream_id() const {
|
||||
// Note that the stream id isn't synchronized. This function should only be
|
||||
// called from the @a callback function, and after that point the stream_id
|
||||
// can't change.
|
||||
return _stream_id;
|
||||
}
|
||||
|
||||
/// Writes some data to the socket.
|
||||
void Write(std::shared_ptr<const Message> message) {
|
||||
auto self = shared_from_this();
|
||||
_strand.post([=]() {
|
||||
|
||||
/// @todo has to be a better way of doing this...
|
||||
if (_is_writing) {
|
||||
// Repost and return;
|
||||
Write(std::move(message));
|
||||
return;
|
||||
}
|
||||
_is_writing = true;
|
||||
|
||||
auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
|
||||
_is_writing = false;
|
||||
if (ec) {
|
||||
log_error("session", _session_id, ": error sending data :", ec.message());
|
||||
} else {
|
||||
DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
|
||||
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
|
||||
}
|
||||
};
|
||||
|
||||
log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
|
||||
|
||||
_deadline.expires_from_now(_timeout);
|
||||
boost::asio::async_write(
|
||||
_socket,
|
||||
message->encode(),
|
||||
_strand.wrap(handle_sent));
|
||||
});
|
||||
}
|
||||
|
||||
void Close() {
|
||||
_strand.post([this, self = shared_from_this()]() {
|
||||
if (_socket.is_open()) {
|
||||
_socket.close();
|
||||
}
|
||||
log_debug("session", _session_id, "closed");
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
void StartTimer() {
|
||||
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
|
||||
log_debug("session", _session_id, "timed out");
|
||||
Close();
|
||||
} else {
|
||||
_deadline.async_wait([self = shared_from_this()](boost::system::error_code) {
|
||||
self->StartTimer();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
friend class Server;
|
||||
|
||||
const size_t _session_id;
|
||||
|
||||
stream_id_type _stream_id = 0u;
|
||||
|
||||
socket_type _socket;
|
||||
|
||||
duration_type _timeout;
|
||||
|
||||
boost::asio::deadline_timer _deadline;
|
||||
|
||||
boost::asio::io_service::strand _strand;
|
||||
|
||||
bool _is_writing = false;
|
||||
};
|
||||
|
||||
} // namespace tcp
|
||||
} // namespace low_level
|
||||
} // namespace streaming
|
||||
} // namespace carla
|
|
@ -4,6 +4,12 @@
|
|||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#ifndef LIBCARLA_INCLUDED_DISABLE_UE4_MACROS_HEADER
|
||||
# define LIBCARLA_INCLUDED_DISABLE_UE4_MACROS_HEADER
|
||||
#else
|
||||
# error disable-ue4-macros.h should only be included once!
|
||||
#endif // LIBCARLA_INCLUDED_DISABLE_UE4_MACROS_HEADER
|
||||
|
||||
#pragma push_macro("check")
|
||||
#undef check
|
||||
|
||||
|
@ -13,16 +19,14 @@
|
|||
# define BOOST_ERROR_CODE_HEADER_ONLY
|
||||
#endif // BOOST_ERROR_CODE_HEADER_ONLY
|
||||
|
||||
#ifndef BOOST_COROUTINES_NO_DEPRECATION_WARNING
|
||||
# define BOOST_COROUTINES_NO_DEPRECATION_WARNING
|
||||
#endif // BOOST_COROUTINES_NO_DEPRECATION_WARNING
|
||||
|
||||
#ifndef BOOST_NO_EXCEPTIONS
|
||||
# define BOOST_NO_EXCEPTIONS
|
||||
#endif // BOOST_NO_EXCEPTIONS
|
||||
|
||||
namespace boost {
|
||||
|
||||
static inline void throw_exception(const std::exception &) {}
|
||||
static inline void throw_exception(const std::exception &e) {
|
||||
UE_LOG(LogCarla, Fatal, TEXT("Exception thronw on Boost libraries: %s"), UTF8_TO_TCHAR(e.what()));
|
||||
}
|
||||
|
||||
} // namespace boost
|
||||
|
|
|
@ -31,7 +31,7 @@ public:
|
|||
void AddStream() {
|
||||
Stream stream = _server.MakeStream();
|
||||
|
||||
_client.Subscribe(stream.token(), [this](std::shared_ptr<Message> msg) {
|
||||
_client.Subscribe(stream.token(), [this](std::shared_ptr<Message> DEBUG_ONLY(msg)) {
|
||||
DEBUG_ASSERT_EQ(msg->size(), _message.size());
|
||||
DEBUG_ASSERT(*msg == _message);
|
||||
_client_callback.post([this]() {
|
||||
|
|
|
@ -39,7 +39,7 @@ TEST(rpc, server_bind_sync_run_on_game_thread) {
|
|||
carla::ThreadGroup threads;
|
||||
threads.CreateThread([&]() {
|
||||
Client client("localhost", TESTING_PORT);
|
||||
for (auto i = 0u; i < 300u; ++i) {
|
||||
for (auto i = 0; i < 300; ++i) {
|
||||
auto result = client.call("do_the_thing", i, 1).as<int>();
|
||||
EXPECT_EQ(result, i + 1);
|
||||
}
|
||||
|
|
|
@ -9,13 +9,14 @@
|
|||
#include <carla/ThreadGroup.h>
|
||||
#include <carla/streaming/low_level/Client.h>
|
||||
#include <carla/streaming/low_level/Server.h>
|
||||
#include <carla/streaming/low_level/tcp/Client.h>
|
||||
#include <carla/streaming/low_level/tcp/Server.h>
|
||||
#include <carla/streaming/detail/tcp/Client.h>
|
||||
#include <carla/streaming/detail/tcp/Server.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
TEST(streaming_low_level, sending_strings) {
|
||||
using namespace util::message;
|
||||
using namespace carla::streaming::detail;
|
||||
using namespace carla::streaming::low_level;
|
||||
|
||||
constexpr auto number_of_messages = 5'000u;
|
||||
|
|
|
@ -7,15 +7,14 @@
|
|||
#include "test.h"
|
||||
|
||||
#include <carla/ThreadGroup.h>
|
||||
#include <carla/streaming/low_level/tcp/Client.h>
|
||||
#include <carla/streaming/low_level/tcp/Server.h>
|
||||
#include <carla/streaming/detail/tcp/Client.h>
|
||||
#include <carla/streaming/detail/tcp/Server.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
TEST(streaming_low_level_tcp, small_message) {
|
||||
TEST(streaming_detail_tcp, small_message) {
|
||||
using namespace util::message;
|
||||
using namespace carla::streaming::low_level;
|
||||
using shared_session = std::shared_ptr<tcp::ServerSession>;
|
||||
using namespace carla::streaming::detail;
|
||||
|
||||
boost::asio::io_service io_service;
|
||||
tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
|
||||
|
|
|
@ -101,6 +101,14 @@ public class Carla : ModuleRules
|
|||
{
|
||||
PublicAdditionalLibraries.Add(Path.Combine(LibCarlaInstallPath, "lib", GetLibName("c++abi")));
|
||||
PublicAdditionalLibraries.Add(Path.Combine(LibCarlaInstallPath, "lib", GetLibName("rpc")));
|
||||
if (UseDebugLibs(Target))
|
||||
{
|
||||
PublicAdditionalLibraries.Add(Path.Combine(LibCarlaInstallPath, "lib", GetLibName("carla_server_debug")));
|
||||
}
|
||||
else
|
||||
{
|
||||
PublicAdditionalLibraries.Add(Path.Combine(LibCarlaInstallPath, "lib", GetLibName("carla_server")));
|
||||
}
|
||||
}
|
||||
|
||||
// Include path.
|
||||
|
|
|
@ -209,7 +209,9 @@ set(CMAKE_C_COMPILER ${CC})
|
|||
set(CMAKE_CXX_COMPILER ${CXX})
|
||||
|
||||
set(CMAKE_CXX_FLAGS "\${CMAKE_CXX_FLAGS} -std=c++17 -pthread -fPIC" CACHE STRING "" FORCE)
|
||||
# set(CMAKE_CXX_FLAGS "\${CMAKE_CXX_FLAGS} -Werror -Wall -Wextra" CACHE STRING "" FORCE)
|
||||
set(CMAKE_CXX_FLAGS "\${CMAKE_CXX_FLAGS} -Werror -Wall -Wextra" CACHE STRING "" FORCE)
|
||||
# See https://bugs.llvm.org/show_bug.cgi?id=21629
|
||||
set(CMAKE_CXX_FLAGS "\${CMAKE_CXX_FLAGS} -Wno-missing-braces" CACHE STRING "" FORCE)
|
||||
EOL
|
||||
|
||||
# -- LIBCPP_TOOLCHAIN_FILE -----------------------------------------------------
|
||||
|
@ -232,7 +234,6 @@ cat >${CMAKE_CONFIG_FILE}.gen <<EOL
|
|||
|
||||
set(CARLA_VERSION $(get_carla_version))
|
||||
|
||||
add_definitions(-DBOOST_COROUTINES_NO_DEPRECATION_WARNING)
|
||||
add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY)
|
||||
|
||||
set(BOOST_INCLUDE_PATH "${BOOST_INCLUDE}")
|
||||
|
|
Loading…
Reference in New Issue