From a86942418596fbb7f3119551bc35674aa2aaf5b9 Mon Sep 17 00:00:00 2001 From: nsubiron Date: Wed, 12 Sep 2018 15:02:27 +0200 Subject: [PATCH] User buffer as much as possible to avoid a couple of dynamic allocations --- LibCarla/source/carla/Buffer.h | 5 -- LibCarla/source/carla/ListView.h | 64 ++++++++++++++ LibCarla/source/carla/TypeTraits.h | 26 ++++++ LibCarla/source/carla/client/Image.cpp | 3 +- LibCarla/source/carla/client/Image.h | 5 +- LibCarla/source/carla/streaming/Stream.h | 16 ++-- .../carla/streaming/detail/StreamState.h | 6 +- .../source/carla/streaming/detail/Types.h | 7 +- .../carla/streaming/detail/tcp/Client.cpp | 8 +- .../carla/streaming/detail/tcp/Client.h | 4 +- .../carla/streaming/detail/tcp/Message.h | 86 +++++++++++++++++++ .../streaming/detail/tcp/ServerSession.cpp | 20 ++--- .../streaming/detail/tcp/ServerSession.h | 13 ++- .../source/test/test_benchmark_streaming.cpp | 10 +-- .../source/test/test_streaming_low_level.cpp | 5 +- .../test/test_streaming_low_level_tcp.cpp | 12 +-- PythonAPI/source/libcarla/Actor.cpp | 2 +- .../Source/Carla/Server/TheNewCarlaServer.cpp | 9 +- 18 files changed, 242 insertions(+), 59 deletions(-) create mode 100644 LibCarla/source/carla/ListView.h create mode 100644 LibCarla/source/carla/TypeTraits.h create mode 100644 LibCarla/source/carla/streaming/detail/tcp/Message.h diff --git a/LibCarla/source/carla/Buffer.h b/LibCarla/source/carla/Buffer.h index 94846ac5c..9d431e8ab 100644 --- a/LibCarla/source/carla/Buffer.h +++ b/LibCarla/source/carla/Buffer.h @@ -228,11 +228,6 @@ namespace carla { return {data(), size()}; } - std::array encode() const { - DEBUG_ASSERT(!empty()); - return {boost::asio::buffer(&_size, sizeof(_size)), buffer()}; - } - // ========================================================================= // -- Private members ------------------------------------------------------ // ========================================================================= diff --git a/LibCarla/source/carla/ListView.h b/LibCarla/source/carla/ListView.h new file mode 100644 index 000000000..cf9df1a5f --- /dev/null +++ b/LibCarla/source/carla/ListView.h @@ -0,0 +1,64 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include +#include + +namespace carla { + + template + class ListView { + public: + + using iterator = IT; + using const_iterator = typename std::add_const::type; + using difference_type = typename std::iterator_traits::difference_type; + using value_type = typename std::iterator_traits::value_type; + using pointer = typename std::iterator_traits::pointer; + using reference = typename std::iterator_traits::reference; + + explicit ListView(iterator begin, iterator end) + : _begin(begin), _end(end) {} + + template + explicit ListView(STL_CONTAINER &container) + : _begin(iterator(container.begin())), + _end(iterator(container.end())) {} + + ListView(const ListView &) = default; + ListView &operator=(const ListView &) = delete; + + iterator begin() const { + return _begin; + } + + iterator end() const { + return _end; + } + + bool empty() const { + return _begin == _end; + } + + difference_type size() const { + return std::distance(_begin, _end); + } + + private: + + const iterator _begin; + + const iterator _end; + }; + + template + static inline auto MakeListView(T begin, T end) { + return ListView(begin, end); + } + +} // namespace carla diff --git a/LibCarla/source/carla/TypeTraits.h b/LibCarla/source/carla/TypeTraits.h new file mode 100644 index 000000000..4fe12f743 --- /dev/null +++ b/LibCarla/source/carla/TypeTraits.h @@ -0,0 +1,26 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include + +namespace carla { + + template + struct are_same; + + template + struct are_same { + static constexpr bool value = std::is_same::value && are_same::value; + }; + + template + struct are_same { + static constexpr bool value = std::is_same::value; + }; + +} // namespace carla diff --git a/LibCarla/source/carla/client/Image.cpp b/LibCarla/source/carla/client/Image.cpp index 086e5c13b..d96653b87 100644 --- a/LibCarla/source/carla/client/Image.cpp +++ b/LibCarla/source/carla/client/Image.cpp @@ -33,7 +33,8 @@ namespace client { } } - SharedPtr Image::FromBuffer(boost::asio::const_buffer buffer) { + SharedPtr Image::FromBuffer(Buffer buffer) { + /// @todo We can avoid making another copy of the buffer here. if (buffer.size() < sizeof(FImageHeaderData)) { throw std::invalid_argument("buffer too small to be an image"); } diff --git a/LibCarla/source/carla/client/Image.h b/LibCarla/source/carla/client/Image.h index 48b9c3035..f28a96dee 100644 --- a/LibCarla/source/carla/client/Image.h +++ b/LibCarla/source/carla/client/Image.h @@ -6,11 +6,10 @@ #pragma once +#include "carla/Buffer.h" #include "carla/NonCopyable.h" #include "carla/client/Memory.h" -#include - #include namespace carla { @@ -23,7 +22,7 @@ namespace client { using byte_type = unsigned char; - static SharedPtr FromBuffer(boost::asio::const_buffer buffer); + static SharedPtr FromBuffer(Buffer buffer); Image(); diff --git a/LibCarla/source/carla/streaming/Stream.h b/LibCarla/source/carla/streaming/Stream.h index fd8183eb1..3a70d30d5 100644 --- a/LibCarla/source/carla/streaming/Stream.h +++ b/LibCarla/source/carla/streaming/Stream.h @@ -6,13 +6,11 @@ #pragma once +#include "carla/Buffer.h" #include "carla/Debug.h" -#include "carla/streaming/Message.h" #include "carla/streaming/Token.h" #include "carla/streaming/detail/StreamState.h" -#include - #include namespace carla { @@ -39,14 +37,16 @@ namespace detail { return _shared_state->token(); } - template - void Write(ConstBufferSequence buffer) { - _shared_state->Write(std::make_shared(buffer)); + /// Flush @a buffers down the stream. No copies are made. + template + void Write(Buffers... buffers) { + _shared_state->Write(std::move(buffers)...); } + /// Make a copy of @a data and flush it down the stream. template - Stream &operator<<(const T &rhs) { - Write(boost::asio::buffer(rhs)); + Stream &operator<<(const T &data) { + Write(Buffer(data)); return *this; } diff --git a/LibCarla/source/carla/streaming/detail/StreamState.h b/LibCarla/source/carla/streaming/detail/StreamState.h index 8c427a6c7..dba081e59 100644 --- a/LibCarla/source/carla/streaming/detail/StreamState.h +++ b/LibCarla/source/carla/streaming/detail/StreamState.h @@ -7,7 +7,6 @@ #pragma once #include "carla/NonCopyable.h" -#include "carla/streaming/Message.h" #include "carla/streaming/detail/Session.h" #include "carla/streaming/detail/Token.h" @@ -54,10 +53,11 @@ namespace detail { return _token; } - void Write(std::shared_ptr message) { + template + void Write(Buffers... buffers) { auto session = get_session(); if (session != nullptr) { - session->Write(message); + session->Write(std::move(buffers)...); } } diff --git a/LibCarla/source/carla/streaming/detail/Types.h b/LibCarla/source/carla/streaming/detail/Types.h index 6c140c87d..5ba73a936 100644 --- a/LibCarla/source/carla/streaming/detail/Types.h +++ b/LibCarla/source/carla/streaming/detail/Types.h @@ -9,6 +9,7 @@ #include "carla/Buffer.h" #include +#include namespace carla { namespace streaming { @@ -16,7 +17,11 @@ namespace detail { using stream_id_type = uint32_t; - using message_size_type = Buffer::size_type; + using message_size_type = uint32_t; + + static_assert( + std::is_same::value, + "uint type mismatch!"); } // namespace detail } // namespace streaming diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp index 68eaa7bf4..4dc43a6c4 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp @@ -34,9 +34,9 @@ namespace tcp { boost::asio::mutable_buffer body() { DEBUG_ASSERT(_size > 0u); - DEBUG_ASSERT(_message == nullptr); - _message = std::make_shared(_size); - return _message->buffer(); + DEBUG_ASSERT(_message.empty()); + _message.reset(_size); + return _message.buffer(); } auto size() const { @@ -51,7 +51,7 @@ namespace tcp { message_size_type _size = 0u; - std::shared_ptr _message; + Buffer _message; }; // =========================================================================== diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.h b/LibCarla/source/carla/streaming/detail/tcp/Client.h index 3a2befa85..1718a4f63 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.h +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.h @@ -6,8 +6,8 @@ #pragma once +#include "carla/Buffer.h" #include "carla/NonCopyable.h" -#include "carla/streaming/Message.h" #include "carla/streaming/detail/Token.h" #include "carla/streaming/detail/Types.h" @@ -33,7 +33,7 @@ namespace tcp { using endpoint = boost::asio::ip::tcp::endpoint; using protocol_type = endpoint::protocol_type; - using callback_function_type = std::function)>; + using callback_function_type = std::function; Client( boost::asio::io_service &io_service, diff --git a/LibCarla/source/carla/streaming/detail/tcp/Message.h b/LibCarla/source/carla/streaming/detail/tcp/Message.h new file mode 100644 index 000000000..5f972cbaa --- /dev/null +++ b/LibCarla/source/carla/streaming/detail/tcp/Message.h @@ -0,0 +1,86 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include "carla/ListView.h" +#include "carla/Buffer.h" +#include "carla/Debug.h" +#include "carla/NonCopyable.h" +#include "carla/streaming/detail/Types.h" + +#include + +#include +#include +#include +#include +#include + +namespace carla { +namespace streaming { +namespace detail { +namespace tcp { + + template + class MessageTmpl + : public std::enable_shared_from_this>, + private NonCopyable { + public: + + static constexpr size_t max_size() { + return MaxNumberOfBuffers; + } + + private: + + MessageTmpl(size_t) {} + + template + MessageTmpl(size_t size, Buffer buffer, Buffers... buffers) + : MessageTmpl(size, std::move(buffers)...) { + ++_number_of_buffers; + _total_size += buffer.size(); + _buffer_views[1u + size - _number_of_buffers] = buffer.cbuffer(); + _buffers[size - _number_of_buffers] = std::move(buffer); + } + + public: + + template + MessageTmpl(Buffer buf, Buffers... buffers) + : MessageTmpl(sizeof...(Buffers) + 1u, std::move(buf), std::move(buffers)...) { + static_assert(sizeof...(Buffers) < max_size(), "Too many buffers!"); + _buffer_views[0u] = boost::asio::buffer(&_total_size, sizeof(_total_size)); + } + + /// Size in bytes of the message excluding the header. + auto size() const { + return _total_size; + } + + auto GetBufferSequence() const { + auto begin = _buffer_views.begin(); + return MakeListView(begin, begin + _number_of_buffers + 1u); + } + + private: + + message_size_type _number_of_buffers = 0u; + + message_size_type _total_size = 0u; + + std::array _buffers; + + std::array _buffer_views; + }; + + using Message = MessageTmpl<2u>; + +} // namespace tcp +} // namespace detail +} // namespace streaming +} // namespace carla diff --git a/LibCarla/source/carla/streaming/detail/tcp/ServerSession.cpp b/LibCarla/source/carla/streaming/detail/tcp/ServerSession.cpp index 41b714596..45ecb378e 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/ServerSession.cpp +++ b/LibCarla/source/carla/streaming/detail/tcp/ServerSession.cpp @@ -61,6 +61,15 @@ namespace tcp { }); } + void ServerSession::Close() { + _strand.post([this, self = shared_from_this()]() { + if (_socket.is_open()) { + _socket.close(); + } + log_debug("session", _session_id, "closed"); + }); + } + void ServerSession::Write(std::shared_ptr message) { auto self = shared_from_this(); _strand.post([=]() { @@ -88,20 +97,11 @@ namespace tcp { _deadline.expires_from_now(_timeout); boost::asio::async_write( _socket, - message->encode(), + message->GetBufferSequence(), _strand.wrap(handle_sent)); }); } - void ServerSession::Close() { - _strand.post([this, self = shared_from_this()]() { - if (_socket.is_open()) { - _socket.close(); - } - log_debug("session", _session_id, "closed"); - }); - } - void ServerSession::StartTimer() { if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { log_debug("session", _session_id, "timed out"); diff --git a/LibCarla/source/carla/streaming/detail/tcp/ServerSession.h b/LibCarla/source/carla/streaming/detail/tcp/ServerSession.h index f1b645b19..0b4105542 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/ServerSession.h +++ b/LibCarla/source/carla/streaming/detail/tcp/ServerSession.h @@ -8,8 +8,9 @@ #include "carla/NonCopyable.h" #include "carla/Time.h" -#include "carla/streaming/Message.h" +#include "carla/TypeTraits.h" #include "carla/streaming/detail/Types.h" +#include "carla/streaming/detail/tcp/Message.h" #include #include @@ -50,13 +51,21 @@ namespace tcp { } /// Writes some data to the socket. - void Write(std::shared_ptr message); + template + void Write(Buffers... buffers) { + static_assert( + are_same::value, + "This function only accepts arguments of type Buffer."); + Write(std::make_shared(std::move(buffers)...)); + } /// Posts a job to close this session. void Close(); private: + void Write(std::shared_ptr message); + void StartTimer(); friend class Server; diff --git a/LibCarla/source/test/test_benchmark_streaming.cpp b/LibCarla/source/test/test_benchmark_streaming.cpp index fb05786dc..68e2b91da 100644 --- a/LibCarla/source/test/test_benchmark_streaming.cpp +++ b/LibCarla/source/test/test_benchmark_streaming.cpp @@ -13,7 +13,7 @@ using namespace carla::streaming; static auto make_special_message(size_t size) { std::vector v(size/sizeof(uint32_t), 42u); - Message msg(boost::asio::buffer(v)); + carla::Buffer msg(v); EXPECT_EQ(msg.size(), size); return msg; } @@ -31,9 +31,9 @@ public: void AddStream() { Stream stream = _server.MakeStream(); - _client.Subscribe(stream.token(), [this](std::shared_ptr DEBUG_ONLY(msg)) { - DEBUG_ASSERT_EQ(msg->size(), _message.size()); - DEBUG_ASSERT(*msg == _message); + _client.Subscribe(stream.token(), [this](carla::Buffer DEBUG_ONLY(msg)) { + DEBUG_ASSERT_EQ(msg.size(), _message.size()); + DEBUG_ASSERT(msg == _message); _client_callback.post([this]() { CARLA_PROFILE_FPS(client, listen_callback); ++_number_of_messages_received; @@ -92,7 +92,7 @@ private: Client _client; - const Message _message; + const carla::Buffer _message; boost::asio::io_service _client_callback; diff --git a/LibCarla/source/test/test_streaming_low_level.cpp b/LibCarla/source/test/test_streaming_low_level.cpp index cc5e557c4..fb391adb9 100644 --- a/LibCarla/source/test/test_streaming_low_level.cpp +++ b/LibCarla/source/test/test_streaming_low_level.cpp @@ -35,9 +35,8 @@ TEST(streaming_low_level, sending_strings) { Client c; c.Subscribe(io_service, stream.token(), [&](auto message) { ++message_count; - ASSERT_NE(message, nullptr); - ASSERT_EQ(message->size(), message_text.size()); - const std::string msg = as_string(*message); + ASSERT_EQ(message.size(), message_text.size()); + const std::string msg = as_string(message); ASSERT_EQ(msg, message_text); }); diff --git a/LibCarla/source/test/test_streaming_low_level_tcp.cpp b/LibCarla/source/test/test_streaming_low_level_tcp.cpp index d8ed87cda..6d3fc8559 100644 --- a/LibCarla/source/test/test_streaming_low_level_tcp.cpp +++ b/LibCarla/source/test/test_streaming_low_level_tcp.cpp @@ -28,9 +28,9 @@ TEST(streaming_detail_tcp, small_message) { srv.Listen([&](std::shared_ptr session) { ASSERT_EQ(session->get_stream_id(), 1u); const std::string msg = "Hola!"; - auto message = std::make_shared(boost::asio::buffer(msg)); + auto message = carla::Buffer(msg); while (!done) { - session->Write(message); + session->Write(std::move(message)); std::this_thread::sleep_for(1ns); } std::cout << "done!\n"; @@ -38,11 +38,11 @@ TEST(streaming_detail_tcp, small_message) { Dispatcher dispatcher{make_endpoint(ep)}; auto stream = dispatcher.MakeStream(); - tcp::Client c(io_service, stream.token(), [&](std::shared_ptr message) { + tcp::Client c(io_service, stream.token(), [&](carla::Buffer message) { ++message_count; - ASSERT_NE(message, nullptr); - ASSERT_EQ(message->size(), 5u); - const std::string msg = util::buffer::as_string(*message); + ASSERT_FALSE(message.empty()); + ASSERT_EQ(message.size(), 5u); + const std::string msg = util::buffer::as_string(message); ASSERT_EQ(msg, std::string("Hola!")); }); diff --git a/PythonAPI/source/libcarla/Actor.cpp b/PythonAPI/source/libcarla/Actor.cpp index 591580eeb..b4f882d10 100644 --- a/PythonAPI/source/libcarla/Actor.cpp +++ b/PythonAPI/source/libcarla/Actor.cpp @@ -113,7 +113,7 @@ void export_actor() { self.Listen([callback](auto message) { cc::SharedPtr image; try { - image = cc::Image::FromBuffer(message->buffer()); + image = cc::Image::FromBuffer(std::move(message)); } catch (const std::exception &e) { std::cerr << "exception while parsing the image: " << e.what() << std::endl; return; diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Server/TheNewCarlaServer.cpp b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Server/TheNewCarlaServer.cpp index 6ab1dc539..6993e0abc 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Server/TheNewCarlaServer.cpp +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Server/TheNewCarlaServer.cpp @@ -77,12 +77,11 @@ public: void Write(const FSensorDataView &SensorData) final { auto MakeBuffer = [](FReadOnlyBufferView View) { - return boost::asio::buffer(View.GetData(), View.GetSize()); + return carla::Buffer(boost::asio::buffer(View.GetData(), View.GetSize())); }; - std::array SequencedBuffer; - SequencedBuffer[0u] = MakeBuffer(SensorData.GetHeader()); - SequencedBuffer[1u] = MakeBuffer(SensorData.GetData()); - TheStream.Write(SequencedBuffer); + TheStream.Write( + MakeBuffer(SensorData.GetHeader()), + MakeBuffer(SensorData.GetData())); } private: