Multi GPU first version

This commit is contained in:
bernatx 2022-06-08 23:22:30 +02:00 committed by bernat
parent 0926da2847
commit 9c9eda84bc
124 changed files with 12911 additions and 281 deletions

View File

@ -224,6 +224,12 @@ file(GLOB libcarla_carla_streaming_low_level_sources
set(libcarla_sources "${libcarla_sources};${libcarla_carla_streaming_low_level_sources}")
install(FILES ${libcarla_carla_streaming_low_level_sources} DESTINATION include/carla/streaming/low_level)
file(GLOB libcarla_carla_multigpu_sources
"${libcarla_source_path}/carla/multigpu/*.cpp"
"${libcarla_source_path}/carla/multigpu/*.h")
set(libcarla_sources "${libcarla_sources};${libcarla_carla_multigpu_sources}")
install(FILES ${libcarla_carla_multigpu_sources} DESTINATION include/carla/multigpu)
file(GLOB libcarla_odr_spiral_sources
"${libcarla_source_thirdparty_path}/odrSpiral/*.cpp"
"${libcarla_source_thirdparty_path}/odrSpiral/*.h")

View File

@ -68,6 +68,9 @@ install(FILES ${libcarla_carla_streaming_detail_tcp_headers} DESTINATION include
file(GLOB libcarla_carla_streaming_low_level_headers "${libcarla_source_path}/carla/streaming/low_level/*.h")
install(FILES ${libcarla_carla_streaming_low_level_headers} DESTINATION include/carla/streaming/low_level)
file(GLOB libcarla_carla_multigpu_headers "${libcarla_source_path}/carla/multigpu/*.h")
install(FILES ${libcarla_carla_multigpu_headers} DESTINATION include/carla/multigpu)
install(DIRECTORY "${BOOST_INCLUDE_PATH}/boost" DESTINATION include)
if(WIN32)
@ -109,8 +112,9 @@ file(GLOB libcarla_server_sources
"${libcarla_source_path}/carla/streaming/detail/*.cpp"
"${libcarla_source_path}/carla/streaming/detail/*.h"
"${libcarla_source_path}/carla/streaming/detail/tcp/*.cpp"
"${libcarla_source_path}/carla/streaming/detail/tcp/*.h"
"${libcarla_source_path}/carla/streaming/low_level/*.h"
"${libcarla_source_path}/carla/multigpu/*.h"
"${libcarla_source_path}/carla/multigpu/*.cpp"
"${libcarla_source_thirdparty_path}/odrSpiral/*.cpp"
"${libcarla_source_thirdparty_path}/odrSpiral/*.h"
"${libcarla_source_thirdparty_path}/moodycamel/*.cpp"

View File

@ -18,7 +18,9 @@
#include <type_traits>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
#include <compiler/enable-ue4-macros.h>
#include "Containers/Array.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -15,6 +15,7 @@ namespace carla {
namespace client {
ServerSideSensor::~ServerSideSensor() {
log_warning("calling sensor destructor ", GetDisplayId());
if (IsAlive() && IsListening()) {
log_warning(
"sensor object went out of the scope but the sensor is still alive",
@ -31,12 +32,14 @@ namespace client {
}
void ServerSideSensor::Listen(CallbackFunctionType callback) {
log_warning("calling sensor Listen() ", GetDisplayId());
log_debug(GetDisplayId(), ": subscribing to stream");
GetEpisode().Lock()->SubscribeToSensor(*this, std::move(callback));
_is_listening = true;
}
void ServerSideSensor::Stop() {
log_warning("calling sensor Stop() ", GetDisplayId());
if (!_is_listening) {
log_warning(
"attempting to unsubscribe from stream but sensor wasn't listening:",
@ -48,6 +51,7 @@ namespace client {
}
bool ServerSideSensor::Destroy() {
log_warning("calling sensor Destroy() ", GetDisplayId());
if (IsListening()) {
Stop();
}

View File

@ -575,7 +575,9 @@ namespace detail {
void Client::SubscribeToStream(
const streaming::Token &token,
std::function<void(Buffer)> callback) {
_pimpl->streaming_client.Subscribe(token, std::move(callback));
carla::streaming::detail::token_type thisToken(token);
streaming::Token receivedToken = _pimpl->CallAndWait<streaming::Token>("get_sensor_token", thisToken.get_stream_id());
_pimpl->streaming_client.Subscribe(receivedToken, std::move(callback));
}
void Client::UnSubscribeFromStream(const streaming::Token &token) {

View File

@ -15,7 +15,9 @@
#include <array>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Util/BoundingBox.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Util/BoundingBox.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -11,7 +11,9 @@
#include "carla/geom/Math.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Math/Vector.h"
#include <compiler/enable-ue4-macros.h>
#include "Math/Vector.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -12,7 +12,9 @@
#include <carla/geom/Vector2D.h>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
#include <compiler/enable-ue4-macros.h>
#include "Util/ProceduralCustomMesh.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -11,7 +11,9 @@
#include "carla/geom/Vector3D.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Math/Rotator.h"
#include <compiler/enable-ue4-macros.h>
#include "Math/Rotator.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -12,7 +12,9 @@
#include "carla/geom/Rotation.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
#include <compiler/enable-ue4-macros.h>
#include "Math/Transform.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -0,0 +1,27 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <cstdint>
namespace carla {
namespace multigpu {
enum MultiGPUCommand : uint32_t {
SEND_FRAME = 0,
LOAD_MAP,
GET_TOKEN,
YOU_ALIVE
};
struct CommandHeader {
MultiGPUCommand id;
uint32_t size;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,53 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/BufferPool.h"
#include "carla/Debug.h"
#include "carla/Exception.h"
#include "carla/Logging.h"
#include "carla/streaming/detail/Types.h"
#include "carla/Time.h"
#include <exception>
namespace carla {
namespace multigpu {
/// Helper for reading incoming TCP messages. Allocates the whole message in
/// a single buffer.
class IncomingMessage {
public:
explicit IncomingMessage(Buffer &&buffer) : _buffer(std::move(buffer)) {}
boost::asio::mutable_buffer size_as_buffer() {
return boost::asio::buffer(&_size, sizeof(_size));
}
boost::asio::mutable_buffer buffer() {
DEBUG_ASSERT(_size > 0u);
_buffer.reset(_size);
return _buffer.buffer();
}
auto size() const {
return _size;
}
auto pop() {
return std::move(_buffer);
}
private:
carla::streaming::detail::message_size_type _size = 0u;
Buffer _buffer;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,55 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/multigpu/listener.h"
#include "carla/multigpu/primary.h"
#include <boost/asio/post.hpp>
#include "carla/Logging.h"
#include <memory>
namespace carla {
namespace multigpu {
Listener::Listener(boost::asio::io_context &io_context, endpoint ep)
: _io_context(io_context),
_acceptor(_io_context, std::move(ep)),
_timeout(time_duration::seconds(1u)) {
_acceptor.listen();
}
Listener::~Listener() {
}
void Listener::Stop() {
_acceptor.cancel();
}
void Listener::OpenSession(
time_duration timeout,
callback_function_type on_opened,
callback_function_type on_closed,
callback_function_type_response on_response) {
using boost::system::error_code;
auto session = std::make_shared<Primary>(_io_context, timeout, *this);
auto self = shared_from_this();
_acceptor.async_accept(session->_socket, [=](error_code ec) {
if (!ec) {
session->Open(on_opened, on_closed, on_response);
} else {
log_error("tcp accept error:", ec.message());
}
self->OpenSession(timeout, on_opened, on_closed, on_response);
});
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,79 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/NonCopyable.h"
#include "carla/Time.h"
#include "carla/Buffer.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <atomic>
namespace carla {
namespace multigpu {
class Primary;
/// @warning This server cannot be destructed before its @a io_context is
/// stopped.
class Listener : public std::enable_shared_from_this<Listener>, private NonCopyable {
public:
using endpoint = boost::asio::ip::tcp::endpoint;
using protocol_type = endpoint::protocol_type;
using Session = std::shared_ptr<Primary>;
using callback_function_type = std::function<void(std::shared_ptr<Primary>)>;
using callback_function_type_response = std::function<void(std::shared_ptr<Primary>, carla::Buffer)>;
explicit Listener(boost::asio::io_context &io_context, endpoint ep);
~Listener();
endpoint GetLocalEndpoint() const {
return _acceptor.local_endpoint();
}
/// Set session time-out. Applies only to newly created sessions. By default
/// the time-out is set to 10 seconds.
void SetTimeout(time_duration timeout) {
_timeout = timeout;
}
/// Start listening for connections. On each new connection, @a
/// on_session_opened is called, and @a on_session_closed when the session
/// is closed, also @a on_response is called when an answer is received.
void Listen(callback_function_type on_session_opened,
callback_function_type on_session_closed,
callback_function_type_response on_response) {
boost::asio::post(_io_context, [=]() {
OpenSession(
_timeout,
std::move(on_session_opened),
std::move(on_session_closed),
std::move(on_response));
});
}
void Stop();
private:
void OpenSession(
time_duration timeout,
callback_function_type on_session_opened,
callback_function_type on_session_closed,
callback_function_type_response on_response);
boost::asio::io_context &_io_context;
boost::asio::ip::tcp::acceptor _acceptor;
std::atomic<time_duration> _timeout;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,219 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/multigpu/primary.h"
#include "carla/Debug.h"
#include "carla/Logging.h"
#include "carla/multigpu/incomingMessage.h"
#include "carla/multigpu/listener.h"
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/post.hpp>
#include <atomic>
#include <thread>
namespace carla {
namespace multigpu {
static std::atomic_size_t SESSION_COUNTER{0u};
Primary::Primary(
boost::asio::io_context &io_context,
const time_duration timeout,
Listener &server)
: LIBCARLA_INITIALIZE_LIFETIME_PROFILER(
std::string("tcp multigpu server session ") + std::to_string(SESSION_COUNTER)),
_server(server),
_session_id(SESSION_COUNTER++),
_socket(io_context),
_timeout(timeout),
_deadline(io_context),
_strand(io_context),
_buffer_pool(std::make_shared<BufferPool>()) {}
Primary::~Primary() {
if (_socket.is_open()) {
boost::system::error_code ec;
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
_socket.close();
}
}
void Primary::Open(
Listener::callback_function_type on_opened,
Listener::callback_function_type on_closed,
Listener::callback_function_type_response on_response) {
DEBUG_ASSERT(on_opened && on_closed);
// This forces not using Nagle's algorithm.
// Improves the sync mode velocity on Linux by a factor of ~3.
const boost::asio::ip::tcp::no_delay option(true);
_socket.set_option(option);
// callbacks
_on_closed = std::move(on_closed);
_on_response = std::move(on_response);
on_opened(shared_from_this());
ReadData();
}
void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [=]() {
auto self = weak.lock();
if (!self) return;
if (!self->_socket.is_open()) {
return;
}
auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (ec) {
log_error("session ", self->_session_id, ": error sending data: ", ec.message());
self->CloseNow();
} else {
// DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
}
};
self->_deadline.expires_from_now(self->_timeout);
boost::asio::async_write(
self->_socket,
message->GetBufferSequence(),
boost::asio::bind_executor(self->_strand, handle_sent));
});
}
void Primary::Write(std::string text) {
std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [=]() {
auto self = weak.lock();
if (!self) return;
if (!self->_socket.is_open()) {
return;
}
auto handle_sent = [weak](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
// auto self = weak.lock();
// if (!self) return;
};
// sent first size buffer
self->_deadline.expires_from_now(self->_timeout);
int this_size = text.size();
boost::asio::async_write(
self->_socket,
boost::asio::buffer(&this_size, sizeof(this_size)),
boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &ec, size_t bytes){ }));
// send characters
boost::asio::async_write(
self->_socket,
boost::asio::buffer(text.c_str(), text.size()),
boost::asio::bind_executor(self->_strand, handle_sent));
});
}
void Primary::ReadData() {
std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() {
auto self = weak.lock();
if (!self) return;
auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (!ec) {
DEBUG_ASSERT_EQ(bytes, message->size());
DEBUG_ASSERT_NE(bytes, 0u);
// Move the buffer to the callback function and start reading the next
// piece of data.
self->_on_response(self, message->pop());
std::cout << "Getting data on listener\n";
self->ReadData();
} else {
// As usual, if anything fails start over from the very top.
log_error("primary server: failed to read data: ", ec.message());
}
};
auto handle_read_header = [weak, message, handle_read_data](
boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (!ec && (message->size() > 0u)) {
// Now that we know the size of the coming buffer, we can allocate our
// buffer and start putting data into it.
boost::asio::async_read(
self->_socket,
message->buffer(),
boost::asio::bind_executor(self->_strand, handle_read_data));
} else {
if (ec) {
log_error("Primary server: failed to read header: ", ec.message());
}
// Connect();
self->Close();
}
};
// Read the size of the buffer that is coming.
boost::asio::async_read(
self->_socket,
message->size_as_buffer(),
boost::asio::bind_executor(self->_strand, handle_read_header));
});
}
void Primary::Close() {
std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() {
auto self = weak.lock();
if (!self) return;
self->CloseNow();
});
}
void Primary::StartTimer() {
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
log_debug("session ", _session_id, " time out");
Close();
} else {
std::weak_ptr<Primary> weak = shared_from_this();
_deadline.async_wait([weak](boost::system::error_code ec) {
auto self = weak.lock();
if (!self) return;
if (!ec) {
self->StartTimer();
} else {
log_error("session ", self->_session_id, " timed out error: ", ec.message());
}
});
}
}
void Primary::CloseNow() {
_deadline.cancel();
if (_socket.is_open()) {
boost::system::error_code ec;
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
_socket.close();
_on_closed(shared_from_this());
}
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,110 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/NonCopyable.h"
#include "carla/Time.h"
#include "carla/TypeTraits.h"
#include "carla/profiler/LifetimeProfiled.h"
#include "carla/streaming/detail/Types.h"
#include "carla/streaming/detail/tcp/Message.h"
#include "carla/multigpu/listener.h"
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <functional>
#include <memory>
namespace carla {
namespace multigpu {
/// A TCP server session. When a session opens, it reads from the socket a
/// stream id object and passes itself to the callback functor. The session
/// closes itself after @a timeout of inactivity is met.
class Primary
: public std::enable_shared_from_this<Primary>,
private profiler::LifetimeProfiled,
private NonCopyable {
public:
using socket_type = boost::asio::ip::tcp::socket;
explicit Primary(
boost::asio::io_context &io_context,
time_duration timeout,
Listener &server);
~Primary();
/// Starts the session and calls @a on_opened after successfully reading the
/// stream id, and @a on_closed once the session is closed.
void Open(
Listener::callback_function_type on_opened,
Listener::callback_function_type on_closed,
Listener::callback_function_type_response on_response);
template <typename... Buffers>
static auto MakeMessage(Buffers &&... buffers) {
static_assert(
are_same<Buffer, Buffers...>::value,
"This function only accepts arguments of type Buffer.");
return std::make_shared<const carla::streaming::detail::tcp::Message>(std::move(buffers)...);
}
/// Writes some data to the socket.
void Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message);
/// Writes a string
void Write(std::string text);
/// read data
void ReadData();
/// Writes some data to the socket.
template <typename... Buffers>
void Write(Buffers &&... buffers) {
Write(MakeMessage(std::move(buffers)...));
}
/// Post a job to close the session.
void Close();
private:
void StartTimer();
void CloseNow();
friend class Listener;
Listener &_server;
const size_t _session_id;
socket_type _socket;
time_duration _timeout;
boost::asio::deadline_timer _deadline;
boost::asio::io_context::strand _strand;
Listener::callback_function_type _on_closed;
Listener::callback_function_type_response _on_response;
std::shared_ptr<BufferPool> _buffer_pool;
bool _is_writing = false;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,66 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/multigpu/primaryCommands.h"
// #include "carla/Logging.h"
#include "carla/multigpu/commands.h"
#include "carla/multigpu/primary.h"
#include "carla/multigpu/router.h"
#include "carla/streaming/detail/tcp/Message.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/Types.h"
namespace carla {
namespace multigpu {
PrimaryCommands::PrimaryCommands() {
}
PrimaryCommands::PrimaryCommands(std::shared_ptr<Router> router) :
_router(router) {
}
// broadcast to all secondary servers the frame data
void PrimaryCommands::SendFrameData(carla::Buffer buffer) {
_router->Write(MultiGPUCommand::SEND_FRAME, std::move(buffer));
// log_info("sending frame command");
}
// broadcast to all secondary servers the map to load
void PrimaryCommands::SendLoadMap(std::string map) {
// carla::Buffer buf((unsigned char *) map.c_str(), (size_t) map.size());
log_info("sending load map command");
}
// send to who the router wants the request for a token
token_type PrimaryCommands::SendGetToken(carla::streaming::detail::stream_id_type sensor_id) {
log_info("asking for a token");
carla::Buffer buf((carla::Buffer::value_type *) &sensor_id, (size_t) sizeof(carla::streaming::detail::stream_id_type));
auto fut = _router->WriteToNext(MultiGPUCommand::GET_TOKEN, std::move(buf));
auto response = fut.get();
token_type new_token(*reinterpret_cast<carla::streaming::detail::token_data *>(response.buffer.data()));
log_info("got a token: ", new_token.get_stream_id(), ", ", new_token.get_port());
return new_token;
}
// send to know if a connection is alive
void PrimaryCommands::SendIsAlive() {
std::string msg("Are you alive?");
carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
log_info("sending is alive command");
auto fut = _router->WriteToNext(MultiGPUCommand::YOU_ALIVE, std::move(buf));
auto response = fut.get();
log_info("response from alive command: ", response.buffer.data());
}
void PrimaryCommands::set_router(std::shared_ptr<Router> router) {
_router = router;
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,52 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
// #include "carla/Logging.h"
#include "carla/multigpu/commands.h"
#include "carla/multigpu/primary.h"
#include "carla/streaming/detail/tcp/Message.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/Types.h"
namespace carla {
namespace multigpu {
// using session = std::shared_ptr<Primary>;
// using callback_response = std::function<void(std::shared_ptr<Primary>, carla::Buffer)>;
using token_type = carla::streaming::detail::token_type;
class Router;
class PrimaryCommands {
public:
PrimaryCommands();
PrimaryCommands(std::shared_ptr<Router> router);
// broadcast to all secondary servers the frame data
void SendFrameData(carla::Buffer buffer);
// broadcast to all secondary servers the map to load
void SendLoadMap(std::string map);
// send to one secondary to get the token of a sensor
token_type SendGetToken(carla::streaming::detail::stream_id_type sensor_id);
// send to know if a connection is alive
void SendIsAlive();
void set_router(std::shared_ptr<Router> router);
private:
std::shared_ptr<Router> _router;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,147 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/multigpu/router.h"
#include "carla/multigpu/listener.h"
#include "carla/streaming/EndPoint.h"
namespace carla {
namespace multigpu {
Router::Router(void) :
_next(0) { }
Router::~Router() {
ClearSessions();
_listener->Stop();
_pool.Stop();
}
Router::Router(uint16_t port) :
_next(0) {
_endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("0.0.0.0"), port);
_listener = std::make_shared<carla::multigpu::Listener>(_pool.io_context(), _endpoint);
}
void Router::SetCallbacks() {
// prepare server
std::weak_ptr<Router> weak = shared_from_this();
carla::multigpu::Listener::callback_function_type on_open = [=](std::shared_ptr<carla::multigpu::Primary> session) {
auto self = weak.lock();
if (!self) return;
self->ConnectSession(session);
};
carla::multigpu::Listener::callback_function_type on_close = [=](std::shared_ptr<carla::multigpu::Primary> session) {
auto self = weak.lock();
if (!self) return;
self->DisconnectSession(session);
};
carla::multigpu::Listener::callback_function_type_response on_response = [=](std::shared_ptr<carla::multigpu::Primary> session, carla::Buffer buffer) {
auto self = weak.lock();
if (!self) return;
std::lock_guard<std::mutex> lock(self->_mutex);
auto prom =self-> _promises.find(session.get());
if (prom != self->_promises.end()) {
log_info("Got data from secondary (with promise): ", buffer.size());
prom->second->set_value({session, std::move(buffer)});
self->_promises.erase(prom);
} else {
log_info("Got data from secondary (without promise): ", buffer.size());
}
};
_commander.set_router(shared_from_this());
_listener->Listen(on_open, on_close, on_response);
log_info("Listening at ", _endpoint);
}
void Router::AsyncRun(size_t worker_threads) {
_pool.AsyncRun(worker_threads);
}
boost::asio::ip::tcp::endpoint Router::GetLocalEndpoint() const {
return _endpoint;
}
void Router::ConnectSession(std::shared_ptr<Primary> session) {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
_sessions.emplace_back(std::move(session));
log_info("Connected secondary servers:", _sessions.size());
}
void Router::DisconnectSession(std::shared_ptr<Primary> session) {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
if (_sessions.size() == 0) return;
_sessions.erase(
std::remove(_sessions.begin(), _sessions.end(), session),
_sessions.end());
log_info("Connected secondary servers:", _sessions.size());
}
void Router::ClearSessions() {
std::lock_guard<std::mutex> lock(_mutex);
_sessions.clear();
log_info("Disconnecting all secondary servers");
}
void Router::Write(MultiGPUCommand id, Buffer &&buffer) {
// define the command header
CommandHeader header;
header.id = id;
header.size = buffer.size();
Buffer buf_header((uint8_t *) &header, sizeof(header));
auto message = Primary::MakeMessage(std::move(buf_header), std::move(buffer));
// write to multiple servers
std::lock_guard<std::mutex> lock(_mutex);
for (auto &s : _sessions) {
if (s != nullptr) {
s->Write(message);
}
}
}
std::future<SessionInfo> Router::WriteToNext(MultiGPUCommand id, Buffer &&buffer) {
// define the command header
CommandHeader header;
header.id = id;
header.size = buffer.size();
Buffer buf_header((uint8_t *) &header, sizeof(header));
auto message = Primary::MakeMessage(std::move(buf_header), std::move(buffer));
// create the promise for the posible answer
auto response = std::make_shared<std::promise<SessionInfo>>();
// write to the next server only
std::lock_guard<std::mutex> lock(_mutex);
if (_next >= _sessions.size()) {
_next = 0;
}
if (_next < _sessions.size()) {
// std::cout << "Sending to session " << _next << std::endl;
auto s = _sessions[_next];
if (s != nullptr) {
_promises[s.get()] = response;
std::cout << "Updated promise into map: " << _promises.size() << std::endl;
s->Write(message);
}
}
++_next;
return response->get_future();
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,75 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
// #include "carla/Logging.h"
#include "carla/streaming/detail/tcp/Message.h"
#include "carla/ThreadPool.h"
#include "carla/multigpu/primary.h"
#include "carla/multigpu/primaryCommands.h"
#include "carla/multigpu/commands.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <mutex>
#include <vector>
#include <sstream>
#include <unordered_map>
namespace carla {
namespace multigpu {
// class Primary;
class Listener;
struct SessionInfo {
std::shared_ptr<Primary> session;
carla::Buffer buffer;
};
class Router : public std::enable_shared_from_this<Router> {
public:
Router(void);
explicit Router(uint16_t port);
~Router();
void Write(MultiGPUCommand id, Buffer &&buffer);
std::future<SessionInfo> WriteToNext(MultiGPUCommand id, Buffer &&buffer);
void SetCallbacks();
void AsyncRun(size_t worker_threads);
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
bool HasClientsConnected() {
return (!_sessions.empty());
}
PrimaryCommands &GetCommander() {
return _commander;
}
private:
void ConnectSession(std::shared_ptr<Primary> session);
void DisconnectSession(std::shared_ptr<Primary> session);
void ClearSessions();
// mutex and thread pool must be at the beginning to be destroyed last
std::mutex _mutex;
ThreadPool _pool;
boost::asio::ip::tcp::endpoint _endpoint;
std::vector<std::shared_ptr<Primary>> _sessions;
std::shared_ptr<Listener> _listener;
int _next;
std::unordered_map<Primary *, std::shared_ptr<std::promise<SessionInfo>>> _promises;
PrimaryCommands _commander;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,284 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/multigpu/incomingMessage.h"
#include "carla/multigpu/secondary.h"
#include "carla/BufferPool.h"
#include "carla/Debug.h"
#include "carla/Exception.h"
#include "carla/Logging.h"
#include "carla/Time.h"
#include <boost/asio/connect.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/bind_executor.hpp>
#include <exception>
namespace carla {
namespace multigpu {
Secondary::Secondary(
boost::asio::ip::tcp::endpoint ep,
SecondaryCommands::callback_type callback) :
_pool(),
_endpoint(ep),
_buffer_pool(std::make_shared<BufferPool>()),
_socket(_pool.io_context()),
_strand(_pool.io_context()),
_connection_timer(_pool.io_context()) {
_commander.set_callback(callback);
}
Secondary::Secondary(
std::string ip,
uint16_t port,
SecondaryCommands::callback_type callback) :
_pool(),
_buffer_pool(std::make_shared<BufferPool>()),
_socket(_pool.io_context()),
_strand(_pool.io_context()),
_connection_timer(_pool.io_context()) {
boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip);
_endpoint = boost::asio::ip::tcp::endpoint(ip_address, port);
_commander.set_callback(callback);
}
Secondary::~Secondary() {
_pool.Stop();
}
void Secondary::Connect() {
AsyncRun(2u);
_commander.set_secondary(shared_from_this());
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() {
auto self = weak.lock();
if (!self) return;
if (self->_done) {
return;
}
if (self->_socket.is_open()) {
self->_socket.close();
}
auto handle_connect = [weak](boost::system::error_code ec) {
auto self = weak.lock();
if (!self) return;
if (ec) {
log_error("secondary server: connection failed:", ec.message());
if (!self->_done)
self->Reconnect();
return;
}
if (self->_done) {
return;
}
// This forces not using Nagle's algorithm.
// Improves the sync mode velocity on Linux by a factor of ~3.
self->_socket.set_option(boost::asio::ip::tcp::no_delay(true));
log_info("secondary server: connected to ", self->_endpoint);
self->ReadData();
};
self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect));
});
}
void Secondary::Stop() {
_connection_timer.cancel();
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() {
auto self = weak.lock();
if (!self) return;
self->_done = true;
if (self->_socket.is_open()) {
self->_socket.close();
}
});
}
void Secondary::Reconnect() {
std::weak_ptr<Secondary> weak = shared_from_this();
_connection_timer.expires_from_now(time_duration::seconds(1u));
_connection_timer.async_wait([weak](boost::system::error_code ec) {
auto self = weak.lock();
if (!self) return;
if (!ec) {
self->Connect();
}
});
}
void Secondary::AsyncRun(size_t worker_threads) {
_pool.AsyncRun(worker_threads);
}
void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [=]() {
auto self = weak.lock();
if (!self) return;
if (!self->_socket.is_open()) {
return;
}
auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (ec) {
log_error("error sending data: ", ec.message());
}
};
// _deadline.expires_from_now(_timeout);
boost::asio::async_write(
self->_socket,
message->GetBufferSequence(),
boost::asio::bind_executor(self->_strand, handle_sent));
});
}
void Secondary::Write(Buffer buffer) {
auto message = Secondary::MakeMessage(std::move(buffer));
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [=]() {
auto self = weak.lock();
if (!self) return;
if (!self->_socket.is_open()) {
return;
}
auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (ec) {
log_error("error sending data: ", ec.message());
}
};
// _deadline.expires_from_now(_timeout);
boost::asio::async_write(
self->_socket,
message->GetBufferSequence(),
boost::asio::bind_executor(self->_strand, handle_sent));
});
}
void Secondary::Write(std::string text) {
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [=]() {
auto self = weak.lock();
if (!self) return;
if (!self->_socket.is_open()) {
return;
}
auto handle_sent = [weak](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (ec) {
log_error("error sending data: ", ec.message());
}
};
// _deadline.expires_from_now(_timeout);
// sent first size buffer
int this_size = text.size();
boost::asio::async_write(
self->_socket,
boost::asio::buffer(&this_size, sizeof(this_size)),
boost::asio::bind_executor(self->_strand, handle_sent));
// send characters
boost::asio::async_write(
self->_socket,
boost::asio::buffer(text.c_str(), text.size()),
boost::asio::bind_executor(self->_strand, handle_sent));
});
}
void Secondary::ReadData() {
std::weak_ptr<Secondary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() {
auto self = weak.lock();
if (!self) return;
if (self->_done) {
return;
}
auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (!ec) {
DEBUG_ASSERT_EQ(bytes, message->size());
DEBUG_ASSERT_NE(bytes, 0u);
// Move the buffer to the callback function and start reading the next
// piece of data.
self->GetCommander().process_command(message->pop());
self->ReadData();
} else {
// As usual, if anything fails start over from the very top.
log_error("secondary server: failed to read data: ", ec.message());
// Connect();
}
};
auto handle_read_header = [weak, message, handle_read_data](
boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
auto self = weak.lock();
if (!self) return;
if (!ec && (message->size() > 0u)) {
DEBUG_ASSERT_EQ(bytes, sizeof(carla::streaming::detail::message_size_type));
if (self->_done) {
return;
}
// Now that we know the size of the coming buffer, we can allocate our
// buffer and start putting data into it.
boost::asio::async_read(
self->_socket,
message->buffer(),
boost::asio::bind_executor(self->_strand, handle_read_data));
} else if (!self->_done) {
log_error("secondary server: failed to read header: ", ec.message());
// DEBUG_ONLY(printf("size = ", message->size()));
// DEBUG_ONLY(printf("bytes = ", bytes));
// Connect();
}
};
// Read the size of the buffer that is coming.
boost::asio::async_read(
self->_socket,
message->size_as_buffer(),
boost::asio::bind_executor(self->_strand, handle_read_header));
});
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,86 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/Buffer.h"
#include "carla/NonCopyable.h"
#include "carla/TypeTraits.h"
#include "carla/profiler/LifetimeProfiled.h"
#include "carla/multigpu/secondaryCommands.h"
#include "carla/streaming/detail/tcp/Message.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/Types.h"
#include "carla/ThreadPool.h"
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <atomic>
#include <functional>
#include <memory>
namespace carla {
class BufferPool;
namespace multigpu {
class Secondary
: public std::enable_shared_from_this<Secondary>,
private profiler::LifetimeProfiled,
private NonCopyable {
public:
using endpoint = boost::asio::ip::tcp::endpoint;
using protocol_type = endpoint::protocol_type;
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback);
Secondary(std::string ip, uint16_t port, SecondaryCommands::callback_type callback);
~Secondary();
void Connect();
void Stop();
void AsyncRun(size_t worker_threads);
void Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message);
void Write(Buffer buffer);
void Write(std::string text);
SecondaryCommands &GetCommander() {
return _commander;
}
template <typename... Buffers>
static auto MakeMessage(Buffers &&... buffers) {
static_assert(
are_same<Buffer, Buffers...>::value,
"This function only accepts arguments of type Buffer.");
return std::make_shared<const carla::streaming::detail::tcp::Message>(std::move(buffers)...);
}
private:
void Reconnect();
void ReadData();
ThreadPool _pool;
boost::asio::ip::tcp::socket _socket;
boost::asio::ip::tcp::endpoint _endpoint;
boost::asio::io_context::strand _strand;
boost::asio::deadline_timer _connection_timer;
std::shared_ptr<BufferPool> _buffer_pool;
std::atomic_bool _done {false};
SecondaryCommands _commander;
};
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,36 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
// #include "carla/Logging.h"
#include "carla/multigpu/secondaryCommands.h"
// #include "carla/streaming/detail/tcp/Message.h"
namespace carla {
namespace multigpu {
void SecondaryCommands::set_secondary(std::shared_ptr<Secondary> secondary) {
_secondary = secondary;
}
void SecondaryCommands::set_callback(callback_type callback) {
_callback = callback;
}
void SecondaryCommands::process_command(Buffer buffer) {
// get the header
CommandHeader *header;
header = reinterpret_cast<CommandHeader *>(buffer.data());
// send only data to the callback
Buffer data(buffer.data() + sizeof(CommandHeader), header->size);
_callback(header->id, std::move(data));
// log_info("Secondary got a command to process");
}
} // namespace multigpu
} // namespace carla

View File

@ -0,0 +1,34 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
// #include "carla/Logging.h"
#include "carla/Buffer.h"
#include "carla/multigpu/commands.h"
#include <functional>
namespace carla {
namespace multigpu {
class Secondary;
class SecondaryCommands {
public:
using callback_type = std::function<void(MultiGPUCommand, carla::Buffer)>;
void set_secondary(std::shared_ptr<Secondary> secondary);
void set_callback(callback_type callback);
void process_command(Buffer buffer);
private:
std::shared_ptr<Secondary> _secondary;
callback_type _callback;
};
} // namespace multigpu
} // namespace carla

View File

@ -15,7 +15,9 @@
MSGPACK_ADD_ENUM(carla::rpc::ActorAttributeType);
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Actor/ActorAttribute.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Actor/ActorAttribute.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -14,7 +14,9 @@
#include <vector>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Actor/ActorDescription.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Actor/ActorDescription.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -11,7 +11,9 @@
#include <cstdint>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Math/Color.h"
#include <compiler/enable-ue4-macros.h>
#include "Math/Color.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -10,7 +10,9 @@
#include "carla/MsgPackAdaptors.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Settings/EpisodeSettings.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Settings/EpisodeSettings.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
#include <boost/optional.hpp>

View File

@ -11,7 +11,9 @@
#include <cstdint>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Math/Color.h"
#include <compiler/enable-ue4-macros.h>
#include "Math/Color.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -52,7 +52,9 @@ namespace rpc {
void SyncRunFor(time_duration duration) {
#ifdef LIBCARLA_INCLUDED_FROM_UE4
#include <compiler/enable-ue4-macros.h>
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
_sync_io_context.reset();
_sync_io_context.run_for(duration.to_chrono());

View File

@ -9,7 +9,9 @@
#include <string>
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "UnrealString.h"
#include <compiler/enable-ue4-macros.h>
#include "UnrealString.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -9,7 +9,9 @@
#include "carla/MsgPack.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Vehicle/VehicleControl.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Vehicle/VehicleControl.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -9,7 +9,9 @@
#include "carla/MsgPack.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Vehicle/VehicleLightState.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Vehicle/VehicleLightState.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -12,7 +12,9 @@
#include "carla/rpc/Transform.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Walker/WalkerBoneControlIn.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Walker/WalkerBoneControlIn.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
#include <vector>

View File

@ -12,7 +12,9 @@
#include "carla/rpc/Transform.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Walker/WalkerBoneControlOut.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Walker/WalkerBoneControlOut.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
#include <vector>

View File

@ -9,7 +9,9 @@
#include "carla/MsgPack.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Walker/WalkerControl.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Walker/WalkerControl.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -9,7 +9,9 @@
#include "carla/MsgPack.h"
#ifdef LIBCARLA_INCLUDED_FROM_UE4
# include "Carla/Weather/WeatherParameters.h"
#include <compiler/enable-ue4-macros.h>
#include "Carla/Weather/WeatherParameters.h"
#include <compiler/disable-ue4-macros.h>
#endif // LIBCARLA_INCLUDED_FROM_UE4
namespace carla {

View File

@ -8,6 +8,7 @@
#include "carla/ThreadPool.h"
#include "carla/streaming/detail/tcp/Server.h"
#include "carla/streaming/detail/Types.h"
#include "carla/streaming/low_level/Server.h"
#include <boost/asio/io_context.hpp>
@ -64,6 +65,10 @@ namespace streaming {
_server.SetSynchronousMode(is_synchro);
}
carla::streaming::detail::token_type GetToken(carla::streaming::detail::stream_id_type sensor_id) {
return _server.GetToken(sensor_id);
}
private:
// The order of these two arguments is very important.

View File

@ -16,16 +16,6 @@ namespace carla {
namespace streaming {
namespace detail {
template <typename StreamStateT, typename StreamMapT>
static auto MakeStreamState(const token_type &cached_token, StreamMapT &stream_map) {
auto ptr = std::make_shared<StreamStateT>(cached_token);
auto result = stream_map.emplace(std::make_pair(cached_token.get_stream_id(), ptr));
if (!result.second) {
throw_exception(std::runtime_error("failed to create stream!"));
}
return ptr;
}
Dispatcher::~Dispatcher() {
// Disconnect all the sessions from their streams, this should kill any
// session remaining since at this point the io_context should be already
@ -34,10 +24,8 @@ namespace detail {
#ifndef LIBCARLA_NO_EXCEPTIONS
try {
#endif // LIBCARLA_NO_EXCEPTIONS
auto stream_state = pair.second.lock();
if (stream_state != nullptr) {
stream_state->ClearSessions();
}
auto stream_state = pair.second;
stream_state->ClearSessions();
#ifndef LIBCARLA_NO_EXCEPTIONS
} catch (const std::exception &e) {
log_error("failed to clear sessions:", e.what());
@ -49,8 +37,24 @@ namespace detail {
carla::streaming::Stream Dispatcher::MakeStream() {
std::lock_guard<std::mutex> lock(_mutex);
++_cached_token._token.stream_id; // id zero only happens in overflow.
log_info("Created new stream:", _cached_token._token.stream_id);
return MakeStreamState<MultiStreamState>(_cached_token, _stream_map);
log_info("New stream:", _cached_token._token.stream_id);
std::shared_ptr<MultiStreamState> ptr;
auto search = _stream_map.find(_cached_token.get_stream_id());
if (search == _stream_map.end()) {
// creating new stream
ptr = std::make_shared<MultiStreamState>(_cached_token);
auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
if (!result.second) {
throw_exception(std::runtime_error("failed to create stream!"));
}
log_info("Stream created");
return carla::streaming::Stream(ptr);
} else {
// reusing existing stream
log_info("Stream reused");
ptr = search->second;
return carla::streaming::Stream(ptr);
}
}
bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
@ -58,12 +62,10 @@ namespace detail {
std::lock_guard<std::mutex> lock(_mutex);
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
auto stream_state = search->second.lock();
if (stream_state != nullptr) {
log_info("Connecting session (stream ", session->get_stream_id(), ")");
stream_state->ConnectSession(std::move(session));
return true;
}
auto stream_state = search->second;
log_info("Connecting session (stream ", session->get_stream_id(), ")");
stream_state->ConnectSession(std::move(session));
return true;
}
log_error("Invalid session: no stream available with id", session->get_stream_id());
return false;
@ -72,25 +74,36 @@ namespace detail {
void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
ClearExpiredStreams();
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
auto stream_state = search->second.lock();
if (stream_state != nullptr) {
log_info("Disconnecting session (stream ", session->get_stream_id(), ")");
stream_state->DisconnectSession(session);
}
auto stream_state = search->second;
log_info("Disconnecting session (stream ", session->get_stream_id(), ")");
stream_state->DisconnectSession(session);
}
}
void Dispatcher::ClearExpiredStreams() {
for (auto it = _stream_map.begin(); it != _stream_map.end(); ) {
if (it->second.expired()) {
it = _stream_map.erase(it);
} else {
++it;
token_type Dispatcher::GetToken(stream_id_type sensor_id) {
std::lock_guard<std::mutex> lock(_mutex);
log_info("Searching sensor id: ", sensor_id);
auto search = _stream_map.find(sensor_id);
if (search != _stream_map.end()) {
log_info("Found sensor id: ", sensor_id);
auto stream_state = search->second;
log_info("Getting token from stream ", sensor_id, " on port ", stream_state->token().get_port());
return stream_state->token();
} else {
log_info("Not Found sensor id, creating sensor stream: ", sensor_id);
token_type temp_token(_cached_token);
temp_token.set_stream_id(sensor_id);
auto ptr = std::make_shared<MultiStreamState>(temp_token);
auto result = _stream_map.emplace(std::make_pair(temp_token.get_stream_id(), ptr));
if (!result.second) {
log_info("Failed to create multistream for stream ", sensor_id, " on port ", temp_token.get_port());
}
log_info("Created token from stream ", sensor_id, " on port ", temp_token.get_port());
return temp_token;
}
return token_type();
}
} // namespace detail

View File

@ -9,6 +9,7 @@
#include "carla/streaming/EndPoint.h"
#include "carla/streaming/Stream.h"
#include "carla/streaming/detail/Session.h"
#include "carla/streaming/detail/Stream.h"
#include "carla/streaming/detail/Token.h"
#include <memory>
@ -19,7 +20,8 @@ namespace carla {
namespace streaming {
namespace detail {
class StreamStateBase;
class MultiStreamState;
using StreamMap = std::unordered_map<stream_id_type, std::shared_ptr<MultiStreamState>>;
/// Keeps the mapping between streams and sessions.
class Dispatcher {
@ -36,20 +38,18 @@ namespace detail {
bool RegisterSession(std::shared_ptr<Session> session);
void DeregisterSession(std::shared_ptr<Session> session);
token_type GetToken(stream_id_type sensor_id);
private:
void ClearExpiredStreams();
// We use a mutex here, but we assume that sessions and streams won't be
// created too often.
std::mutex _mutex;
token_type _cached_token;
std::unordered_map<
stream_id_type,
std::weak_ptr<StreamStateBase>> _stream_map;
StreamMap _stream_map;
};
} // namespace detail

View File

@ -34,11 +34,10 @@ namespace detail {
template <typename... Buffers>
void Write(Buffers &&... buffers) {
auto message = Session::MakeMessage(std::move(buffers)...);
// try write single stream
auto session = _session.load();
if (session != nullptr) {
auto message = Session::MakeMessage(std::move(buffers)...);
session->Write(std::move(message));
// Return here, _session is only valid if we have a
// single session.
@ -47,6 +46,7 @@ namespace detail {
// try write multiple stream
std::lock_guard<std::mutex> lock(_mutex);
auto message = Session::MakeMessage(std::move(buffers)...);
for (auto &s : _sessions) {
if (s != nullptr) {
s->Write(message);
@ -54,7 +54,9 @@ namespace detail {
}
}
private:
bool AreClientsListening() {
return (_sessions.size() > 0);
}
void ConnectSession(std::shared_ptr<Session> session) final {
DEBUG_ASSERT(session != nullptr);
@ -77,6 +79,7 @@ namespace detail {
DEBUG_ASSERT(session == _session.load());
_session.store(nullptr);
_sessions.clear();
log_warning("Last session disconnected");
} else {
_sessions.erase(
std::remove(_sessions.begin(), _sessions.end(), session),
@ -98,6 +101,8 @@ namespace detail {
log_debug("Disconnecting all multistream sessions");
}
private:
std::mutex _mutex;
// if there is only one session, then we use atomic

View File

@ -61,6 +61,11 @@ namespace detail {
return *this;
}
bool AreClientsListening()
{
return _shared_state ? _shared_state->AreClientsListening() : false;
}
private:
friend class detail::Dispatcher;

View File

@ -78,6 +78,8 @@ namespace detail {
return {get_address(), _token.port};
}
public:
template <typename Protocol>
explicit token_type(
stream_id_type stream_id,
@ -97,13 +99,16 @@ namespace detail {
_token.protocol = get_protocol<Protocol>();
}
public:
token_type() = default;
token_type(const token_type &) = default;
token_type(const Token &rhs);
explicit token_type(token_data data) {
_token = data;
}
operator Token() const;
// We need to return a reference here so we can use the address of the
@ -112,6 +117,10 @@ namespace detail {
return _token.stream_id;
}
void set_stream_id(stream_id_type id) {
_token.stream_id = id;
}
bool has_address() const {
return _token.address_type != token_data::address::not_set;
}

View File

@ -140,12 +140,11 @@ namespace tcp {
void ServerSession::CloseNow() {
_deadline.cancel();
if (_socket.is_open()) {
boost::system::error_code ec;
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
_socket.close();
}
boost::asio::post(_strand.context(), [self=shared_from_this()]() {
DEBUG_ASSERT(self->_on_closed);
self->_on_closed(self);
});
_on_closed(shared_from_this());
log_debug("session", _session_id, "closed");
}

View File

@ -66,6 +66,7 @@ namespace low_level {
}
void UnSubscribe(token_type token) {
log_warning("calling sensor UnSubscribe()");
auto it = _clients.find(token.get_stream_id());
if (it != _clients.end()) {
it->second->Stop();

View File

@ -7,6 +7,7 @@
#pragma once
#include "carla/streaming/detail/Dispatcher.h"
#include "carla/streaming/detail/Types.h"
#include "carla/streaming/Stream.h"
#include <boost/asio/io_context.hpp>
@ -68,6 +69,10 @@ namespace low_level {
_server.SetSynchronousMode(is_synchro);
}
carla::streaming::detail::token_type GetToken(carla::streaming::detail::stream_id_type sensor_id) {
return _dispatcher.GetToken(sensor_id);
}
private:
void StartServer() {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -126,7 +126,7 @@ private:
static size_t get_max_concurrency() {
size_t concurrency = std::thread::hardware_concurrency() / 2u;
return std::max(2ul, concurrency);
return std::max((size_t) 2u, concurrency);
}
static void benchmark_image(

View File

@ -18,6 +18,16 @@
#include "PhysicsEngine/PhysicsSettings.h"
#include "Carla/MapGen/LargeMapManager.h"
#include <compiler/disable-ue4-macros.h>
#include <carla/Logging.h>
#include <carla/multigpu/primaryCommands.h>
#include <carla/multigpu/commands.h>
#include <carla/multigpu/secondary.h>
#include <carla/multigpu/secondaryCommands.h>
#include <carla/streaming/EndPoint.h>
#include <carla/streaming/Server.h>
#include <compiler/enable-ue4-macros.h>
#include <thread>
// =============================================================================
@ -62,8 +72,12 @@ void FCarlaEngine::NotifyInitGame(const UCarlaSettings &Settings)
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
if (!bIsRunning)
{
const auto StreamingPort = Settings.StreamingPort.Get(Settings.RPCPort + 1u);
auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort);
const auto StreamingPort = Settings.StreamingPort;
const auto SecondaryPort = Settings.SecondaryPort;
const auto PrimaryIP = Settings.PrimaryIP;
const auto PrimaryPort = Settings.PrimaryPort;
auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort, SecondaryPort);
Server.AsyncRun(FCarlaEngine_GetNumberOfThreadsForRPCServer());
WorldObserver.SetStream(BroadcastStream);
@ -79,10 +93,78 @@ void FCarlaEngine::NotifyInitGame(const UCarlaSettings &Settings)
&FCarlaEngine::OnEpisodeSettingsChanged);
bIsRunning = true;
// check to convert this as secondary server
if (!PrimaryIP.empty())
{
// we are secondary server, connecting to primary server
bIsPrimaryServer = false;
Secondary = std::make_shared<carla::multigpu::Secondary>(
PrimaryIP,
PrimaryPort,
[=](carla::multigpu::MultiGPUCommand Id, carla::Buffer Data) {
struct CarlaStreamBuffer : public std::streambuf
{
CarlaStreamBuffer(char *buf, std::size_t size) { setg(buf, buf, buf + size); }
};
switch (Id) {
case carla::multigpu::MultiGPUCommand::SEND_FRAME:
{
if(GetCurrentEpisode())
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR("MultiGPUCommand::SEND_FRAME");
// convert frame data from buffer to istream
CarlaStreamBuffer TempStream((char *) Data.data(), Data.size());
std::istream InStream(&TempStream);
GetCurrentEpisode()->GetFrameData().Read(InStream);
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.emplace_back");
std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
FramesToProcess.emplace_back(GetCurrentEpisode()->GetFrameData());
}
}
// forces a tick
Server.Tick();
break;
}
case carla::multigpu::MultiGPUCommand::LOAD_MAP:
break;
case carla::multigpu::MultiGPUCommand::GET_TOKEN:
{
// get the sensor id
auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
// query dispatcher
carla::streaming::detail::token_type token(Server.GetStreamingServer().GetToken(sensor_id));
carla::Buffer buf(reinterpret_cast<unsigned char *>(&token), (size_t) sizeof(token));
carla::log_info("responding with a token for port ", token.get_port());
Secondary->Write(std::move(buf));
break;
}
case carla::multigpu::MultiGPUCommand::YOU_ALIVE:
{
std::string msg("Yes, I'm alive");
carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
carla::log_info("responding is alive command");
Secondary->Write(std::move(buf));
break;
}
}
});
Secondary->Connect();
// set this server in synchronous mode
bSynchronousMode = true;
}
else
{
// we are primary server, starting server
bIsPrimaryServer = true;
SecondaryServer = Server.GetSecondaryServer();
}
}
bMapChanged = true;
}
void FCarlaEngine::NotifyBeginEpisode(UCarlaEpisode &Episode)
@ -99,9 +181,16 @@ void FCarlaEngine::NotifyBeginEpisode(UCarlaEpisode &Episode)
CurrentSettings.TileStreamingDistance = LargeMapManager->GetLayerStreamingDistance();
CurrentSettings.ActorActiveDistance = LargeMapManager->GetActorStreamingDistance();
}
if (!bIsPrimaryServer)
{
// set this secondary server with no-rendering mode
CurrentSettings.bNoRenderingMode = true;
}
CurrentEpisode->ApplySettings(CurrentSettings);
ResetFrameCounter();
ResetFrameCounter(GFrameNumber);
// make connection between Episode and Recorder
if (Recorder)
@ -112,6 +201,8 @@ void FCarlaEngine::NotifyBeginEpisode(UCarlaEpisode &Episode)
}
Server.NotifyBeginEpisode(Episode);
Episode.bIsPrimaryServer = bIsPrimaryServer;
}
void FCarlaEngine::NotifyEndEpisode()
@ -125,29 +216,66 @@ void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
if (TickType == ELevelTick::LEVELTICK_All)
{
// process RPC commands
if (bIsPrimaryServer)
{
do
{
Server.RunSome(10u);
}
while (bSynchronousMode && !Server.TickCueReceived());
}
else
{
do
{
Server.RunSome(10u);
}
while (!FramesToProcess.size());
}
// update frame counter
UpdateFrameCounter();
// process RPC commands
do
{
Server.RunSome(10u);
}
while (bSynchronousMode && !Server.TickCueReceived());
if (CurrentEpisode != nullptr)
{
CurrentEpisode->TickTimers(DeltaSeconds);
}
if (!bIsPrimaryServer && GetCurrentEpisode())
{
if (FramesToProcess.size())
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
FramesToProcess.front().PlayFrameData(GetCurrentEpisode(), MappedId);
FramesToProcess.erase(FramesToProcess.begin()); // remove first element
}
}
}
}
void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
// tick the recorder/replayer system
if (GetCurrentEpisode())
{
if (bIsPrimaryServer)
{
if (SecondaryServer->HasClientsConnected()) {
GetCurrentEpisode()->GetFrameData().GetFrameData(GetCurrentEpisode());
std::ostringstream OutStream;
GetCurrentEpisode()->GetFrameData().Write(OutStream);
// send frame data to secondary
std::string Tmp(OutStream.str());
SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));
GetCurrentEpisode()->GetFrameData().Clear();
}
}
auto* EpisodeRecorder = GetCurrentEpisode()->GetRecorder();
if (EpisodeRecorder)
{
@ -170,6 +298,7 @@ void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSec
// send the worldsnapshot
WorldObserver.BroadcastTick(*CurrentEpisode, DeltaSeconds, bMapChanged, LightUpdatePending);
CurrentEpisode->GetSensorManager().PostPhysTick(World, TickType, DeltaSeconds);
ResetSimulationState();
}
}

View File

@ -14,6 +14,15 @@
#include "Misc/CoreDelegates.h"
#include <compiler/disable-ue4-macros.h>
#include <carla/multigpu/router.h>
#include <carla/multigpu/primaryCommands.h>
#include <carla/multigpu/secondary.h>
#include <carla/multigpu/secondaryCommands.h>
#include <compiler/enable-ue4-macros.h>
#include <mutex>
class UCarlaSettings;
struct FEpisodeSettings;
@ -93,4 +102,14 @@ private:
FDelegateHandle OnPostTickHandle;
FDelegateHandle OnEpisodeSettingsChangeHandle;
bool bIsPrimaryServer = true;
std::unordered_map<uint32_t, uint32_t> MappedId;
std::shared_ptr<carla::multigpu::Router> SecondaryServer;
std::shared_ptr<carla::multigpu::Secondary> Secondary;
std::vector<FFrameData> FramesToProcess;
std::mutex FrameToProcessMutex;
};

View File

@ -54,6 +54,7 @@ UCarlaEpisode::UCarlaEpisode(const FObjectInitializer &ObjectInitializer)
Id(URandomEngine::GenerateRandomId())
{
ActorDispatcher = CreateDefaultSubobject<UActorDispatcher>(TEXT("ActorDispatcher"));
FrameData.SetEpisode(this);
}
bool UCarlaEpisode::LoadNewEpisode(const FString &MapString, bool ResetSettings)
@ -282,6 +283,13 @@ void UCarlaEpisode::AttachActors(
UActorAttacher::AttachActors(Child, Parent, InAttachmentType);
if (bIsPrimaryServer)
{
GetFrameData().AddEvent(
CarlaRecorderEventParent{
FindCarlaActor(Child)->GetActorId(),
FindCarlaActor(Parent)->GetActorId()});
}
// recorder event
if (Recorder->IsEnabled())
{
@ -405,6 +413,14 @@ TPair<EActorSpawnResultStatus, FCarlaActor*> UCarlaEpisode::SpawnActorWithInfo(
// NewTransform.AddToTranslation(-1.0f * FVector(CurrentMapOrigin));
auto result = ActorDispatcher->SpawnActor(LocalTransform, thisActorDescription, DesiredId);
if (result.Key == EActorSpawnResultStatus::Success && bIsPrimaryServer)
{
GetFrameData().CreateRecorderEventAdd(
result.Value->GetActorId(),
static_cast<uint8_t>(result.Value->GetActorType()),
Transform,
std::move(thisActorDescription));
}
if (Recorder->IsEnabled())
{
if (result.Key == EActorSpawnResultStatus::Success)

View File

@ -13,6 +13,8 @@
#include "Carla/Settings/EpisodeSettings.h"
#include "Carla/Util/ActorAttacher.h"
#include "Carla/Weather/Weather.h"
#include "Carla/Game/FrameData.h"
#include "Carla/Sensor/SensorManager.h"
#include "GameFramework/Pawn.h"
@ -233,6 +235,11 @@ public:
bool DestroyActor(carla::rpc::ActorId ActorId)
{
if (bIsPrimaryServer)
{
GetFrameData().AddEvent(
CarlaRecorderEventDel{ActorId});
}
if (Recorder->IsEnabled())
{
// recorder event
@ -289,6 +296,12 @@ public:
void SetCurrentMapOrigin(const FIntVector& NewOrigin) { CurrentMapOrigin = NewOrigin; }
FFrameData& GetFrameData() { return FrameData; }
FSensorManager& GetSensorManager() { return SensorManager; }
bool bIsPrimaryServer = true;
private:
friend class ACarlaGameModeBase;
@ -340,4 +353,8 @@ private:
carla::geom::GeoLocation MapGeoReference;
FIntVector CurrentMapOrigin;
FFrameData FrameData;
FSensorManager SensorManager;
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,158 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "Carla/Recorder/CarlaRecorderTraficLightTime.h"
#include "Carla/Recorder/CarlaRecorderPhysicsControl.h"
#include "Carla/Recorder/CarlaRecorderPlatformTime.h"
#include "Carla/Recorder/CarlaRecorderBoundingBox.h"
#include "Carla/Recorder/CarlaRecorderKinematics.h"
#include "Carla/Recorder/CarlaRecorderLightScene.h"
#include "Carla/Recorder/CarlaRecorderLightVehicle.h"
#include "Carla/Recorder/CarlaRecorderAnimVehicle.h"
#include "Carla/Recorder/CarlaRecorderAnimWalker.h"
#include "Carla/Recorder/CarlaRecorderCollision.h"
#include "Carla/Recorder/CarlaRecorderEventAdd.h"
#include "Carla/Recorder/CarlaRecorderEventDel.h"
#include "Carla/Recorder/CarlaRecorderEventParent.h"
#include "Carla/Recorder/CarlaRecorderFrames.h"
#include "Carla/Recorder/CarlaRecorderInfo.h"
#include "Carla/Recorder/CarlaRecorderPosition.h"
#include "Carla/Recorder/CarlaRecorderFrameCounter.h"
#include <sstream>
class UCarlaEpisode;
class FCarlaActor;
class FFrameData
{
// structures
CarlaRecorderInfo Info;
CarlaRecorderFrames Frames;
CarlaRecorderEventsAdd EventsAdd;
CarlaRecorderEventsDel EventsDel;
CarlaRecorderEventsParent EventsParent;
CarlaRecorderCollisions Collisions;
CarlaRecorderPositions Positions;
CarlaRecorderStates States;
CarlaRecorderAnimVehicles Vehicles;
CarlaRecorderAnimWalkers Walkers;
CarlaRecorderLightVehicles LightVehicles;
CarlaRecorderLightScenes LightScenes;
CarlaRecorderActorsKinematics Kinematics;
CarlaRecorderActorBoundingBoxes BoundingBoxes;
CarlaRecorderActorTriggerVolumes TriggerVolumes;
CarlaRecorderPlatformTime PlatformTime;
CarlaRecorderPhysicsControls PhysicsControls;
CarlaRecorderTrafficLightTimes TrafficLightTimes;
CarlaRecorderFrameCounter FrameCounter;
#pragma pack(push, 1)
struct Header
{
char Id;
uint32_t Size;
};
#pragma pack(pop)
public:
void SetEpisode(UCarlaEpisode* ThisEpisode) {Episode = ThisEpisode;}
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData = false);
void PlayFrameData(UCarlaEpisode *ThisEpisode, std::unordered_map<uint32_t, uint32_t>& MappedId);
void Clear();
void Write(std::ostream& OutStream);
void Read(std::istream& InStream);
// record functions
void CreateRecorderEventAdd(
uint32_t DatabaseId,
uint8_t Type,
const FTransform &Transform,
FActorDescription ActorDescription);
void AddEvent(const CarlaRecorderEventAdd &Event);
void AddEvent(const CarlaRecorderEventDel &Event);
void AddEvent(const CarlaRecorderEventParent &Event);
private:
void AddCollision(AActor *Actor1, AActor *Actor2);
void AddPosition(const CarlaRecorderPosition &Position);
void AddState(const CarlaRecorderStateTrafficLight &State);
void AddAnimVehicle(const CarlaRecorderAnimVehicle &Vehicle);
void AddAnimWalker(const CarlaRecorderAnimWalker &Walker);
void AddLightVehicle(const CarlaRecorderLightVehicle &LightVehicle);
void AddEventLightSceneChanged(const UCarlaLight* Light);
void AddKinematics(const CarlaRecorderKinematics &ActorKinematics);
void AddBoundingBox(const CarlaRecorderActorBoundingBox &ActorBoundingBox);
void AddTriggerVolume(const ATrafficSignBase &TrafficSign);
void AddPhysicsControl(const ACarlaWheeledVehicle& Vehicle);
void AddTrafficLightTime(const ATrafficLightBase& TrafficLight);
void AddActorPosition(FCarlaActor *CarlaActor);
void AddWalkerAnimation(FCarlaActor *CarlaActor);
void AddVehicleAnimation(FCarlaActor *CarlaActor);
void AddTrafficLightState(FCarlaActor *CarlaActor);
void AddVehicleLight(FCarlaActor *CarlaActor);
void AddActorKinematics(FCarlaActor *CarlaActor);
void AddActorBoundingBox(FCarlaActor *CarlaActor);
void GetFrameCounter();
std::pair<int, FCarlaActor*> TryToCreateReplayerActor(
FVector &Location,
FVector &Rotation,
FActorDescription &ActorDesc,
uint32_t DesiredId,
bool SpawnSensors);
// replay event for creating actor
std::pair<int, uint32_t> ProcessReplayerEventAdd(
FVector Location,
FVector Rotation,
CarlaRecorderActorDescription Description,
uint32_t DesiredId,
bool bIgnoreHero,
bool ReplaySensors);
// replay event for removing actor
bool ProcessReplayerEventDel(uint32_t DatabaseId);
// replay event for parenting actors
bool ProcessReplayerEventParent(uint32_t ChildId, uint32_t ParentId);
// reposition actors
bool ProcessReplayerPosition(CarlaRecorderPosition Pos1, CarlaRecorderPosition Pos2, double Per, double DeltaTime);
// replay event for traffic light state
bool ProcessReplayerStateTrafficLight(CarlaRecorderStateTrafficLight State);
// set the animation for Vehicles
void ProcessReplayerAnimVehicle(CarlaRecorderAnimVehicle Vehicle);
// set the animation for walkers
void ProcessReplayerAnimWalker(CarlaRecorderAnimWalker Walker);
// set the vehicle light
void ProcessReplayerLightVehicle(CarlaRecorderLightVehicle LightVehicle);
// set scene lights
void ProcessReplayerLightScene(CarlaRecorderLightScene LightScene);
// replay finish
bool ProcessReplayerFinish(bool bApplyAutopilot, bool bIgnoreHero, std::unordered_map<uint32_t, bool> &IsHero);
// set the camera position to follow an actor
bool SetCameraPosition(uint32_t Id, FVector Offset, FQuat Rotation);
// set the velocity of the actor
void SetActorVelocity(FCarlaActor *CarlaActor, FVector Velocity);
// set the animation speed for walkers
void SetWalkerSpeed(uint32_t ActorId, float Speed);
// enable / disable physics for an actor
bool SetActorSimulatePhysics(FCarlaActor *CarlaActor, bool bEnabled);
void SetFrameCounter();
FCarlaActor* FindTrafficLightAt(FVector Location);
UCarlaEpisode *Episode;
};

View File

@ -50,6 +50,7 @@ void UCarlaLight::EndPlay(const EEndPlayReason::Type EndPlayReason)
UCarlaLightSubsystem* CarlaLightSubsystem = World->GetSubsystem<UCarlaLightSubsystem>();
CarlaLightSubsystem->UnregisterLight(this);
}
Super::EndPlay(EndPlayReason);
}
void UCarlaLight::SetLightIntensity(float Intensity)

View File

@ -7,10 +7,11 @@
#include "Carla.h"
#include "Carla/OpenDrive/OpenDrive.h"
#include "Carla/Game/CarlaGameModeBase.h"
#include "Carla/Game/CarlaStatics.h"
#include "GenericPlatform/GenericPlatformProcess.h"
#include "Runtime/Core/Public/HAL/FileManagerGeneric.h"
#include "Misc/FileHelper.h"
FString UOpenDrive::FindPathToXODRFile(const FString &InMapName){

View File

@ -9,6 +9,9 @@
#include "Carla/OpenDrive/OpenDrive.h"
#include "UObject/ConstructorHelpers.h"
#include "DrawDebugHelpers.h"
#include <compiler/disable-ue4-macros.h>
#include <carla/geom/Math.h>
#include <carla/opendrive/OpenDriveParser.h>

View File

@ -59,7 +59,8 @@ enum class CarlaRecorderPacketId : uint8_t
PlatformTime,
PhysicsControl,
TrafficLightTime,
TriggerVolume
TriggerVolume,
FrameCounter
};
/// Recorder for the simulation

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderAnimVehicle.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderAnimVehicle::Write(std::ofstream &OutFile)
void CarlaRecorderAnimVehicle::Write(std::ostream &OutFile)
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -18,7 +18,7 @@ void CarlaRecorderAnimVehicle::Write(std::ofstream &OutFile)
WriteValue<bool>(OutFile, this->bHandbrake);
WriteValue<int32_t>(OutFile, this->Gear);
}
void CarlaRecorderAnimVehicle::Read(std::ifstream &InFile)
void CarlaRecorderAnimVehicle::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -41,7 +41,7 @@ void CarlaRecorderAnimVehicles::Add(const CarlaRecorderAnimVehicle &Vehicle)
Vehicles.push_back(Vehicle);
}
void CarlaRecorderAnimVehicles::Write(std::ofstream &OutFile)
void CarlaRecorderAnimVehicles::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::AnimVehicle));
@ -66,3 +66,22 @@ void CarlaRecorderAnimVehicles::Write(std::ofstream &OutFile)
WriteValue<uint32_t>(OutFile, Total);
OutFile.seekp(PosEnd, std::ios::beg);
}
void CarlaRecorderAnimVehicles::Read(std::istream &InFile)
{
uint16_t i, Total;
CarlaRecorderAnimVehicle Vehicle;
// read Total Vehicles
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
Vehicle.Read(InFile);
Add(Vehicle);
}
}
const std::vector<CarlaRecorderAnimVehicle>& CarlaRecorderAnimVehicles::GetVehicles()
{
return Vehicles;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -19,9 +19,9 @@ struct CarlaRecorderAnimVehicle
bool bHandbrake;
int32_t Gear;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -34,8 +34,11 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderAnimVehicle>& GetVehicles();
private:
std::vector<CarlaRecorderAnimVehicle> Vehicles;

View File

@ -8,13 +8,13 @@
#include "CarlaRecorderAnimWalker.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderAnimWalker::Write(std::ofstream &OutFile)
void CarlaRecorderAnimWalker::Write(std::ostream &OutFile)
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
WriteValue<float>(OutFile, this->Speed);
}
void CarlaRecorderAnimWalker::Read(std::ifstream &InFile)
void CarlaRecorderAnimWalker::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -33,7 +33,7 @@ void CarlaRecorderAnimWalkers::Add(const CarlaRecorderAnimWalker &Walker)
Walkers.push_back(Walker);
}
void CarlaRecorderAnimWalkers::Write(std::ofstream &OutFile)
void CarlaRecorderAnimWalkers::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::AnimWalker));
@ -53,3 +53,22 @@ void CarlaRecorderAnimWalkers::Write(std::ofstream &OutFile)
Walkers.size() * sizeof(CarlaRecorderAnimWalker));
}
}
void CarlaRecorderAnimWalkers::Read(std::istream &InFile)
{
uint16_t i, Total;
CarlaRecorderAnimWalker Walker;
// read Total walkers
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
Walker.Read(InFile);
Add(Walker);
}
}
const std::vector<CarlaRecorderAnimWalker>& CarlaRecorderAnimWalkers::GetWalkers()
{
return Walkers;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -15,9 +15,9 @@ struct CarlaRecorderAnimWalker
uint32_t DatabaseId;
float Speed;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -30,8 +30,12 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderAnimWalker>& GetWalkers();
private:
std::vector<CarlaRecorderAnimWalker> Walkers;

View File

@ -8,25 +8,25 @@
#include "CarlaRecorder.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderBoundingBox::Write(std::ofstream &OutFile)
void CarlaRecorderBoundingBox::Write(std::ostream &OutFile)
{
WriteFVector(OutFile, this->Origin);
WriteFVector(OutFile, this->Extension);
}
void CarlaRecorderBoundingBox::Read(std::ifstream &InFile)
void CarlaRecorderBoundingBox::Read(std::istream &InFile)
{
ReadFVector(InFile, this->Origin);
ReadFVector(InFile, this->Extension);
}
void CarlaRecorderActorBoundingBox::Write(std::ofstream &OutFile)
void CarlaRecorderActorBoundingBox::Write(std::ostream &OutFile)
{
WriteValue<uint32_t>(OutFile, this->DatabaseId);
BoundingBox.Write(OutFile);
}
void CarlaRecorderActorBoundingBox::Read(std::ifstream &InFile)
void CarlaRecorderActorBoundingBox::Read(std::istream &InFile)
{
ReadValue<uint32_t>(InFile, this->DatabaseId);
BoundingBox.Read(InFile);
@ -45,7 +45,7 @@ void CarlaRecorderActorBoundingBoxes::Add(const CarlaRecorderActorBoundingBox &I
Boxes.push_back(InObj);
}
void CarlaRecorderActorBoundingBoxes::Write(std::ofstream &OutFile)
void CarlaRecorderActorBoundingBoxes::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::BoundingBox));
@ -75,7 +75,7 @@ void CarlaRecorderActorTriggerVolumes::Add(const CarlaRecorderActorBoundingBox &
Boxes.push_back(InObj);
}
void CarlaRecorderActorTriggerVolumes::Write(std::ofstream &OutFile)
void CarlaRecorderActorTriggerVolumes::Write(std::ostream &OutFile)
{
if (Boxes.size() == 0)
{

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -15,9 +15,9 @@ struct CarlaRecorderBoundingBox
FVector Origin;
FVector Extension;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -27,9 +27,9 @@ struct CarlaRecorderActorBoundingBox
uint32_t DatabaseId;
CarlaRecorderBoundingBox BoundingBox;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -41,7 +41,7 @@ class CarlaRecorderActorBoundingBoxes
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:
@ -56,7 +56,7 @@ class CarlaRecorderActorTriggerVolumes
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderCollision.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderCollision::Read(std::ifstream &InFile)
void CarlaRecorderCollision::Read(std::istream &InFile)
{
// id
ReadValue<uint32_t>(InFile, this->Id);
@ -19,7 +19,7 @@ void CarlaRecorderCollision::Read(std::ifstream &InFile)
ReadValue<bool>(InFile, this->IsActor1Hero);
ReadValue<bool>(InFile, this->IsActor2Hero);
}
void CarlaRecorderCollision::Write(std::ofstream &OutFile) const
void CarlaRecorderCollision::Write(std::ostream &OutFile) const
{
// id
WriteValue<uint32_t>(OutFile, this->Id);
@ -47,7 +47,7 @@ void CarlaRecorderCollisions::Add(const CarlaRecorderCollision &Collision)
Collisions.insert(std::move(Collision));
}
void CarlaRecorderCollisions::Write(std::ofstream &OutFile)
void CarlaRecorderCollisions::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::Collision));
@ -65,4 +65,4 @@ void CarlaRecorderCollisions::Write(std::ofstream &OutFile)
{
Coll.Write(OutFile);
}
}
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#include <unordered_set>
@ -19,8 +19,8 @@ struct CarlaRecorderCollision
bool IsActor1Hero;
bool IsActor2Hero;
void Read(std::ifstream &InFile);
void Write(std::ofstream &OutFile) const;
void Read(std::istream &InFile);
void Write(std::ostream &OutFile) const;
// define operator == needed for the 'unordered_set'
bool operator==(const CarlaRecorderCollision &Other) const;
};
@ -47,7 +47,7 @@ class CarlaRecorderCollisions{
public:
void Add(const CarlaRecorderCollision &Collision);
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:
std::unordered_set<CarlaRecorderCollision> Collisions;

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderEventAdd.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderEventAdd::Write(std::ofstream &OutFile) const
void CarlaRecorderEventAdd::Write(std::ostream &OutFile) const
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -34,7 +34,7 @@ void CarlaRecorderEventAdd::Write(std::ofstream &OutFile) const
}
}
void CarlaRecorderEventAdd::Read(std::ifstream &InFile)
void CarlaRecorderEventAdd::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -77,7 +77,7 @@ void CarlaRecorderEventsAdd::Add(const CarlaRecorderEventAdd &Event)
Events.push_back(std::move(Event));
}
void CarlaRecorderEventsAdd::Write(std::ofstream &OutFile)
void CarlaRecorderEventsAdd::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::EventAdd));
@ -102,3 +102,20 @@ void CarlaRecorderEventsAdd::Write(std::ofstream &OutFile)
WriteValue<uint32_t>(OutFile, Total);
OutFile.seekp(PosEnd, std::ios::beg);
}
void CarlaRecorderEventsAdd::Read(std::istream &InFile)
{
CarlaRecorderEventAdd EventAdd;
uint16_t i, Total;
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
EventAdd.Read(InFile);
Add(EventAdd);
}
}
const std::vector<CarlaRecorderEventAdd>& CarlaRecorderEventsAdd::GetEvents()
{
return Events;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
struct CarlaRecorderActorAttribute
@ -31,8 +31,8 @@ struct CarlaRecorderEventAdd
FVector Rotation;
CarlaRecorderActorDescription Description;
void Read(std::ifstream &InFile);
void Write(std::ofstream &OutFile) const;
void Read(std::istream &InFile);
void Write(std::ostream &OutFile) const;
};
class CarlaRecorderEventsAdd
@ -41,7 +41,9 @@ class CarlaRecorderEventsAdd
public:
void Add(const CarlaRecorderEventAdd &Event);
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderEventAdd>& GetEvents();
private:
std::vector<CarlaRecorderEventAdd> Events;

View File

@ -8,12 +8,12 @@
#include "CarlaRecorderEventDel.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderEventDel::Read(std::ifstream &InFile)
void CarlaRecorderEventDel::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
}
void CarlaRecorderEventDel::Write(std::ofstream &OutFile) const
void CarlaRecorderEventDel::Write(std::ostream &OutFile) const
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -31,7 +31,7 @@ void CarlaRecorderEventsDel::Add(const CarlaRecorderEventDel &Event)
Events.push_back(std::move(Event));
}
void CarlaRecorderEventsDel::Write(std::ofstream &OutFile)
void CarlaRecorderEventsDel::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::EventDel));
@ -57,4 +57,23 @@ void CarlaRecorderEventsDel::Write(std::ofstream &OutFile)
OutFile.seekp(PosStart, std::ios::beg);
WriteValue<uint32_t>(OutFile, Total);
OutFile.seekp(PosEnd, std::ios::beg);
}
}
void CarlaRecorderEventsDel::Read(std::istream &InFile)
{
uint16_t i, Total;
CarlaRecorderEventDel EventDel;
// process destroy events
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
EventDel.Read(InFile);
Add(EventDel);
}
}
const std::vector<CarlaRecorderEventDel>& CarlaRecorderEventsDel::GetEvents()
{
return Events;
}

View File

@ -6,15 +6,15 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
struct CarlaRecorderEventDel
{
uint32_t DatabaseId;
void Read(std::ifstream &InFile);
void Write(std::ofstream &OutFile) const;
void Read(std::istream &InFile);
void Write(std::ostream &OutFile) const;
};
class CarlaRecorderEventsDel
@ -23,7 +23,9 @@ class CarlaRecorderEventsDel
public:
void Add(const CarlaRecorderEventDel &Event);
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderEventDel>& GetEvents();
private:
std::vector<CarlaRecorderEventDel> Events;

View File

@ -9,14 +9,14 @@
#include "CarlaRecorderHelpers.h"
void CarlaRecorderEventParent::Read(std::ifstream &InFile)
void CarlaRecorderEventParent::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
// database id parent
ReadValue<uint32_t>(InFile, this->DatabaseIdParent);
}
void CarlaRecorderEventParent::Write(std::ofstream &OutFile) const
void CarlaRecorderEventParent::Write(std::ostream &OutFile) const
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -36,7 +36,7 @@ void CarlaRecorderEventsParent::Add(const CarlaRecorderEventParent &Event)
Events.push_back(std::move(Event));
}
void CarlaRecorderEventsParent::Write(std::ofstream &OutFile)
void CarlaRecorderEventsParent::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::EventParent));
@ -62,4 +62,24 @@ void CarlaRecorderEventsParent::Write(std::ofstream &OutFile)
OutFile.seekp(PosStart, std::ios::beg);
WriteValue<uint32_t>(OutFile, Total);
OutFile.seekp(PosEnd, std::ios::beg);
}
}
void CarlaRecorderEventsParent::Read(std::istream &InFile)
{
uint16_t i, Total;
CarlaRecorderEventParent EventParent;
std::stringstream Info;
// process parenting events
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
EventParent.Read(InFile);
Add(EventParent);
}
}
const std::vector<CarlaRecorderEventParent>& CarlaRecorderEventsParent::GetEvents()
{
return Events;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
struct CarlaRecorderEventParent
@ -14,8 +14,8 @@ struct CarlaRecorderEventParent
uint32_t DatabaseId;
uint32_t DatabaseIdParent;
void Read(std::ifstream &InFile);
void Write(std::ofstream &OutFile) const;
void Read(std::istream &InFile);
void Write(std::ostream &OutFile) const;
};
class CarlaRecorderEventsParent
@ -24,8 +24,10 @@ class CarlaRecorderEventsParent
public:
void Add(const CarlaRecorderEventParent &Event);
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderEventParent>& GetEvents();
private:
std::vector<CarlaRecorderEventParent> Events;
};

View File

@ -0,0 +1,26 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "CarlaRecorderFrameCounter.h"
#include "CarlaRecorder.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderFrameCounter::Read(std::istream &InFile)
{
ReadValue<uint64_t>(InFile, this->FrameCounter);
}
void CarlaRecorderFrameCounter::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::FrameCounter));
// write packet size
uint32_t Total = sizeof(uint64_t);
WriteValue<uint32_t>(OutFile, Total);
WriteValue<uint64_t>(OutFile, this->FrameCounter);
}

View File

@ -0,0 +1,22 @@
// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include <sstream>
#pragma pack(push, 1)
struct CarlaRecorderFrameCounter
{
uint64_t FrameCounter;
void Read(std::istream &InFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)

View File

@ -8,12 +8,12 @@
#include "CarlaRecorderFrames.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderFrame::Read(std::ifstream &InFile)
void CarlaRecorderFrame::Read(std::istream &InFile)
{
ReadValue<CarlaRecorderFrame>(InFile, *this);
}
void CarlaRecorderFrame::Write(std::ofstream &OutFile)
void CarlaRecorderFrame::Write(std::ostream &OutFile)
{
WriteValue<CarlaRecorderFrame>(OutFile, *this);
}
@ -49,7 +49,7 @@ void CarlaRecorderFrames::SetFrame(double DeltaSeconds)
++Frame.Id;
}
void CarlaRecorderFrames::WriteStart(std::ofstream &OutFile)
void CarlaRecorderFrames::WriteStart(std::ostream &OutFile)
{
std::streampos Pos, Offset;
double Dummy = -1.0f;
@ -80,7 +80,7 @@ void CarlaRecorderFrames::WriteStart(std::ofstream &OutFile)
OffsetPreviousFrame = Offset;
}
void CarlaRecorderFrames::WriteEnd(std::ofstream &OutFile)
void CarlaRecorderFrames::WriteEnd(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::FrameEnd));

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#pragma pack(push, 1)
struct CarlaRecorderFrame
@ -15,9 +15,9 @@ struct CarlaRecorderFrame
double DurationThis;
double Elapsed;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -32,8 +32,8 @@ public:
void SetFrame(double DeltaSeconds);
void WriteStart(std::ofstream &OutFile);
void WriteEnd(std::ofstream &OutFile);
void WriteStart(std::ostream &OutFile);
void WriteEnd(std::ostream &OutFile);
private:

View File

@ -34,7 +34,7 @@ std::string GetRecorderFilename(std::string Filename)
// ------
// write binary data from FVector
void WriteFVector(std::ofstream &OutFile, const FVector &InObj)
void WriteFVector(std::ostream &OutFile, const FVector &InObj)
{
WriteValue<float>(OutFile, InObj.X);
WriteValue<float>(OutFile, InObj.Y);
@ -42,13 +42,13 @@ void WriteFVector(std::ofstream &OutFile, const FVector &InObj)
}
// write binary data from FTransform
// void WriteFTransform(std::ofstream &OutFile, const FTransform &InObj){
// void WriteFTransform(std::ostream &OutFile, const FTransform &InObj){
// WriteFVector(OutFile, InObj.GetTranslation());
// WriteFVector(OutFile, InObj.GetRotation().Euler());
// }
// write binary data from FString (length + text)
void WriteFString(std::ofstream &OutFile, const FString &InObj)
void WriteFString(std::ostream &OutFile, const FString &InObj)
{
// encode the string to UTF8 to know the final length
FTCHARToUTF8 EncodedString(*InObj);
@ -63,7 +63,7 @@ void WriteFString(std::ofstream &OutFile, const FString &InObj)
// -----
// read binary data to FVector
void ReadFVector(std::ifstream &InFile, FVector &OutObj)
void ReadFVector(std::istream &InFile, FVector &OutObj)
{
ReadValue<float>(InFile, OutObj.X);
ReadValue<float>(InFile, OutObj.Y);
@ -71,7 +71,7 @@ void ReadFVector(std::ifstream &InFile, FVector &OutObj)
}
// read binary data to FTransform
// void ReadFTransform(std::ifstream &InFile, FTransform &OutObj){
// void ReadFTransform(std::istream &InFile, FTransform &OutObj){
// FVector Vec;
// ReadFVector(InFile, Vec);
// OutObj.SetTranslation(Vec);
@ -80,7 +80,7 @@ void ReadFVector(std::ifstream &InFile, FVector &OutObj)
// }
// read binary data to FString (length + text)
void ReadFString(std::ifstream &InFile, FString &OutObj)
void ReadFString(std::istream &InFile, FString &OutObj)
{
uint16_t Length;
ReadValue<uint16_t>(InFile, Length);

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
// get the final path + filename
@ -18,13 +18,13 @@ std::string GetRecorderFilename(std::string Filename);
// write binary data (using sizeof())
template <typename T>
void WriteValue(std::ofstream &OutFile, const T &InObj)
void WriteValue(std::ostream &OutFile, const T &InObj)
{
OutFile.write(reinterpret_cast<const char *>(&InObj), sizeof(T));
}
template <typename T>
void WriteStdVector(std::ofstream &OutFile, const std::vector<T> &InVec)
void WriteStdVector(std::ostream &OutFile, const std::vector<T> &InVec)
{
WriteValue<uint32_t>(OutFile, InVec.size());
for (const auto& InObj : InVec)
@ -34,7 +34,7 @@ void WriteStdVector(std::ofstream &OutFile, const std::vector<T> &InVec)
}
template <typename T>
void WriteTArray(std::ofstream &OutFile, const TArray<T> &InVec)
void WriteTArray(std::ostream &OutFile, const TArray<T> &InVec)
{
WriteValue<uint32_t>(OutFile, InVec.Num());
for (const auto& InObj : InVec)
@ -44,12 +44,12 @@ void WriteTArray(std::ofstream &OutFile, const TArray<T> &InVec)
}
// write binary data from FVector
void WriteFVector(std::ofstream &OutFile, const FVector &InObj);
void WriteFVector(std::ostream &OutFile, const FVector &InObj);
// write binary data from FTransform
// void WriteFTransform(std::ofstream &OutFile, const FTransform &InObj);
// void WriteFTransform(std::ostream &OutFile, const FTransform &InObj);
// write binary data from FString (length + text)
void WriteFString(std::ofstream &OutFile, const FString &InObj);
void WriteFString(std::ostream &OutFile, const FString &InObj);
// ---------
// replayer
@ -57,13 +57,13 @@ void WriteFString(std::ofstream &OutFile, const FString &InObj);
// read binary data (using sizeof())
template <typename T>
void ReadValue(std::ifstream &InFile, T &OutObj)
void ReadValue(std::istream &InFile, T &OutObj)
{
InFile.read(reinterpret_cast<char *>(&OutObj), sizeof(T));
}
template <typename T>
void ReadStdVector(std::ifstream &InFile, std::vector<T> &OutVec)
void ReadStdVector(std::istream &InFile, std::vector<T> &OutVec)
{
uint32_t VecSize;
ReadValue<uint32_t>(InFile, VecSize);
@ -77,7 +77,7 @@ void ReadStdVector(std::ifstream &InFile, std::vector<T> &OutVec)
}
template <typename T>
void ReadTArray(std::ifstream &InFile, TArray<T> &OutVec)
void ReadTArray(std::istream &InFile, TArray<T> &OutVec)
{
uint32_t VecSize;
ReadValue<uint32_t>(InFile, VecSize);
@ -91,9 +91,9 @@ void ReadTArray(std::ifstream &InFile, TArray<T> &OutVec)
}
// read binary data from FVector
void ReadFVector(std::ifstream &InFile, FVector &OutObj);
void ReadFVector(std::istream &InFile, FVector &OutObj);
// read binary data from FTransform
// void ReadTransform(std::ifstream &InFile, FTransform &OutObj);
// void ReadTransform(std::istream &InFile, FTransform &OutObj);
// read binary data from FString (length + text)
void ReadFString(std::ifstream &InFile, FString &OutObj);
void ReadFString(std::istream &InFile, FString &OutObj);

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderHelpers.h"
#include <fstream>
#include <sstream>
#include <ctime>
struct CarlaRecorderInfo
@ -18,7 +18,7 @@ struct CarlaRecorderInfo
std::time_t Date;
FString Mapfile;
void Read(std::ifstream &File)
void Read(std::istream &File)
{
ReadValue<uint16_t>(File, Version);
ReadFString(File, Magic);
@ -26,7 +26,7 @@ struct CarlaRecorderInfo
ReadFString(File, Mapfile);
}
void Write(std::ofstream &File)
void Write(std::ostream &File)
{
WriteValue<uint16_t>(File, Version);
WriteFString(File, Magic);

View File

@ -8,14 +8,14 @@
#include "CarlaRecorder.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderKinematics::Write(std::ofstream &OutFile)
void CarlaRecorderKinematics::Write(std::ostream &OutFile)
{
WriteValue<uint32_t>(OutFile, this->DatabaseId);
WriteFVector(OutFile, this->LinearVelocity);
WriteFVector(OutFile, this->AngularVelocity);
}
void CarlaRecorderKinematics::Read(std::ifstream &InFile)
void CarlaRecorderKinematics::Read(std::istream &InFile)
{
ReadValue<uint32_t>(InFile, this->DatabaseId);
ReadFVector(InFile, this->LinearVelocity);
@ -34,7 +34,7 @@ void CarlaRecorderActorsKinematics::Add(const CarlaRecorderKinematics &InObj)
Kinematics.push_back(InObj);
}
void CarlaRecorderActorsKinematics::Write(std::ofstream &OutFile)
void CarlaRecorderActorsKinematics::Write(std::ostream &OutFile)
{
if (Kinematics.size() == 0)
{

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -16,9 +16,9 @@ struct CarlaRecorderKinematics
FVector LinearVelocity;
FVector AngularVelocity;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -30,7 +30,7 @@ class CarlaRecorderActorsKinematics
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:

View File

@ -9,7 +9,7 @@
#include "CarlaRecorderHelpers.h"
void CarlaRecorderLightScene::Write(std::ofstream &OutFile)
void CarlaRecorderLightScene::Write(std::ostream &OutFile)
{
WriteValue<int>(OutFile, this->LightId);
WriteValue<float>(OutFile, this->Intensity);
@ -17,7 +17,7 @@ void CarlaRecorderLightScene::Write(std::ofstream &OutFile)
WriteValue<bool>(OutFile, this->bOn);
WriteValue<uint8>(OutFile, this->Type);
}
void CarlaRecorderLightScene::Read(std::ifstream &InFile)
void CarlaRecorderLightScene::Read(std::istream &InFile)
{
ReadValue<int>(InFile, this->LightId);
ReadValue<float>(InFile, this->Intensity);
@ -38,7 +38,7 @@ void CarlaRecorderLightScenes::Add(const CarlaRecorderLightScene &Vehicle)
Lights.push_back(Vehicle);
}
void CarlaRecorderLightScenes::Write(std::ofstream &OutFile)
void CarlaRecorderLightScenes::Write(std::ostream &OutFile)
{
if (Lights.size() == 0)
{
@ -63,3 +63,22 @@ void CarlaRecorderLightScenes::Write(std::ofstream &OutFile)
}
}
void CarlaRecorderLightScenes::Read(std::istream &InFile)
{
uint16_t Total;
CarlaRecorderLightScene LightScene;
// read Total light events
ReadValue<uint16_t>(InFile, Total);
for (uint16_t i = 0; i < Total; ++i)
{
LightScene.Read(InFile);
Add(LightScene);
}
}
const std::vector<CarlaRecorderLightScene>& CarlaRecorderLightScenes::GetLights()
{
return Lights;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#include <type_traits>
@ -20,9 +20,9 @@ struct CarlaRecorderLightScene
bool bOn;
uint8 Type;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -34,7 +34,11 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderLightScene>& GetLights();
private:

View File

@ -9,13 +9,13 @@
#include "CarlaRecorderHelpers.h"
void CarlaRecorderLightVehicle::Write(std::ofstream &OutFile)
void CarlaRecorderLightVehicle::Write(std::ostream &OutFile)
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
WriteValue<VehicleLightStateType>(OutFile, this->State);
}
void CarlaRecorderLightVehicle::Read(std::ifstream &InFile)
void CarlaRecorderLightVehicle::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -34,7 +34,7 @@ void CarlaRecorderLightVehicles::Add(const CarlaRecorderLightVehicle &Vehicle)
Vehicles.push_back(Vehicle);
}
void CarlaRecorderLightVehicles::Write(std::ofstream &OutFile)
void CarlaRecorderLightVehicles::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::VehicleLight));
@ -52,3 +52,22 @@ void CarlaRecorderLightVehicles::Write(std::ofstream &OutFile)
Vehicle.Write(OutFile);
}
}
void CarlaRecorderLightVehicles::Read(std::istream &InFile)
{
uint16_t Total;
CarlaRecorderLightVehicle LightVehicle;
// read Total walkers
ReadValue<uint16_t>(InFile, Total);
for (uint16_t i = 0; i < Total; ++i)
{
LightVehicle.Read(InFile);
Add(LightVehicle);
}
}
const std::vector<CarlaRecorderLightVehicle>& CarlaRecorderLightVehicles::GetLightVehicles()
{
return Vehicles;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#include <type_traits>
@ -19,9 +19,9 @@ struct CarlaRecorderLightVehicle
uint32_t DatabaseId;
VehicleLightStateType State;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -33,7 +33,11 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderLightVehicle>& GetLightVehicles();
private:

View File

@ -13,7 +13,7 @@
#include <compiler/enable-ue4-macros.h>
void CarlaRecorderPhysicsControl::Write(std::ofstream &OutFile)
void CarlaRecorderPhysicsControl::Write(std::ostream &OutFile)
{
carla::rpc::VehiclePhysicsControl RPCPhysicsControl(VehiclePhysicsControl);
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -42,7 +42,7 @@ void CarlaRecorderPhysicsControl::Write(std::ofstream &OutFile)
WriteStdVector(OutFile, RPCPhysicsControl.wheels);
}
void CarlaRecorderPhysicsControl::Read(std::ifstream &InFile)
void CarlaRecorderPhysicsControl::Read(std::istream &InFile)
{
carla::rpc::VehiclePhysicsControl RPCPhysicsControl;
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -85,7 +85,7 @@ void CarlaRecorderPhysicsControls::Add(const CarlaRecorderPhysicsControl &InObj)
PhysicsControls.push_back(InObj);
}
void CarlaRecorderPhysicsControls::Write(std::ofstream &OutFile)
void CarlaRecorderPhysicsControls::Write(std::ostream &OutFile)
{
if (PhysicsControls.size() == 0)
{

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#include "Carla/Vehicle/VehiclePhysicsControl.h"
@ -17,9 +17,9 @@ struct CarlaRecorderPhysicsControl
uint32_t DatabaseId;
FVehiclePhysicsControl VehiclePhysicsControl;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -31,7 +31,7 @@ class CarlaRecorderPhysicsControls
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:

View File

@ -20,12 +20,12 @@ void CarlaRecorderPlatformTime::UpdateTime()
Time = diff/1000000.0;
}
void CarlaRecorderPlatformTime::Read(std::ifstream &InFile)
void CarlaRecorderPlatformTime::Read(std::istream &InFile)
{
ReadValue<double>(InFile, this->Time);
}
void CarlaRecorderPlatformTime::Write(std::ofstream &OutFile)
void CarlaRecorderPlatformTime::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::PlatformTime));

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <chrono>
#pragma pack(push, 1)
@ -21,9 +21,9 @@ struct CarlaRecorderPlatformTime
void SetStartTime();
void UpdateTime();
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderPosition.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderPosition::Write(std::ofstream &OutFile)
void CarlaRecorderPosition::Write(std::ostream &OutFile)
{
// database id
WriteValue<uint32_t>(OutFile, this->DatabaseId);
@ -16,7 +16,7 @@ void CarlaRecorderPosition::Write(std::ofstream &OutFile)
WriteFVector(OutFile, this->Location);
WriteFVector(OutFile, this->Rotation);
}
void CarlaRecorderPosition::Read(std::ifstream &InFile)
void CarlaRecorderPosition::Read(std::istream &InFile)
{
// database id
ReadValue<uint32_t>(InFile, this->DatabaseId);
@ -37,7 +37,7 @@ void CarlaRecorderPositions::Add(const CarlaRecorderPosition &Position)
Positions.push_back(Position);
}
void CarlaRecorderPositions::Write(std::ofstream &OutFile)
void CarlaRecorderPositions::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::Position));
@ -57,3 +57,22 @@ void CarlaRecorderPositions::Write(std::ofstream &OutFile)
Positions.size() * sizeof(CarlaRecorderPosition));
}
}
void CarlaRecorderPositions::Read(std::istream &InFile)
{
uint16_t i, Total;
// read all positions
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
CarlaRecorderPosition Pos;
Pos.Read(InFile);
Add(Pos);
}
}
const std::vector<CarlaRecorderPosition>& CarlaRecorderPositions::GetPositions()
{
return Positions;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -16,9 +16,9 @@ struct CarlaRecorderPosition
FVector Location;
FVector Rotation;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -31,7 +31,11 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderPosition>& GetPositions();
private:

View File

@ -8,7 +8,7 @@
#include "CarlaRecorderState.h"
#include "CarlaRecorderHelpers.h"
void CarlaRecorderStateTrafficLight::Write(std::ofstream &OutFile)
void CarlaRecorderStateTrafficLight::Write(std::ostream &OutFile)
{
WriteValue<uint32_t>(OutFile, this->DatabaseId);
WriteValue<bool>(OutFile, this->IsFrozen);
@ -16,7 +16,7 @@ void CarlaRecorderStateTrafficLight::Write(std::ofstream &OutFile)
WriteValue<char>(OutFile, this->State);
}
void CarlaRecorderStateTrafficLight::Read(std::ifstream &InFile)
void CarlaRecorderStateTrafficLight::Read(std::istream &InFile)
{
ReadValue<uint32_t>(InFile, this->DatabaseId);
ReadValue<bool>(InFile, this->IsFrozen);
@ -36,7 +36,7 @@ void CarlaRecorderStates::Add(const CarlaRecorderStateTrafficLight &State)
StatesTrafficLights.push_back(std::move(State));
}
void CarlaRecorderStates::Write(std::ofstream &OutFile)
void CarlaRecorderStates::Write(std::ostream &OutFile)
{
// write the packet id
WriteValue<char>(OutFile, static_cast<char>(CarlaRecorderPacketId::State));
@ -54,3 +54,22 @@ void CarlaRecorderStates::Write(std::ofstream &OutFile)
StatesTrafficLights[i].Write(OutFile);
}
}
void CarlaRecorderStates::Read(std::istream &InFile)
{
uint16_t i, Total;
CarlaRecorderStateTrafficLight StateTrafficLight;
// read Total traffic light states
ReadValue<uint16_t>(InFile, Total);
for (i = 0; i < Total; ++i)
{
StateTrafficLight.Read(InFile);
Add(StateTrafficLight);
}
}
const std::vector<CarlaRecorderStateTrafficLight>& CarlaRecorderStates::GetStates()
{
return StatesTrafficLights;
}

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#pragma pack(push, 1)
@ -17,9 +17,9 @@ struct CarlaRecorderStateTrafficLight
float ElapsedTime;
char State;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
@ -34,7 +34,11 @@ public:
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
void Read(std::istream &InFile);
const std::vector<CarlaRecorderStateTrafficLight>& GetStates();
private:

View File

@ -9,7 +9,7 @@
#include "CarlaRecorderHelpers.h"
void CarlaRecorderTrafficLightTime::Write(std::ofstream &OutFile)
void CarlaRecorderTrafficLightTime::Write(std::ostream &OutFile)
{
WriteValue<uint32_t>(OutFile, this->DatabaseId);
WriteValue(OutFile, this->GreenTime);
@ -17,7 +17,7 @@ void CarlaRecorderTrafficLightTime::Write(std::ofstream &OutFile)
WriteValue(OutFile, this->RedTime);
}
void CarlaRecorderTrafficLightTime::Read(std::ifstream &InFile)
void CarlaRecorderTrafficLightTime::Read(std::istream &InFile)
{
ReadValue<uint32_t>(InFile, this->DatabaseId);
ReadValue(InFile, this->GreenTime);
@ -37,7 +37,7 @@ void CarlaRecorderTrafficLightTimes::Add(const CarlaRecorderTrafficLightTime &In
TrafficLightTimes.push_back(InObj);
}
void CarlaRecorderTrafficLightTimes::Write(std::ofstream &OutFile)
void CarlaRecorderTrafficLightTimes::Write(std::ostream &OutFile)
{
if (TrafficLightTimes.size() == 0)
{

View File

@ -6,7 +6,7 @@
#pragma once
#include <fstream>
#include <sstream>
#include <vector>
#pragma pack(push, 1)
@ -17,9 +17,9 @@ struct CarlaRecorderTrafficLightTime
float YellowTime = 0;
float RedTime = 0;
void Read(std::ifstream &InFile);
void Read(std::istream &InFile);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
};
#pragma pack(pop)
@ -31,7 +31,7 @@ class CarlaRecorderTrafficLightTimes
void Clear(void);
void Write(std::ofstream &OutFile);
void Write(std::ostream &OutFile);
private:

View File

@ -57,6 +57,16 @@ public:
template <typename SensorT, typename... ArgsT>
void Send(SensorT &Sensor, ArgsT &&... Args);
/// allow to change the frame number of the header
void SetFrameNumber(uint64_t FrameNumber)
{
carla::sensor::s11n::SensorHeaderSerializer::Header *HeaderStr = reinterpret_cast<carla::sensor::s11n::SensorHeaderSerializer::Header *>(Header.data());
if (HeaderStr)
{
HeaderStr->frame = FrameNumber;
}
}
private:
friend class FDataStreamTmpl<T>;

View File

@ -131,7 +131,7 @@ void ADVSCamera::PostPhysTick(UWorld *World, ELevelTick TickType, float DeltaTim
/// Immediate enqueues render commands of the scene at the current time.
EnqueueRenderSceneImmediate();
WaitForRenderThreadToFinsih();
WaitForRenderThreadToFinish();
//Super (ASceneCaptureSensor) Capture the Scene in a (UTextureRenderTarget2D) CaptureRenderTarge from the CaptureComponent2D
/** Read the image **/

View File

@ -50,6 +50,11 @@ public:
return (*Stream).token();
}
bool AreClientsListening()
{
return Stream ? Stream->AreClientsListening() : false;
}
private:
boost::optional<StreamType> Stream;

View File

@ -46,7 +46,7 @@ static void WritePixelsToBuffer_Vulkan(
uint32 Offset,
FRHICommandListImmediate &InRHICmdList)
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
TRACE_CPUPROFILER_EVENT_SCOPE_STR("WritePixelsToBuffer_Vulkan");
check(IsInRenderingThread());
auto RenderResource =
static_cast<const FTextureRenderTarget2DResource *>(RenderTarget.Resource);
@ -177,7 +177,7 @@ void FPixelReader::WritePixelsToBuffer(
bool use16BitFormat
)
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
TRACE_CPUPROFILER_EVENT_SCOPE_STR("WritePixelsToBuffer");
check(IsInRenderingThread());
if (IsVulkanPlatform(GMaxRHIShaderPlatform) || IsD3DPlatform(GMaxRHIShaderPlatform, false))

Some files were not shown because too many files have changed in this diff Show More