Fix regression introduced in #1150, make data streams more robust for asynchronous use

This commit is contained in:
nsubiron 2019-01-27 16:19:16 +01:00
parent 027362bb48
commit 67c2d900e5
7 changed files with 154 additions and 134 deletions

View File

@ -0,0 +1,104 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <compiler/disable-ue4-macros.h>
#include <carla/Buffer.h>
#include <carla/sensor/SensorRegistry.h>
#include <carla/sensor/s11n/SensorHeaderSerializer.h>
#include <carla/streaming/Stream.h>
#include <compiler/enable-ue4-macros.h>
template <typename T>
class FDataStreamTmpl;
// =============================================================================
// -- FAsyncDataStreamTmpl -----------------------------------------------------
// =============================================================================
/// A streaming channel for sending sensor data to clients, supports sending
/// data asynchronously. Data sent by the "Send" functions is passed to the
/// serializer registered with the sensor at carla::sensor:SensorRegistry before
/// being sent down the stream.
///
/// FAsyncDataStream also has a pool of carla::Buffer that allows reusing the
/// allocated memory, use it whenever possible.
template <typename T>
class FAsyncDataStreamTmpl
{
public:
using StreamType = T;
FAsyncDataStreamTmpl(FAsyncDataStreamTmpl &&) = default;
/// Return the token that allows subscribing to this stream.
auto GetToken() const
{
return Stream.GetToken();
}
/// Pop a Buffer from the pool. Buffers in the pool can reuse the memory
/// allocated by previous messages, significantly improving performance for
/// big messages.
carla::Buffer PopBufferFromPool()
{
return Stream.MakeBuffer();
}
/// Send some data down the stream.
template <typename SensorT, typename... ArgsT>
void Send(SensorT &Sensor, ArgsT &&... Args);
private:
friend class FDataStreamTmpl<T>;
/// @pre This functions needs to be called in the game-thread.
template <typename SensorT>
explicit FAsyncDataStreamTmpl(const SensorT &InSensor, StreamType InStream);
StreamType Stream;
carla::Buffer Header;
};
// =============================================================================
// -- FAsyncDataStream and FAsyncDataMultiStream -------------------------------
// =============================================================================
using FAsyncDataStream = FAsyncDataStreamTmpl<carla::streaming::Stream>;
using FAsyncDataMultiStream = FAsyncDataStreamTmpl<carla::streaming::MultiStream>;
// =============================================================================
// -- FAsyncDataStreamTmpl implementation --------------------------------------
// =============================================================================
template <typename T>
template <typename SensorT, typename... ArgsT>
inline void FAsyncDataStreamTmpl<T>::Send(SensorT &Sensor, ArgsT &&... Args)
{
Stream.Write(
std::move(Header),
carla::sensor::SensorRegistry::Serialize(Sensor, std::forward<ArgsT>(Args)...));
}
template <typename T>
template <typename SensorT>
inline FAsyncDataStreamTmpl<T>::FAsyncDataStreamTmpl(
const SensorT &Sensor,
StreamType InStream)
: Stream(std::move(InStream)),
Header([&Sensor]() {
check(IsInGameThread());
using Serializer = carla::sensor::s11n::SensorHeaderSerializer;
return Serializer::Serialize(
carla::sensor::SensorRegistry::template get<SensorT*>::index,
GFrameCounter,
Sensor.GetActorTransform());
}()) {}

View File

