From 67c2d900e52be9b6ff75b849669d66b57e37bdc4 Mon Sep 17 00:00:00 2001 From: nsubiron Date: Sun, 27 Jan 2019 16:19:16 +0100 Subject: [PATCH] Fix regression introduced in #1150, make data streams more robust for asynchronous use --- .../Source/Carla/Sensor/AsyncDataStream.h | 104 ++++++++++++++ .../Source/Carla/Sensor/CollisionSensor.cpp | 2 +- .../Carla/Source/Carla/Sensor/DataStream.h | 135 +++--------------- .../Carla/Source/Carla/Sensor/PixelReader.h | 29 ++-- .../Source/Carla/Sensor/RayCastLidar.cpp | 4 +- .../Carla/Source/Carla/Sensor/Sensor.h | 8 +- .../Source/Carla/Sensor/WorldObserver.cpp | 6 +- 7 files changed, 154 insertions(+), 134 deletions(-) create mode 100644 Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/AsyncDataStream.h diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/AsyncDataStream.h b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/AsyncDataStream.h new file mode 100644 index 000000000..015de68c8 --- /dev/null +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/AsyncDataStream.h @@ -0,0 +1,104 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include +#include +#include +#include +#include +#include + +template +class FDataStreamTmpl; + +// ============================================================================= +// -- FAsyncDataStreamTmpl ----------------------------------------------------- +// ============================================================================= + +/// A streaming channel for sending sensor data to clients, supports sending +/// data asynchronously. Data sent by the "Send" functions is passed to the +/// serializer registered with the sensor at carla::sensor:SensorRegistry before +/// being sent down the stream. +/// +/// FAsyncDataStream also has a pool of carla::Buffer that allows reusing the +/// allocated memory, use it whenever possible. +template +class FAsyncDataStreamTmpl +{ +public: + + using StreamType = T; + + FAsyncDataStreamTmpl(FAsyncDataStreamTmpl &&) = default; + + /// Return the token that allows subscribing to this stream. + auto GetToken() const + { + return Stream.GetToken(); + } + + /// Pop a Buffer from the pool. Buffers in the pool can reuse the memory + /// allocated by previous messages, significantly improving performance for + /// big messages. + carla::Buffer PopBufferFromPool() + { + return Stream.MakeBuffer(); + } + + /// Send some data down the stream. + template + void Send(SensorT &Sensor, ArgsT &&... Args); + +private: + + friend class FDataStreamTmpl; + + /// @pre This functions needs to be called in the game-thread. + template + explicit FAsyncDataStreamTmpl(const SensorT &InSensor, StreamType InStream); + + StreamType Stream; + + carla::Buffer Header; +}; + +// ============================================================================= +// -- FAsyncDataStream and FAsyncDataMultiStream ------------------------------- +// ============================================================================= + +using FAsyncDataStream = FAsyncDataStreamTmpl; + +using FAsyncDataMultiStream = FAsyncDataStreamTmpl; + +// ============================================================================= +// -- FAsyncDataStreamTmpl implementation -------------------------------------- +// ============================================================================= + +template +template +inline void FAsyncDataStreamTmpl::Send(SensorT &Sensor, ArgsT &&... Args) +{ + Stream.Write( + std::move(Header), + carla::sensor::SensorRegistry::Serialize(Sensor, std::forward(Args)...)); +} + +template +template +inline FAsyncDataStreamTmpl::FAsyncDataStreamTmpl( + const SensorT &Sensor, + StreamType InStream) + : Stream(std::move(InStream)), + Header([&Sensor]() { + check(IsInGameThread()); + using Serializer = carla::sensor::s11n::SensorHeaderSerializer; + return Serializer::Serialize( + carla::sensor::SensorRegistry::template get::index, + GFrameCounter, + Sensor.GetActorTransform()); + }()) {} diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/CollisionSensor.cpp b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/CollisionSensor.cpp index 3991b39f8..01ec81de7 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/CollisionSensor.cpp +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/CollisionSensor.cpp @@ -62,7 +62,7 @@ void ACollisionSensor::OnCollisionEvent( { constexpr float TO_METERS = 1e-2; NormalImpulse *= TO_METERS; - GetDataStream().Send_GameThread( + GetDataStream(*this).Send( *this, Episode->SerializeActor(Episode->FindOrFakeActor(Actor)), Episode->SerializeActor(Episode->FindOrFakeActor(OtherActor)), diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/DataStream.h b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/DataStream.h index 95535181b..1b945c72c 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/DataStream.h +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/DataStream.h @@ -6,47 +6,22 @@ #pragma once +#include "Carla/Sensor/AsyncDataStream.h" + #include -#include -#include -#include #include #include #include -// ============================================================================= -// -- FSensorMessageHeader ----------------------------------------------------- -// ============================================================================= - -/// Contains meta-information of a sensor message. -class FSensorMessageHeader -{ -public: - - FSensorMessageHeader(FSensorMessageHeader &&) = default; - -private: - - FSensorMessageHeader(carla::Buffer InBuffer) : Buffer(std::move(InBuffer)) {} - - template - friend class FDataStreamTmpl; - - carla::Buffer Buffer; -}; - // ============================================================================= // -- FDataStreamTmpl ---------------------------------------------------------- // ============================================================================= /// A streaming channel for sending sensor data to clients. Each sensor has its -/// own FDataStream. Use Send_GameThread and Send_Async for sending data -/// generated by the sensor. Data sent by these functions is passed to the -/// serializer registered with the sensor at carla::sensor:SensorRegistry before -/// being sent down the stream. -/// -/// FDataStream also has a pool of carla::Buffer that allows reusing the -/// allocated memory, use it whenever possible. +/// own FDataStream. Note however that this class does not provide a send +/// function. In order to send data, a FAsyncDataStream needs to be created +/// using "MakeAsyncDataStream" function. FAsyncDataStream allows sending data +/// from any thread. template class FDataStreamTmpl { @@ -56,33 +31,24 @@ public: FDataStreamTmpl() = default; - FDataStreamTmpl(FDataStreamTmpl &&) = default; - FDataStreamTmpl &operator=(FDataStreamTmpl &&) = default; - FDataStreamTmpl(StreamType InStream) : Stream(std::move(InStream)) {} - /// Create the meta-information header associated with the sensor message. - /// This functions needs to be called in the game-thread. + /// Create a FAsyncDataStream object. + /// + /// @pre This functions needs to be called in the game-thread. template - FSensorMessageHeader MakeHeader(const SensorT &Sensor); + auto MakeAsyncDataStream(const SensorT &Sensor) + { + check(Stream.has_value()); + return FAsyncDataStreamTmpl{Sensor, *Stream}; + } - /// Pop a Buffer from the pool. Buffers in the pool can reuse the memory - /// allocated by previous messages, significantly improving performance for - /// big messages. - carla::Buffer PopBufferFromPool(); - - /// Send some data down the stream. This function can only be called from the - /// game-thread. No need to provide a FSensorMessageHeader to this function. - template - void Send_GameThread(SensorT &Sensor, ArgsT &&... Args); - - /// Send some data down the stream. This function can be called from a - /// different thread. It requires you however to provide a - /// FSensorMessageHeader previously generated in the game-thread. - template - void Send_Async(FSensorMessageHeader Header, SensorT &Sensor, ArgsT &&... Args); - - auto GetToken() const; + /// Return the token that allows subscribing to this stream. + auto GetToken() const + { + check(Stream.has_value()); + return (*Stream).token(); + } private: @@ -96,64 +62,3 @@ private: using FDataStream = FDataStreamTmpl; using FDataMultiStream = FDataStreamTmpl; - -// ============================================================================= -// -- FDataStreamTmpl implementation ------------------------------------------- -// ============================================================================= - -template -template -inline FSensorMessageHeader FDataStreamTmpl::MakeHeader(const SensorT &Sensor) -{ - check(IsInGameThread()); - using Serializer = carla::sensor::s11n::SensorHeaderSerializer; - return {Serializer::Serialize( - carla::sensor::SensorRegistry::template get::index, - GFrameCounter, - Sensor.GetActorTransform())}; -} - -template -inline carla::Buffer FDataStreamTmpl::PopBufferFromPool() -{ -#ifdef WITH_EDITOR - if (!Stream.has_value()) - { - UE_LOG(LogCarla, Error, TEXT("Sensor does not have a stream!")); - return {}; - } -#endif // WITH_EDITOR - check(Stream.has_value()); - return (*Stream).MakeBuffer(); -} - -template -template -inline void FDataStreamTmpl::Send_Async(FSensorMessageHeader Header, SensorT &Sensor, ArgsT &&... Args) -{ -#ifdef WITH_EDITOR - if (!Stream.has_value()) - { - UE_LOG(LogCarla, Error, TEXT("Sensor does not have a stream!")); - return; - } -#endif // WITH_EDITOR - check(Stream.has_value()); - (*Stream).Write( - std::move(Header.Buffer), - carla::sensor::SensorRegistry::Serialize(Sensor, std::forward(Args)...)); -} - -template -template -inline void FDataStreamTmpl::Send_GameThread(SensorT &Sensor, ArgsT &&... Args) -{ - Send_Async(MakeHeader(Sensor), Sensor, std::forward(Args)...); -} - -template -inline auto FDataStreamTmpl::GetToken() const -{ - check(Stream.has_value()); - return (*Stream).token(); -} diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/PixelReader.h b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/PixelReader.h index 8104cb9ea..8cf045d2c 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/PixelReader.h +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/PixelReader.h @@ -87,26 +87,31 @@ void FPixelReader::SendPixelsInRenderThread(TSensor &Sensor) // First we create the message header (needs to be created in the // game-thread). - auto Header = Sensor.GetDataStream().MakeHeader(Sensor); + auto AsyncStream = Sensor.GetDataStream(Sensor); + // We need a shared ptr here because UE4 macros do not move the arguments -_- - auto HeaderPtr = MakeShared(std::move(Header)); + auto StreamPtr = std::make_shared(std::move(AsyncStream)); // Then we enqueue commands in the render-thread that will write the image // buffer to the data stream. - auto WriteAndSend = [&Sensor, Hdr=std::move(HeaderPtr)](auto &InRHICmdList) mutable { - auto &Stream = Sensor.GetDataStream(); - auto Buffer = Stream.PopBufferFromPool(); - WritePixelsToBuffer( - *Sensor.CaptureRenderTarget, - Buffer, - carla::sensor::SensorRegistry::get::type::header_offset, - InRHICmdList); - Stream.Send_Async(std::move(*Hdr), Sensor, std::move(Buffer)); + auto WriteAndSend = [&Sensor, Stream=std::move(StreamPtr)](auto &InRHICmdList) mutable + { + /// @todo Can we make sure the sensor is not going to be destroyed? + if (!Sensor.IsPendingKill()) + { + auto Buffer = Stream->PopBufferFromPool(); + WritePixelsToBuffer( + *Sensor.CaptureRenderTarget, + Buffer, + carla::sensor::SensorRegistry::get::type::header_offset, + InRHICmdList); + Stream->Send(Sensor, std::move(Buffer)); + } }; ENQUEUE_UNIQUE_RENDER_COMMAND_ONEPARAMETER( - FWritePixels_Vulkan, + FWritePixels_SendPixelsInRenderThread, std::function, WriteAndSendFunction, std::move(WriteAndSend), { WriteAndSendFunction(RHICmdList); diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/RayCastLidar.cpp b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/RayCastLidar.cpp index e1bed136d..3219f9784 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/RayCastLidar.cpp +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/RayCastLidar.cpp @@ -69,8 +69,8 @@ void ARayCastLidar::Tick(const float DeltaTime) ReadPoints(DeltaTime); - auto &DataStream = GetDataStream(); - DataStream.Send_GameThread(*this, LidarMeasurement, DataStream.PopBufferFromPool()); + auto DataStream = GetDataStream(*this); + DataStream.Send(*this, LidarMeasurement, DataStream.PopBufferFromPool()); } void ARayCastLidar::ReadPoints(const float DeltaTime) diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.h b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.h index ee7636fd1..bdfa2daa5 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.h +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.h @@ -43,9 +43,13 @@ protected: void EndPlay(EEndPlayReason::Type EndPlayReason) override; /// Return the FDataStream associated with this sensor. - FDataStream &GetDataStream() + /// + /// Do you need to provide a reference to self, this is necessary for template + /// deduction. + template + FAsyncDataStream GetDataStream(const SensorT &Self) { - return Stream; + return Stream.MakeAsyncDataStream(Self); } private: diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/WorldObserver.cpp b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/WorldObserver.cpp index a44135e31..3294033a8 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/WorldObserver.cpp +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/WorldObserver.cpp @@ -112,11 +112,13 @@ void AWorldObserver::Tick(float DeltaSeconds) GameTimeStamp += DeltaSeconds; + auto AsyncStream = Stream.MakeAsyncDataStream(*this); + auto buffer = AWorldObserver_Serialize( - Stream.PopBufferFromPool(), + AsyncStream.PopBufferFromPool(), GameTimeStamp, FPlatformTime::Seconds(), Episode->GetActorRegistry()); - Stream.Send_GameThread(*this, std::move(buffer)); + AsyncStream.Send(*this, std::move(buffer)); }