User buffer as much as possible to avoid a couple of dynamic allocations

This commit is contained in:
nsubiron 2018-09-12 15:02:27 +02:00
parent 930a3cf9bc
commit a869424185
18 changed files with 242 additions and 59 deletions

View File

@ -228,11 +228,6 @@ namespace carla {
return {data(), size()};
}
std::array<boost::asio::const_buffer, 2u> encode() const {
DEBUG_ASSERT(!empty());
return {boost::asio::buffer(&_size, sizeof(_size)), buffer()};
}
// =========================================================================
// -- Private members ------------------------------------------------------
// =========================================================================

View File

@ -0,0 +1,64 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <type_traits>
#include <iterator>
namespace carla {
template<typename IT>
class ListView {
public:
using iterator = IT;
using const_iterator = typename std::add_const<IT>::type;
using difference_type = typename std::iterator_traits<iterator>::difference_type;
using value_type = typename std::iterator_traits<iterator>::value_type;
using pointer = typename std::iterator_traits<iterator>::pointer;
using reference = typename std::iterator_traits<iterator>::reference;
explicit ListView(iterator begin, iterator end)
: _begin(begin), _end(end) {}
template <typename STL_CONTAINER>
explicit ListView(STL_CONTAINER &container)
: _begin(iterator(container.begin())),
_end(iterator(container.end())) {}
ListView(const ListView &) = default;
ListView &operator=(const ListView &) = delete;
iterator begin() const {
return _begin;
}
iterator end() const {
return _end;
}
bool empty() const {
return _begin == _end;
}
difference_type size() const {
return std::distance(_begin, _end);
}
private:
const iterator _begin;
const iterator _end;
};
template <typename T>
static inline auto MakeListView(T begin, T end) {
return ListView<T>(begin, end);
}
} // namespace carla

View File

@ -0,0 +1,26 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <type_traits>
namespace carla {
template <typename... Ts>
struct are_same;
template <typename T0, typename T1, typename... Ts>
struct are_same<T0, T1, Ts...> {
static constexpr bool value = std::is_same<T0, T1>::value && are_same<T0, Ts...>::value;
};
template <typename T0, typename T1>
struct are_same<T0, T1> {
static constexpr bool value = std::is_same<T0, T1>::value;
};
} // namespace carla

View File

