New class SensorDataInbox

This commit is contained in:
nsubiron 2018-02-07 16:56:52 +01:00
parent ba1dc589d4
commit b07409210b
7 changed files with 346 additions and 0 deletions

View File

@ -34,6 +34,21 @@ extern "C" {
float roll;
};
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
struct carla_sensor_definition {
uint32_t id;
uint32_t type;
const char *name;
};
struct carla_sensor_data {
uint32_t id;
const void *header;
uint32_t header_size;
const void *data;
uint32_t data_size;
};
// -----------------------------------------------------------------------------
struct carla_image {
uint32_t width;
uint32_t height;
@ -48,6 +63,7 @@ extern "C" {
const uint32_t *points_count_by_channel;
const double *data;
};
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
struct carla_transform {
struct carla_vector3d location;

View File

@ -0,0 +1,59 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/Debug.h"
#include "carla/NonCopyable.h"
#include "carla/server/DoubleBuffer.h"
#include "carla/server/SensorDataMessage.h"
#include <unordered_map>
struct carla_sensor_data;
struct carla_sensor_definition;
namespace carla {
namespace server {
/// Stores the data received from the sensors (asynchronously) to be sent next
/// on next tick.
///
/// Each sensor has a double-buffer for one producer and one consumer per
/// sensor. Several threads can simultaneously write as long as they write to
/// different buffers, i.e. each sensor can have its own producer and consumer
/// threads.
class SensorDataInbox : private NonCopyable {
public:
/// We need to initialize the map before hand so it remains constant and
/// doesn't need a lock.
///
/// @warning This function is not thread-safe.
void RegisterSensor(const carla_sensor_definition &sensor) {
_buffers[sensor.id];
}
void Write(const carla_sensor_data &data) {
auto message = _buffers.at(data.id).MakeWriter();
message->Write(data);
}
/// Tries to acquire a reader on the buffer of the given sensor. See
/// DoubleBuffer.
auto TryMakeReader(uint32_t sensor_id) {
return _buffers.at(sensor_id).TryMakeReader(timeout_t::milliseconds(0u));
}
private:
using DataBuffer = DoubleBuffer<SensorDataMessage>;
std::unordered_map<uint32_t, DataBuffer> _buffers;
};
} // namespace server
} // namespace carla

View File

@ -0,0 +1,48 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/server/SensorDataMessage.h"
#include "carla/Logging.h"
#include "carla/server/CarlaServerAPI.h"
namespace carla {
namespace server {
void SensorDataMessage::Write(const carla_sensor_data &data) {
// The buffer contains id + data-header + data.
const uint32_t buffer_size =
sizeof(uint32_t) +
data.header_size +
data.data_size;
// The message is prepended by the size of the buffer.
Reset(sizeof(uint32_t) + buffer_size);
auto begin = _buffer.get();
std::memcpy(begin, &buffer_size, sizeof(uint32_t));
begin += sizeof(uint32_t);
std::memcpy(begin, &data.id, sizeof(uint32_t));
begin += sizeof(uint32_t);
std::memcpy(begin, data.header, data.header_size);
begin += data.header_size;
std::memcpy(begin, data.data, data.data_size);
}
void SensorDataMessage::Reset(uint32_t count) {
if (_capacity < count) {
log_debug("allocating sensor buffer of", count, "bytes");
_buffer = std::make_unique<unsigned char[]>(count);
_capacity = count;
}
_size = count;
}
} // namespace server
} // namespace carla

View File

@ -0,0 +1,41 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/NonCopyable.h"
#include "carla/server/ServerTraits.h"
#include <memory>
struct carla_sensor_data;
struct carla_sensor_definition;
namespace carla {
namespace server {
class SensorDataMessage : private NonCopyable {
public:
void Write(const carla_sensor_data &data);
const_buffer buffer() const {
return boost::asio::buffer(_buffer.get(), _size);
}
private:
void Reset(uint32_t count);
std::unique_ptr<unsigned char[]> _buffer = nullptr;
uint32_t _size = 0u;
uint32_t _capacity = 0u;
};
} // namespace server
} // namespace carla

View File

@ -0,0 +1,49 @@
#include "Sensor.h"
#include <gtest/gtest.h>
#include <random>
namespace test {
static uint32_t ID_COUNT = 0u;
Sensor::Sensor() : Sensor(++ID_COUNT) {}
Sensor::Sensor(const uint32_t id)
: _name(std::string("Sensor") + std::to_string(id)),
_definition({id, 0u, _name.c_str()}),
_data({id, nullptr, 0u, nullptr, 0u}) {}
carla_sensor_data Sensor::MakeRandomData() {
std::lock_guard<std::mutex> lock(_mutex);
std::random_device device;
std::default_random_engine rng(device());
std::uniform_int_distribution<uint32_t> dist(1, 10000);
_data.header_size = dist(rng);
_data.data_size = dist(rng);
_header = std::make_unique<const unsigned char[]>(_data.header_size);
_buffer = std::make_unique<const unsigned char[]>(_data.data_size);
_data.header = _header.get();
_data.data = _buffer.get();
return _data;
}
void Sensor::CheckData(boost::asio::const_buffer buffer) const {
std::lock_guard<std::mutex> lock(_mutex);
const auto size = boost::asio::buffer_size(buffer);
const auto begin = boost::asio::buffer_cast<const unsigned char *>(buffer);
const auto header_begin = begin + 2u * sizeof(uint32_t);
const auto data_begin = header_begin + _data.header_size;
const auto expected_size = 2u * sizeof(uint32_t) + _data.header_size + _data.data_size;
ASSERT_EQ(size, expected_size);
ASSERT_EQ(0, std::memcmp(header_begin, _header.get(), _data.header_size));
ASSERT_EQ(0, std::memcmp(data_begin, _buffer.get(), _data.data_size));
}
} // test

View File

@ -0,0 +1,50 @@
#pragma once
#include <carla/carla_server.h>
#include "carla/server/ServerTraits.h"
#include <boost/asio/buffer.hpp>
#include <memory>
#include <mutex>
#include <string>
namespace test {
/// A class for testing usage of sensor data.
class Sensor {
public:
Sensor();
uint32_t id() const {
return _definition.id;
}
const carla_sensor_definition &definition() const {
return _definition;
}
carla_sensor_data MakeRandomData();
void CheckData(boost::asio::const_buffer buffer) const;
private:
Sensor(uint32_t id);
mutable std::mutex _mutex;
const std::string _name;
const carla_sensor_definition _definition;
std::unique_ptr<const unsigned char[]> _header;
std::unique_ptr<const unsigned char[]> _buffer;
carla_sensor_data _data;
};
} // test

View File

@ -0,0 +1,83 @@
#include <iostream>
#include <gtest/gtest.h>
#include <carla/carla_server.h>
#include "carla/server/SensorDataInbox.h"
#include "Sensor.h"
#include <array>
#include <atomic>
#include <future>
TEST(SensorDataInbox, SyncSingleSensor) {
using namespace carla::server;
test::Sensor sensor0;
SensorDataInbox inbox;
inbox.RegisterSensor(sensor0.definition());
for (auto j = 0u; j < 1000u; ++j) {
auto data = sensor0.MakeRandomData();
inbox.Write(data);
auto buffer = inbox.TryMakeReader(sensor0.id());
ASSERT_TRUE(buffer != nullptr);
sensor0.CheckData(buffer->buffer());
}
}
TEST(SensorDataInbox, SyncMultipleSensors) {
using namespace carla::server;
std::array<test::Sensor, 50u> sensors;
SensorDataInbox inbox;
for (auto &sensor : sensors)
inbox.RegisterSensor(sensor.definition());
for (auto j = 0u; j < 1000u; ++j) {
for (auto &sensor : sensors) {
inbox.Write(sensor.MakeRandomData());
auto buffer = inbox.TryMakeReader(sensor.id());
ASSERT_TRUE(buffer != nullptr);
sensor.CheckData(buffer->buffer());
}
}
}
TEST(SensorDataInbox, Async) {
using namespace carla::server;
std::array<test::Sensor, 50u> sensors;
SensorDataInbox inbox;
for (auto &sensor : sensors)
inbox.RegisterSensor(sensor.definition());
constexpr auto numberOfWrites = 200u;
std::atomic_bool done{false};
auto result_writer = std::async(std::launch::async, [&](){
for (size_t i = 0u; i < numberOfWrites; ++i) {
for (auto &sensor : sensors) {
inbox.Write(sensor.MakeRandomData());
}
std::this_thread::sleep_for(std::chrono::milliseconds(10u));
};
std::this_thread::sleep_for(std::chrono::milliseconds(200u));
done = true;
});
auto result_reader = std::async(std::launch::async, [&](){
auto readings = 0u;
while (!done && (readings < numberOfWrites * sensors.size())) {
for (auto &sensor : sensors) {
auto buffer = inbox.TryMakeReader(sensor.id());
if (buffer != nullptr) {
sensor.CheckData(buffer->buffer());
++readings;
}
}
}
ASSERT_EQ(numberOfWrites * sensors.size(), readings);
});
result_reader.get();
result_writer.get();
}