@ -62,7 +62,7 @@ void ACollisionSensor::OnCollisionEvent(
{ {
constexpr float TO_METERS = 1e-2; constexpr float TO_METERS = 1e-2;
NormalImpulse *= TO_METERS; NormalImpulse *= TO_METERS;
GetDataStream().Send_GameThread( GetDataStream(*this).Send(
*this, *this,
Episode->SerializeActor(Episode->FindOrFakeActor(Actor)), Episode->SerializeActor(Episode->FindOrFakeActor(Actor)),
Episode->SerializeActor(Episode->FindOrFakeActor(OtherActor)), Episode->SerializeActor(Episode->FindOrFakeActor(OtherActor)),

View File

@ -6,47 +6,22 @@
#pragma once #pragma once
#include "Carla/Sensor/AsyncDataStream.h"
#include <compiler/disable-ue4-macros.h> #include <compiler/disable-ue4-macros.h>
#include <carla/Buffer.h>
#include <carla/sensor/SensorRegistry.h>
#include <carla/sensor/s11n/SensorHeaderSerializer.h>
#include <carla/streaming/Stream.h> #include <carla/streaming/Stream.h>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <compiler/enable-ue4-macros.h> #include <compiler/enable-ue4-macros.h>
// =============================================================================
// -- FSensorMessageHeader -----------------------------------------------------
// =============================================================================
/// Contains meta-information of a sensor message.
class FSensorMessageHeader
{
public:
FSensorMessageHeader(FSensorMessageHeader &&) = default;
private:
FSensorMessageHeader(carla::Buffer InBuffer) : Buffer(std::move(InBuffer)) {}
template <typename T>
friend class FDataStreamTmpl;
carla::Buffer Buffer;
};
// ============================================================================= // =============================================================================
// -- FDataStreamTmpl ---------------------------------------------------------- // -- FDataStreamTmpl ----------------------------------------------------------
// ============================================================================= // =============================================================================
/// A streaming channel for sending sensor data to clients. Each sensor has its /// A streaming channel for sending sensor data to clients. Each sensor has its
/// own FDataStream. Use Send_GameThread and Send_Async for sending data /// own FDataStream. Note however that this class does not provide a send
/// generated by the sensor. Data sent by these functions is passed to the /// function. In order to send data, a FAsyncDataStream needs to be created
/// serializer registered with the sensor at carla::sensor:SensorRegistry before /// using "MakeAsyncDataStream" function. FAsyncDataStream allows sending data
/// being sent down the stream. /// from any thread.
///
/// FDataStream also has a pool of carla::Buffer that allows reusing the
/// allocated memory, use it whenever possible.
template <typename T> template <typename T>
class FDataStreamTmpl class FDataStreamTmpl
{ {
@ -56,33 +31,24 @@ public:
FDataStreamTmpl() = default; FDataStreamTmpl() = default;
FDataStreamTmpl(FDataStreamTmpl &&) = default;
FDataStreamTmpl &operator=(FDataStreamTmpl &&) = default;
FDataStreamTmpl(StreamType InStream) : Stream(std::move(InStream)) {} FDataStreamTmpl(StreamType InStream) : Stream(std::move(InStream)) {}
/// Create the meta-information header associated with the sensor message. /// Create a FAsyncDataStream object.
/// This functions needs to be called in the game-thread. ///
/// @pre This functions needs to be called in the game-thread.
template <typename SensorT> template <typename SensorT>
FSensorMessageHeader MakeHeader(const SensorT &Sensor); auto MakeAsyncDataStream(const SensorT &Sensor)
{
check(Stream.has_value());
return FAsyncDataStreamTmpl<T>{Sensor, *Stream};
}
/// Pop a Buffer from the pool. Buffers in the pool can reuse the memory /// Return the token that allows subscribing to this stream.
/// allocated by previous messages, significantly improving performance for auto GetToken() const
/// big messages. {
carla::Buffer PopBufferFromPool(); check(Stream.has_value());
return (*Stream).token();
/// Send some data down the stream. This function can only be called from the }
/// game-thread. No need to provide a FSensorMessageHeader to this function.
template <typename SensorT, typename... ArgsT>
void Send_GameThread(SensorT &Sensor, ArgsT &&... Args);
/// Send some data down the stream. This function can be called from a
/// different thread. It requires you however to provide a
/// FSensorMessageHeader previously generated in the game-thread.
template <typename SensorT, typename... ArgsT>
void Send_Async(FSensorMessageHeader Header, SensorT &Sensor, ArgsT &&... Args);
auto GetToken() const;
private: private:
@ -96,64 +62,3 @@ private:
using FDataStream = FDataStreamTmpl<carla::streaming::Stream>; using FDataStream = FDataStreamTmpl<carla::streaming::Stream>;
using FDataMultiStream = FDataStreamTmpl<carla::streaming::MultiStream>; using FDataMultiStream = FDataStreamTmpl<carla::streaming::MultiStream>;
// =============================================================================
// -- FDataStreamTmpl implementation -------------------------------------------
// =============================================================================
template <typename T>
template <typename SensorT>
inline FSensorMessageHeader FDataStreamTmpl<T>::MakeHeader(const SensorT &Sensor)
{
check(IsInGameThread());
using Serializer = carla::sensor::s11n::SensorHeaderSerializer;
return {Serializer::Serialize(
carla::sensor::SensorRegistry::template get<SensorT*>::index,
GFrameCounter,
Sensor.GetActorTransform())};
}
template <typename T>
inline carla::Buffer FDataStreamTmpl<T>::PopBufferFromPool()
{
#ifdef WITH_EDITOR
if (!Stream.has_value())
{
UE_LOG(LogCarla, Error, TEXT("Sensor does not have a stream!"));
return {};
}
#endif // WITH_EDITOR
check(Stream.has_value());
return (*Stream).MakeBuffer();
}
template <typename T>
template <typename SensorT, typename... ArgsT>
inline void FDataStreamTmpl<T>::Send_Async(FSensorMessageHeader Header, SensorT &Sensor, ArgsT &&... Args)
{
#ifdef WITH_EDITOR
if (!Stream.has_value())
{
UE_LOG(LogCarla, Error, TEXT("Sensor does not have a stream!"));
return;
}
#endif // WITH_EDITOR
check(Stream.has_value());
(*Stream).Write(
std::move(Header.Buffer),
carla::sensor::SensorRegistry::Serialize(Sensor, std::forward<ArgsT>(Args)...));
}
template <typename T>
template <typename SensorT, typename... ArgsT>
inline void FDataStreamTmpl<T>::Send_GameThread(SensorT &Sensor, ArgsT &&... Args)
{
Send_Async(MakeHeader(Sensor), Sensor, std::forward<ArgsT>(Args)...);
}
template <typename T>
inline auto FDataStreamTmpl<T>::GetToken() const
{
check(Stream.has_value());
return (*Stream).token();
}

View File

@ -87,26 +87,31 @@ void FPixelReader::SendPixelsInRenderThread(TSensor &Sensor)
// First we create the message header (needs to be created in the // First we create the message header (needs to be created in the
// game-thread). // game-thread).
auto Header = Sensor.GetDataStream().MakeHeader(Sensor); auto AsyncStream = Sensor.GetDataStream(Sensor);
// We need a shared ptr here because UE4 macros do not move the arguments -_- // We need a shared ptr here because UE4 macros do not move the arguments -_-
auto HeaderPtr = MakeShared<decltype(Header)>(std::move(Header)); auto StreamPtr = std::make_shared<decltype(AsyncStream)>(std::move(AsyncStream));
// Then we enqueue commands in the render-thread that will write the image // Then we enqueue commands in the render-thread that will write the image
// buffer to the data stream. // buffer to the data stream.
auto WriteAndSend = [&Sensor, Hdr=std::move(HeaderPtr)](auto &InRHICmdList) mutable { auto WriteAndSend = [&Sensor, Stream=std::move(StreamPtr)](auto &InRHICmdList) mutable
auto &Stream = Sensor.GetDataStream(); {
auto Buffer = Stream.PopBufferFromPool(); /// @todo Can we make sure the sensor is not going to be destroyed?
WritePixelsToBuffer( if (!Sensor.IsPendingKill())
*Sensor.CaptureRenderTarget, {
Buffer, auto Buffer = Stream->PopBufferFromPool();
carla::sensor::SensorRegistry::get<TSensor *>::type::header_offset, WritePixelsToBuffer(
InRHICmdList); *Sensor.CaptureRenderTarget,
Stream.Send_Async(std::move(*Hdr), Sensor, std::move(Buffer)); Buffer,
carla::sensor::SensorRegistry::get<TSensor *>::type::header_offset,
InRHICmdList);
Stream->Send(Sensor, std::move(Buffer));
}
}; };
ENQUEUE_UNIQUE_RENDER_COMMAND_ONEPARAMETER( ENQUEUE_UNIQUE_RENDER_COMMAND_ONEPARAMETER(
FWritePixels_Vulkan, FWritePixels_SendPixelsInRenderThread,
std::function<void(FRHICommandListImmediate &)>, WriteAndSendFunction, std::move(WriteAndSend), std::function<void(FRHICommandListImmediate &)>, WriteAndSendFunction, std::move(WriteAndSend),
{ {
WriteAndSendFunction(RHICmdList); WriteAndSendFunction(RHICmdList);

View File

@ -69,8 +69,8 @@ void ARayCastLidar::Tick(const float DeltaTime)
ReadPoints(DeltaTime); ReadPoints(DeltaTime);
auto &DataStream = GetDataStream(); auto DataStream = GetDataStream(*this);
DataStream.Send_GameThread(*this, LidarMeasurement, DataStream.PopBufferFromPool()); DataStream.Send(*this, LidarMeasurement, DataStream.PopBufferFromPool());
} }
void ARayCastLidar::ReadPoints(const float DeltaTime) void ARayCastLidar::ReadPoints(const float DeltaTime)

View File

@ -43,9 +43,13 @@ protected:
void EndPlay(EEndPlayReason::Type EndPlayReason) override; void EndPlay(EEndPlayReason::Type EndPlayReason) override;
/// Return the FDataStream associated with this sensor. /// Return the FDataStream associated with this sensor.
FDataStream &GetDataStream() ///
/// Do you need to provide a reference to self, this is necessary for template
/// deduction.
template <typename SensorT>
FAsyncDataStream GetDataStream(const SensorT &Self)
{ {
return Stream; return Stream.MakeAsyncDataStream(Self);
} }
private: private:

View File

@ -112,11 +112,13 @@ void AWorldObserver::Tick(float DeltaSeconds)
GameTimeStamp += DeltaSeconds; GameTimeStamp += DeltaSeconds;
auto AsyncStream = Stream.MakeAsyncDataStream(*this);
auto buffer = AWorldObserver_Serialize( auto buffer = AWorldObserver_Serialize(
Stream.PopBufferFromPool(), AsyncStream.PopBufferFromPool(),
GameTimeStamp, GameTimeStamp,
FPlatformTime::Seconds(), FPlatformTime::Seconds(),
Episode->GetActorRegistry()); Episode->GetActorRegistry());
Stream.Send_GameThread(*this, std::move(buffer)); AsyncStream.Send(*this, std::move(buffer));
} }