diff --git a/Util/CarlaServer/include/carla/carla_server.h b/Util/CarlaServer/include/carla/carla_server.h index 21c4729dd..3a89458ca 100644 --- a/Util/CarlaServer/include/carla/carla_server.h +++ b/Util/CarlaServer/include/carla/carla_server.h @@ -34,6 +34,21 @@ extern "C" { float roll; }; +// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> + struct carla_sensor_definition { + uint32_t id; + uint32_t type; + const char *name; + }; + + struct carla_sensor_data { + uint32_t id; + const void *header; + uint32_t header_size; + const void *data; + uint32_t data_size; + }; +// ----------------------------------------------------------------------------- struct carla_image { uint32_t width; uint32_t height; @@ -48,6 +63,7 @@ extern "C" { const uint32_t *points_count_by_channel; const double *data; }; +// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< struct carla_transform { struct carla_vector3d location; diff --git a/Util/CarlaServer/source/carla/server/SensorDataInbox.h b/Util/CarlaServer/source/carla/server/SensorDataInbox.h new file mode 100644 index 000000000..ca23d3605 --- /dev/null +++ b/Util/CarlaServer/source/carla/server/SensorDataInbox.h @@ -0,0 +1,59 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include "carla/Debug.h" +#include "carla/NonCopyable.h" +#include "carla/server/DoubleBuffer.h" +#include "carla/server/SensorDataMessage.h" + +#include + +struct carla_sensor_data; +struct carla_sensor_definition; + +namespace carla { +namespace server { + + /// Stores the data received from the sensors (asynchronously) to be sent next + /// on next tick. + /// + /// Each sensor has a double-buffer for one producer and one consumer per + /// sensor. Several threads can simultaneously write as long as they write to + /// different buffers, i.e. each sensor can have its own producer and consumer + /// threads. + class SensorDataInbox : private NonCopyable { + public: + + /// We need to initialize the map before hand so it remains constant and + /// doesn't need a lock. + /// + /// @warning This function is not thread-safe. + void RegisterSensor(const carla_sensor_definition &sensor) { + _buffers[sensor.id]; + } + + void Write(const carla_sensor_data &data) { + auto message = _buffers.at(data.id).MakeWriter(); + message->Write(data); + } + + /// Tries to acquire a reader on the buffer of the given sensor. See + /// DoubleBuffer. + auto TryMakeReader(uint32_t sensor_id) { + return _buffers.at(sensor_id).TryMakeReader(timeout_t::milliseconds(0u)); + } + + private: + + using DataBuffer = DoubleBuffer; + + std::unordered_map _buffers; + }; + +} // namespace server +} // namespace carla diff --git a/Util/CarlaServer/source/carla/server/SensorDataMessage.cpp b/Util/CarlaServer/source/carla/server/SensorDataMessage.cpp new file mode 100644 index 000000000..7cafa5579 --- /dev/null +++ b/Util/CarlaServer/source/carla/server/SensorDataMessage.cpp @@ -0,0 +1,48 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#include "carla/server/SensorDataMessage.h" + +#include "carla/Logging.h" +#include "carla/server/CarlaServerAPI.h" + +namespace carla { +namespace server { + + void SensorDataMessage::Write(const carla_sensor_data &data) { + // The buffer contains id + data-header + data. + const uint32_t buffer_size = + sizeof(uint32_t) + + data.header_size + + data.data_size; + // The message is prepended by the size of the buffer. + Reset(sizeof(uint32_t) + buffer_size); + + auto begin = _buffer.get(); + + std::memcpy(begin, &buffer_size, sizeof(uint32_t)); + begin += sizeof(uint32_t); + + std::memcpy(begin, &data.id, sizeof(uint32_t)); + begin += sizeof(uint32_t); + + std::memcpy(begin, data.header, data.header_size); + begin += data.header_size; + + std::memcpy(begin, data.data, data.data_size); + } + + void SensorDataMessage::Reset(uint32_t count) { + if (_capacity < count) { + log_debug("allocating sensor buffer of", count, "bytes"); + _buffer = std::make_unique(count); + _capacity = count; + } + _size = count; + } + +} // namespace server +} // namespace carla diff --git a/Util/CarlaServer/source/carla/server/SensorDataMessage.h b/Util/CarlaServer/source/carla/server/SensorDataMessage.h new file mode 100644 index 000000000..f29527d0b --- /dev/null +++ b/Util/CarlaServer/source/carla/server/SensorDataMessage.h @@ -0,0 +1,41 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include "carla/NonCopyable.h" +#include "carla/server/ServerTraits.h" + +#include + +struct carla_sensor_data; +struct carla_sensor_definition; + +namespace carla { +namespace server { + + class SensorDataMessage : private NonCopyable { + public: + + void Write(const carla_sensor_data &data); + + const_buffer buffer() const { + return boost::asio::buffer(_buffer.get(), _size); + } + + private: + + void Reset(uint32_t count); + + std::unique_ptr _buffer = nullptr; + + uint32_t _size = 0u; + + uint32_t _capacity = 0u; + }; + +} // namespace server +} // namespace carla diff --git a/Util/CarlaServer/source/test/Sensor.cpp b/Util/CarlaServer/source/test/Sensor.cpp new file mode 100644 index 000000000..ef121338b --- /dev/null +++ b/Util/CarlaServer/source/test/Sensor.cpp @@ -0,0 +1,49 @@ +#include "Sensor.h" + +#include + +#include + +namespace test { + + static uint32_t ID_COUNT = 0u; + + Sensor::Sensor() : Sensor(++ID_COUNT) {} + + Sensor::Sensor(const uint32_t id) + : _name(std::string("Sensor") + std::to_string(id)), + _definition({id, 0u, _name.c_str()}), + _data({id, nullptr, 0u, nullptr, 0u}) {} + + carla_sensor_data Sensor::MakeRandomData() { + std::lock_guard lock(_mutex); + std::random_device device; + std::default_random_engine rng(device()); + std::uniform_int_distribution dist(1, 10000); + + _data.header_size = dist(rng); + _data.data_size = dist(rng); + + _header = std::make_unique(_data.header_size); + _buffer = std::make_unique(_data.data_size); + + _data.header = _header.get(); + _data.data = _buffer.get(); + + return _data; + } + + void Sensor::CheckData(boost::asio::const_buffer buffer) const { + std::lock_guard lock(_mutex); + const auto size = boost::asio::buffer_size(buffer); + const auto begin = boost::asio::buffer_cast(buffer); + const auto header_begin = begin + 2u * sizeof(uint32_t); + const auto data_begin = header_begin + _data.header_size; + + const auto expected_size = 2u * sizeof(uint32_t) + _data.header_size + _data.data_size; + ASSERT_EQ(size, expected_size); + ASSERT_EQ(0, std::memcmp(header_begin, _header.get(), _data.header_size)); + ASSERT_EQ(0, std::memcmp(data_begin, _buffer.get(), _data.data_size)); + } + +} // test diff --git a/Util/CarlaServer/source/test/Sensor.h b/Util/CarlaServer/source/test/Sensor.h new file mode 100644 index 000000000..745cc1587 --- /dev/null +++ b/Util/CarlaServer/source/test/Sensor.h @@ -0,0 +1,50 @@ +#pragma once + +#include + +#include "carla/server/ServerTraits.h" + +#include + +#include +#include +#include + +namespace test { + + /// A class for testing usage of sensor data. + class Sensor { + public: + + Sensor(); + + uint32_t id() const { + return _definition.id; + } + + const carla_sensor_definition &definition() const { + return _definition; + } + + carla_sensor_data MakeRandomData(); + + void CheckData(boost::asio::const_buffer buffer) const; + + private: + + Sensor(uint32_t id); + + mutable std::mutex _mutex; + + const std::string _name; + + const carla_sensor_definition _definition; + + std::unique_ptr _header; + + std::unique_ptr _buffer; + + carla_sensor_data _data; + }; + +} // test diff --git a/Util/CarlaServer/source/test/Test_SensorDataInbox.cpp b/Util/CarlaServer/source/test/Test_SensorDataInbox.cpp new file mode 100644 index 000000000..3b902d87a --- /dev/null +++ b/Util/CarlaServer/source/test/Test_SensorDataInbox.cpp @@ -0,0 +1,83 @@ +#include + +#include + +#include + +#include "carla/server/SensorDataInbox.h" + +#include "Sensor.h" + +#include +#include +#include + +TEST(SensorDataInbox, SyncSingleSensor) { + using namespace carla::server; + test::Sensor sensor0; + SensorDataInbox inbox; + inbox.RegisterSensor(sensor0.definition()); + for (auto j = 0u; j < 1000u; ++j) { + auto data = sensor0.MakeRandomData(); + inbox.Write(data); + auto buffer = inbox.TryMakeReader(sensor0.id()); + ASSERT_TRUE(buffer != nullptr); + sensor0.CheckData(buffer->buffer()); + } +} + +TEST(SensorDataInbox, SyncMultipleSensors) { + using namespace carla::server; + std::array sensors; + SensorDataInbox inbox; + for (auto &sensor : sensors) + inbox.RegisterSensor(sensor.definition()); + for (auto j = 0u; j < 1000u; ++j) { + for (auto &sensor : sensors) { + inbox.Write(sensor.MakeRandomData()); + auto buffer = inbox.TryMakeReader(sensor.id()); + ASSERT_TRUE(buffer != nullptr); + sensor.CheckData(buffer->buffer()); + } + } +} + +TEST(SensorDataInbox, Async) { + using namespace carla::server; + std::array sensors; + SensorDataInbox inbox; + for (auto &sensor : sensors) + inbox.RegisterSensor(sensor.definition()); + + constexpr auto numberOfWrites = 200u; + + std::atomic_bool done{false}; + + auto result_writer = std::async(std::launch::async, [&](){ + for (size_t i = 0u; i < numberOfWrites; ++i) { + for (auto &sensor : sensors) { + inbox.Write(sensor.MakeRandomData()); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10u)); + }; + std::this_thread::sleep_for(std::chrono::milliseconds(200u)); + done = true; + }); + + auto result_reader = std::async(std::launch::async, [&](){ + auto readings = 0u; + while (!done && (readings < numberOfWrites * sensors.size())) { + for (auto &sensor : sensors) { + auto buffer = inbox.TryMakeReader(sensor.id()); + if (buffer != nullptr) { + sensor.CheckData(buffer->buffer()); + ++readings; + } + } + } + ASSERT_EQ(numberOfWrites * sensors.size(), readings); + }); + + result_reader.get(); + result_writer.get(); +}