@ -33,7 +33,8 @@ namespace client {
}
}
SharedPtr<Image> Image::FromBuffer(boost::asio::const_buffer buffer) {
SharedPtr<Image> Image::FromBuffer(Buffer buffer) {
/// @todo We can avoid making another copy of the buffer here.
if (buffer.size() < sizeof(FImageHeaderData)) {
throw std::invalid_argument("buffer too small to be an image");
}

View File

@ -6,11 +6,10 @@
#pragma once
#include "carla/Buffer.h"
#include "carla/NonCopyable.h"
#include "carla/client/Memory.h"
#include <boost/asio/buffer.hpp>
#include <memory>
namespace carla {
@ -23,7 +22,7 @@ namespace client {
using byte_type = unsigned char;
static SharedPtr<Image> FromBuffer(boost::asio::const_buffer buffer);
static SharedPtr<Image> FromBuffer(Buffer buffer);
Image();

View File

@ -6,13 +6,11 @@
#pragma once
#include "carla/Buffer.h"
#include "carla/Debug.h"
#include "carla/streaming/Message.h"
#include "carla/streaming/Token.h"
#include "carla/streaming/detail/StreamState.h"
#include <boost/asio/buffer.hpp>
#include <memory>
namespace carla {
@ -39,14 +37,16 @@ namespace detail {
return _shared_state->token();
}
template <typename ConstBufferSequence>
void Write(ConstBufferSequence buffer) {
_shared_state->Write(std::make_shared<Message>(buffer));
/// Flush @a buffers down the stream. No copies are made.
template <typename... Buffers>
void Write(Buffers... buffers) {
_shared_state->Write(std::move(buffers)...);
}
/// Make a copy of @a data and flush it down the stream.
template <typename T>
Stream &operator<<(const T &rhs) {
Write(boost::asio::buffer(rhs));
Stream &operator<<(const T &data) {
Write(Buffer(data));
return *this;
}

View File

@ -7,7 +7,6 @@
#pragma once
#include "carla/NonCopyable.h"
#include "carla/streaming/Message.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/Token.h"
@ -54,10 +53,11 @@ namespace detail {
return _token;
}
void Write(std::shared_ptr<const Message> message) {
template <typename... Buffers>
void Write(Buffers... buffers) {
auto session = get_session();
if (session != nullptr) {
session->Write(message);
session->Write(std::move(buffers)...);
}
}

View File

@ -9,6 +9,7 @@
#include "carla/Buffer.h"
#include <cstdint>
#include <type_traits>
namespace carla {
namespace streaming {
@ -16,7 +17,11 @@ namespace detail {
using stream_id_type = uint32_t;
using message_size_type = Buffer::size_type;
using message_size_type = uint32_t;
static_assert(
std::is_same<message_size_type, Buffer::size_type>::value,
"uint type mismatch!");
} // namespace detail
} // namespace streaming

View File

@ -34,9 +34,9 @@ namespace tcp {
boost::asio::mutable_buffer body() {
DEBUG_ASSERT(_size > 0u);
DEBUG_ASSERT(_message == nullptr);
_message = std::make_shared<Message>(_size);
return _message->buffer();
DEBUG_ASSERT(_message.empty());
_message.reset(_size);
return _message.buffer();
}
auto size() const {
@ -51,7 +51,7 @@ namespace tcp {
message_size_type _size = 0u;
std::shared_ptr<Message> _message;
Buffer _message;
};
// ===========================================================================

View File

@ -6,8 +6,8 @@
#pragma once
#include "carla/Buffer.h"
#include "carla/NonCopyable.h"
#include "carla/streaming/Message.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/Types.h"
@ -33,7 +33,7 @@ namespace tcp {
using endpoint = boost::asio::ip::tcp::endpoint;
using protocol_type = endpoint::protocol_type;
using callback_function_type = std::function<void (std::shared_ptr<Message>)>;
using callback_function_type = std::function<void (Buffer)>;
Client(
boost::asio::io_service &io_service,

View File

@ -0,0 +1,86 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/ListView.h"
#include "carla/Buffer.h"
#include "carla/Debug.h"
#include "carla/NonCopyable.h"
#include "carla/streaming/detail/Types.h"
#include <boost/asio/buffer.hpp>
#include <array>
#include <exception>
#include <limits>
#include <memory>
#include <string>
namespace carla {
namespace streaming {
namespace detail {
namespace tcp {
template <size_t MaxNumberOfBuffers>
class MessageTmpl
: public std::enable_shared_from_this<MessageTmpl<MaxNumberOfBuffers>>,
private NonCopyable {
public:
static constexpr size_t max_size() {
return MaxNumberOfBuffers;
}
private:
MessageTmpl(size_t) {}
template <typename... Buffers>
MessageTmpl(size_t size, Buffer buffer, Buffers... buffers)
: MessageTmpl(size, std::move(buffers)...) {
++_number_of_buffers;
_total_size += buffer.size();
_buffer_views[1u + size - _number_of_buffers] = buffer.cbuffer();
_buffers[size - _number_of_buffers] = std::move(buffer);
}
public:
template <typename... Buffers>
MessageTmpl(Buffer buf, Buffers... buffers)
: MessageTmpl(sizeof...(Buffers) + 1u, std::move(buf), std::move(buffers)...) {
static_assert(sizeof...(Buffers) < max_size(), "Too many buffers!");
_buffer_views[0u] = boost::asio::buffer(&_total_size, sizeof(_total_size));
}
/// Size in bytes of the message excluding the header.
auto size() const {
return _total_size;
}
auto GetBufferSequence() const {
auto begin = _buffer_views.begin();
return MakeListView(begin, begin + _number_of_buffers + 1u);
}
private:
message_size_type _number_of_buffers = 0u;
message_size_type _total_size = 0u;
std::array<Buffer, max_size()> _buffers;
std::array<boost::asio::const_buffer, max_size() + 1u> _buffer_views;
};
using Message = MessageTmpl<2u>;
} // namespace tcp
} // namespace detail
} // namespace streaming
} // namespace carla

View File

@ -61,6 +61,15 @@ namespace tcp {
});
}
void ServerSession::Close() {
_strand.post([this, self = shared_from_this()]() {
if (_socket.is_open()) {
_socket.close();
}
log_debug("session", _session_id, "closed");
});
}
void ServerSession::Write(std::shared_ptr<const Message> message) {
auto self = shared_from_this();
_strand.post([=]() {
@ -88,20 +97,11 @@ namespace tcp {
_deadline.expires_from_now(_timeout);
boost::asio::async_write(
_socket,
message->encode(),
message->GetBufferSequence(),
_strand.wrap(handle_sent));
});
}
void ServerSession::Close() {
_strand.post([this, self = shared_from_this()]() {
if (_socket.is_open()) {
_socket.close();
}
log_debug("session", _session_id, "closed");
});
}
void ServerSession::StartTimer() {
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
log_debug("session", _session_id, "timed out");

View File

@ -8,8 +8,9 @@
#include "carla/NonCopyable.h"
#include "carla/Time.h"
#include "carla/streaming/Message.h"
#include "carla/TypeTraits.h"
#include "carla/streaming/detail/Types.h"
#include "carla/streaming/detail/tcp/Message.h"
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
@ -50,13 +51,21 @@ namespace tcp {
}
/// Writes some data to the socket.
void Write(std::shared_ptr<const Message> message);
template <typename... Buffers>
void Write(Buffers... buffers) {
static_assert(
are_same<Buffer, Buffers...>::value,
"This function only accepts arguments of type Buffer.");
Write(std::make_shared<const Message>(std::move(buffers)...));
}
/// Posts a job to close this session.
void Close();
private:
void Write(std::shared_ptr<const Message> message);
void StartTimer();
friend class Server;

View File

@ -13,7 +13,7 @@ using namespace carla::streaming;
static auto make_special_message(size_t size) {
std::vector<uint32_t> v(size/sizeof(uint32_t), 42u);
Message msg(boost::asio::buffer(v));
carla::Buffer msg(v);
EXPECT_EQ(msg.size(), size);
return msg;
}
@ -31,9 +31,9 @@ public:
void AddStream() {
Stream stream = _server.MakeStream();
_client.Subscribe(stream.token(), [this](std::shared_ptr<Message> DEBUG_ONLY(msg)) {
DEBUG_ASSERT_EQ(msg->size(), _message.size());
DEBUG_ASSERT(*msg == _message);
_client.Subscribe(stream.token(), [this](carla::Buffer DEBUG_ONLY(msg)) {
DEBUG_ASSERT_EQ(msg.size(), _message.size());
DEBUG_ASSERT(msg == _message);
_client_callback.post([this]() {
CARLA_PROFILE_FPS(client, listen_callback);
++_number_of_messages_received;
@ -92,7 +92,7 @@ private:
Client _client;
const Message _message;
const carla::Buffer _message;
boost::asio::io_service _client_callback;

View File

@ -35,9 +35,8 @@ TEST(streaming_low_level, sending_strings) {
Client<tcp::Client> c;
c.Subscribe(io_service, stream.token(), [&](auto message) {
++message_count;
ASSERT_NE(message, nullptr);
ASSERT_EQ(message->size(), message_text.size());
const std::string msg = as_string(*message);
ASSERT_EQ(message.size(), message_text.size());
const std::string msg = as_string(message);
ASSERT_EQ(msg, message_text);
});

View File

@ -28,9 +28,9 @@ TEST(streaming_detail_tcp, small_message) {
srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
ASSERT_EQ(session->get_stream_id(), 1u);
const std::string msg = "Hola!";
auto message = std::make_shared<Message>(boost::asio::buffer(msg));
auto message = carla::Buffer(msg);
while (!done) {
session->Write(message);
session->Write(std::move(message));
std::this_thread::sleep_for(1ns);
}
std::cout << "done!\n";
@ -38,11 +38,11 @@ TEST(streaming_detail_tcp, small_message) {
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(ep)};
auto stream = dispatcher.MakeStream();
tcp::Client c(io_service, stream.token(), [&](std::shared_ptr<Message> message) {
tcp::Client c(io_service, stream.token(), [&](carla::Buffer message) {
++message_count;
ASSERT_NE(message, nullptr);
ASSERT_EQ(message->size(), 5u);
const std::string msg = util::buffer::as_string(*message);
ASSERT_FALSE(message.empty());
ASSERT_EQ(message.size(), 5u);
const std::string msg = util::buffer::as_string(message);
ASSERT_EQ(msg, std::string("Hola!"));
});

View File

@ -113,7 +113,7 @@ void export_actor() {
self.Listen([callback](auto message) {
cc::SharedPtr<cc::Image> image;
try {
image = cc::Image::FromBuffer(message->buffer());
image = cc::Image::FromBuffer(std::move(message));
} catch (const std::exception &e) {
std::cerr << "exception while parsing the image: " << e.what() << std::endl;
return;

View File

@ -77,12 +77,11 @@ public:
void Write(const FSensorDataView &SensorData) final {
auto MakeBuffer = [](FReadOnlyBufferView View) {
return boost::asio::buffer(View.GetData(), View.GetSize());
return carla::Buffer(boost::asio::buffer(View.GetData(), View.GetSize()));
};
std::array<boost::asio::const_buffer, 2u> SequencedBuffer;
SequencedBuffer[0u] = MakeBuffer(SensorData.GetHeader());
SequencedBuffer[1u] = MakeBuffer(SensorData.GetData());
TheStream.Write(SequencedBuffer);
TheStream.Write(
MakeBuffer(SensorData.GetHeader()),
MakeBuffer(SensorData.GetData()));
}
private: