Add MultiStream for streaming simultaneously to multiple clients

This commit is contained in:
nsubiron 2018-10-11 00:19:47 +02:00
parent 75cec0b615
commit 0f38aff8d5
14 changed files with 358 additions and 133 deletions

View File

@ -0,0 +1,37 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/NonCopyable.h"
#include <memory>
namespace carla {
/// A very simple atomic shared ptr with relaxed memory order.
template <typename T>
class AtomicSharedPtr : private NonCopyable {
public:
void store(std::shared_ptr<T> ptr) {
std::atomic_store_explicit(&_ptr, ptr, std::memory_order_relaxed);
}
std::shared_ptr<T> load() const {
return std::atomic_load_explicit(&_ptr, std::memory_order_relaxed);
}
void operator=(std::shared_ptr<T> ptr) {
store(std::move(ptr));
}
private:
std::shared_ptr<T> _ptr;
};
} // namespace carla

View File

@ -48,6 +48,10 @@ namespace streaming {
return _server.MakeStream();
}
MultiStream MakeMultiStream() {
return _server.MakeMultiStream();
}
void Run() {
_service.Run();
}

View File

@ -6,79 +6,26 @@
#pragma once
#include "carla/Buffer.h"
#include "carla/Debug.h"
#include "carla/streaming/Token.h"
#include "carla/streaming/detail/MultiStreamState.h"
#include "carla/streaming/detail/Stream.h"
#include "carla/streaming/detail/StreamState.h"
#include <memory>
namespace carla {
namespace streaming {
namespace detail {
class Dispatcher;
} // namespace detail
/// A stream represents an unidirectional channel for sending data from server
/// to client. A **single** client can subscribe to this stream using the
/// stream token. If no client is subscribed, the data flushed down the stream
/// is discarded.
using Stream = detail::Stream<detail::StreamState>;
/// A stream represents an unidirectional channel for sending data from server
/// to client. A (single) client can subscribe to this stream using the stream
/// to client. Multiple clients can subscribe to this stream using the stream
/// token. If no client is subscribed, the data flushed down the stream is
/// discarded.
class Stream {
public:
Stream() = delete;
Stream(const Stream &) = default;
Stream(Stream &&) = default;
Stream &operator=(const Stream &) = default;
Stream &operator=(Stream &&) = default;
/// Token associated with this stream. This token can be used by a client to
/// subscribe to this stream.
Token token() const {
return _shared_state->token();
}
/// Pull a buffer from the buffer pool associated to this stream. Discarded
/// buffers are re-used to avoid memory allocations.
///
/// @note Re-using buffers is optimized for the use case in which all the
/// messages sent through the stream are big and have (approximately) the
/// same size.
Buffer MakeBuffer() {
return _shared_state->MakeBuffer();
}
/// Flush @a buffers down the stream. No copies are made.
template <typename... Buffers>
void Write(Buffers... buffers) {
_shared_state->Write(std::move(buffers)...);
}
/// Make a copy of @a data and flush it down the stream.
template <typename T>
Stream &operator<<(const T &data) {
auto buffer = MakeBuffer();
buffer.copy_from(data);
Write(std::move(buffer));
return *this;
}
private:
friend class detail::Dispatcher;
Stream(std::shared_ptr<detail::StreamState> state)
: _shared_state(std::move(state)) {
DEBUG_ASSERT(_shared_state != nullptr);
}
std::shared_ptr<detail::StreamState> _shared_state;
};
///
/// @warning MultiStream is quite slower than Stream.
using MultiStream = detail::Stream<detail::MultiStreamState>;
} // namespace streaming
} // namespace carla

View File

@ -7,6 +7,8 @@
#include "carla/streaming/detail/Dispatcher.h"
#include "carla/Logging.h"
#include "carla/streaming/detail/MultiStreamState.h"
#include "carla/streaming/detail/StreamState.h"
#include <exception>
@ -14,24 +16,35 @@ namespace carla {
namespace streaming {
namespace detail {
template <typename StreamStateT, typename StreamMapT>
static auto MakeStreamState(const token_type &cached_token, StreamMapT &stream_map) {
auto ptr = std::make_shared<StreamStateT>(cached_token);
auto result = stream_map.emplace(std::make_pair(cached_token.get_stream_id(), ptr));
if (!result.second) {
throw std::runtime_error("failed to create stream!");
}
return ptr;
}
Dispatcher::~Dispatcher() {
// Disconnect all the sessions from their streams, this should kill any
// session remaining since at this point the io_service should be already
// stopped.
for (auto &pair : _stream_map) {
pair.second->set_session(nullptr);
pair.second->ClearSessions();
}
}
Stream Dispatcher::MakeStream() {
carla::streaming::Stream Dispatcher::MakeStream() {
std::lock_guard<std::mutex> lock(_mutex);
++_cached_token._token.stream_id; // id zero only happens in overflow.
auto ptr = std::make_shared<StreamState>(_cached_token);
auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
if (!result.second) {
throw std::runtime_error("failed to create stream!");
}
return ptr;
return MakeStreamState<StreamState>(_cached_token, _stream_map);
}
carla::streaming::MultiStream Dispatcher::MakeMultiStream() {
std::lock_guard<std::mutex> lock(_mutex);
++_cached_token._token.stream_id; // id zero only happens in overflow.
return MakeStreamState<MultiStreamState>(_cached_token, _stream_map);
}
bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
@ -40,7 +53,7 @@ namespace detail {
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
DEBUG_ASSERT(search->second != nullptr);
search->second->set_session(std::move(session));
search->second->ConnectSession(std::move(session));
return true;
} else {
log_error("Invalid session: no stream available with id", session->get_stream_id());
@ -54,7 +67,7 @@ namespace detail {
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
DEBUG_ASSERT(search->second != nullptr);
search->second->set_session(nullptr);
search->second->DisconnectSession(session);
}
}

View File

@ -9,7 +9,6 @@
#include "carla/streaming/EndPoint.h"
#include "carla/streaming/Stream.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/StreamState.h"
#include "carla/streaming/detail/Token.h"
#include <memory>
@ -20,6 +19,8 @@ namespace carla {
namespace streaming {
namespace detail {
class StreamStateBase;
/// Keeps the mapping between streams and sessions.
class Dispatcher {
public:
@ -30,7 +31,9 @@ namespace detail {
~Dispatcher();
Stream MakeStream();
carla::streaming::Stream MakeStream();
carla::streaming::MultiStream MakeMultiStream();
bool RegisterSession(std::shared_ptr<Session> session);
@ -48,7 +51,7 @@ namespace detail {
/// them alive the whole run.
std::unordered_map<
stream_id_type,
std::shared_ptr<StreamState>> _stream_map;
std::shared_ptr<StreamStateBase>> _stream_map;
};
} // namespace detail

View File

@ -0,0 +1,66 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/AtomicSharedPtr.h"
#include "carla/streaming/detail/StreamStateBase.h"
#include <mutex>
#include <vector>
namespace carla {
namespace streaming {
namespace detail {
/// A stream state that can hold any number of sessions.
///
/// @todo Lacking some optimization.
class MultiStreamState final : public StreamStateBase {
public:
using StreamStateBase::StreamStateBase;
template <typename... Buffers>
void Write(Buffers... buffers) {
auto message = Session::MakeMessage(std::move(buffers)...);
std::lock_guard<std::mutex> lock(_mutex);
for (auto &session : _sessions) {
if (session != nullptr) {
session->Write(message);
}
}
}
private:
void ConnectSession(std::shared_ptr<Session> session) final {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
_sessions.emplace_back(std::move(session));
}
void DisconnectSession(std::shared_ptr<Session> session) final {
std::lock_guard<std::mutex> lock(_mutex);
DEBUG_ASSERT(session != nullptr);
_sessions.erase(
std::remove(_sessions.begin(), _sessions.end(), session),
_sessions.end());
}
void ClearSessions() final {
std::lock_guard<std::mutex> lock(_mutex);
_sessions.clear();
}
std::mutex _mutex;
std::vector<std::shared_ptr<Session>> _sessions;
};
} // namespace detail
} // namespace streaming
} // namespace carla

View File

@ -0,0 +1,78 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/Buffer.h"
#include "carla/Debug.h"
#include "carla/streaming/Token.h"
#include <memory>
namespace carla {
namespace streaming {
namespace detail {
class Dispatcher;
template <typename StreamStateT>
class Stream {
public:
Stream() = delete;
Stream(const Stream &) = default;
Stream(Stream &&) = default;
Stream &operator=(const Stream &) = default;
Stream &operator=(Stream &&) = default;
/// Token associated with this stream. This token can be used by a client to
/// subscribe to this stream.
Token token() const {
return _shared_state->token();
}
/// Pull a buffer from the buffer pool associated to this stream. Discarded
/// buffers are re-used to avoid memory allocations.
///
/// @note Re-using buffers is optimized for the use case in which all the
/// messages sent through the stream are big and have (approximately) the
/// same size.
Buffer MakeBuffer() {
return _shared_state->MakeBuffer();
}
/// Flush @a buffers down the stream. No copies are made.
template <typename... Buffers>
void Write(Buffers... buffers) {
_shared_state->Write(std::move(buffers)...);
}
/// Make a copy of @a data and flush it down the stream.
template <typename T>
Stream &operator<<(const T &data) {
auto buffer = MakeBuffer();
buffer.copy_from(data);
Write(std::move(buffer));
return *this;
}
private:
friend class detail::Dispatcher;
Stream(std::shared_ptr<StreamStateT> state)
: _shared_state(std::move(state)) {
DEBUG_ASSERT(_shared_state != nullptr);
}
std::shared_ptr<StreamStateT> _shared_state;
};
} // namespace detail
} // namespace streaming
} // namespace carla

View File

@ -6,57 +6,22 @@
#pragma once
#include "carla/NonCopyable.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/Token.h"
#include <atomic>
#include <memory>
#include "carla/AtomicSharedPtr.h"
#include "carla/streaming/detail/StreamStateBase.h"
namespace carla {
class BufferPool;
namespace streaming {
namespace detail {
/// Handles the synchronization of the shared session.
class SessionHolder : private NonCopyable {
/// A stream state that can hold only a single session.
class StreamState final : public StreamStateBase {
public:
void set_session(std::shared_ptr<Session> session) {
std::atomic_store_explicit(&_session, session, std::memory_order_relaxed);
}
protected:
std::shared_ptr<Session> get_session() const {
return std::atomic_load_explicit(&_session, std::memory_order_relaxed);
}
private:
std::shared_ptr<Session> _session;
};
/// Shared state among all the copies of a stream. Provides access to the
/// underlying server session if active.
class StreamState : public SessionHolder {
public:
explicit StreamState(const token_type &token);
~StreamState();
const token_type &token() const {
return _token;
}
Buffer MakeBuffer();
using StreamStateBase::StreamStateBase;
template <typename... Buffers>
void Write(Buffers... buffers) {
auto session = get_session();
auto session = _session.load();
if (session != nullptr) {
session->Write(std::move(buffers)...);
}
@ -64,9 +29,21 @@ namespace detail {
private:
const token_type _token;
void ConnectSession(std::shared_ptr<Session> session) final {
DEBUG_ASSERT(session != nullptr);
_session = std::move(session);
}
const std::shared_ptr<BufferPool> _buffer_pool;
void DisconnectSession(std::shared_ptr<Session> DEBUG_ONLY(session)) final {
DEBUG_ASSERT(session == _session.load());
_session = nullptr;
}
void ClearSessions() final {
_session = nullptr;
}
AtomicSharedPtr<Session> _session;
};
} // namespace detail

View File

@ -4,7 +4,7 @@
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/streaming/detail/StreamState.h"
#include "carla/streaming/detail/StreamStateBase.h"
#include "carla/BufferPool.h"
@ -12,13 +12,13 @@ namespace carla {
namespace streaming {
namespace detail {
StreamState::StreamState(const token_type &token)
StreamStateBase::StreamStateBase(const token_type &token)
: _token(token),
_buffer_pool(std::make_shared<BufferPool>()) {}
StreamState::~StreamState() = default;
StreamStateBase::~StreamStateBase() = default;
Buffer StreamState::MakeBuffer() {
Buffer StreamStateBase::MakeBuffer() {
return _buffer_pool->Pop();
}

View File

@ -0,0 +1,52 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/NonCopyable.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/Token.h"
#include <memory>
namespace carla {
class BufferPool;
namespace streaming {
namespace detail {
/// Shared state among all the copies of a stream. Provides access to the
/// underlying server session(s) if active.
class StreamStateBase : private NonCopyable {
public:
explicit StreamStateBase(const token_type &token);
virtual ~StreamStateBase();
const token_type &token() const {
return _token;
}
Buffer MakeBuffer();
virtual void ConnectSession(std::shared_ptr<Session> session) = 0;
virtual void DisconnectSession(std::shared_ptr<Session> session) = 0;
virtual void ClearSessions() = 0;
private:
const token_type _token;
const std::shared_ptr<BufferPool> _buffer_pool;
};
} // namespace detail
} // namespace streaming
} // namespace carla

View File

@ -63,10 +63,6 @@ namespace tcp {
});
}
void ServerSession::Close() {
_strand.post([self=shared_from_this()]() { self->CloseNow(); });
}
void ServerSession::Write(std::shared_ptr<const Message> message) {
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
@ -102,6 +98,10 @@ namespace tcp {
});
}
void ServerSession::Close() {
_strand.post([self=shared_from_this()]() { self->CloseNow(); });
}
void ServerSession::StartTimer() {
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
log_debug("session", _session_id, "timed out");

View File

@ -52,13 +52,21 @@ namespace tcp {
return _stream_id;
}
/// Writes some data to the socket.
template <typename... Buffers>
void Write(Buffers... buffers) {
static auto MakeMessage(Buffers... buffers) {
static_assert(
are_same<Buffer, Buffers...>::value,
"This function only accepts arguments of type Buffer.");
Write(std::make_shared<const Message>(std::move(buffers)...));
return std::make_shared<const Message>(std::move(buffers)...);
}
/// Writes some data to the socket.
void Write(std::shared_ptr<const Message> message);
/// Writes some data to the socket.
template <typename... Buffers>
void Write(Buffers... buffers) {
Write(MakeMessage(std::move(buffers)...));
}
/// Post a job to close the session.
@ -66,8 +74,6 @@ namespace tcp {
private:
void Write(std::shared_ptr<const Message> message);
void StartTimer();
void CloseNow();

View File

@ -65,6 +65,10 @@ namespace low_level {
return _dispatcher.MakeStream();
}
MultiStream MakeMultiStream() {
return _dispatcher.MakeMultiStream();
}
private:
underlying_server _server;

View File

@ -214,3 +214,41 @@ TEST(streaming, stream_outlives_server) {
std::this_thread::sleep_for(20ms);
done = true;
} // stream dies here.
TEST(streaming, multi_stream) {
using namespace carla::streaming;
using namespace util::buffer;
constexpr size_t number_of_messages = 100u;
constexpr size_t number_of_clients = 6u;
constexpr size_t iterations = 20u;
const std::string message = "Hi y'all!";
Server srv(TESTING_PORT);
srv.AsyncRun(number_of_clients);
auto stream = srv.MakeMultiStream();
for (auto i = 0u; i < iterations; ++i) {
std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
for (auto &pair : v) {
pair.first = 0u;
pair.second = std::make_unique<Client>();
pair.second->AsyncRun(1u);
pair.second->Subscribe(stream.token(), [&](auto buffer) {
const std::string result = as_string(buffer);
ASSERT_EQ(result, message);
++pair.first;
});
}
for (auto i = 0u; i < number_of_messages; ++i) {
std::this_thread::sleep_for(4ms);
stream << message;
}
std::this_thread::sleep_for(4ms);
for (auto &pair : v) {
ASSERT_EQ(pair.first, number_of_messages);
}
}
}