Merge branch 'marcel/leak-workaround' of https://github.com/carla-simulator/carla into marcel/leak-workaround

This commit is contained in:
Jorge Virgos 2024-09-04 10:55:51 +02:00
commit 89c211e780
4 changed files with 106 additions and 64 deletions

View File

@ -86,7 +86,7 @@ namespace tcp {
void Client::Connect() {
auto self = shared_from_this();
boost::asio::post(_strand, [this, self]() {
//boost::asio::post(_strand, [this, self]() {
if (_done) {
return;
}
@ -139,18 +139,18 @@ namespace tcp {
log_debug("streaming client: connecting to", ep);
_socket.async_connect(ep, boost::asio::bind_executor(_strand, handle_connect));
});
//});
}
void Client::Stop() {
_connection_timer.cancel();
auto self = shared_from_this();
boost::asio::post(_strand, [this, self]() {
//boost::asio::post(_strand, [this, self]() {
_done = true;
if (_socket.is_open()) {
_socket.close();
}
});
//});
}
void Client::Reconnect() {
@ -165,7 +165,7 @@ namespace tcp {
void Client::ReadData() {
auto self = shared_from_this();
boost::asio::post(_strand, [this, self]() {
//boost::asio::post(_strand, [this, self]() {
if (_done) {
return;
}
@ -182,7 +182,8 @@ namespace tcp {
// Move the buffer to the callback function and start reading the next
// piece of data.
// log_debug("streaming client: success reading data, calling the callback");
boost::asio::post(_strand, [self, message]() { self->_callback(message->pop()); });
//boost::asio::post(_strand, [self, message]() { self->_callback(message->pop()); });
_callback(message->pop());
ReadData();
} else {
// As usual, if anything fails start over from the very top.
@ -200,12 +201,23 @@ namespace tcp {
if (_done) {
return;
}
//#define LIBCARLA_SEQUENTIAL_CALLBACKS // Temporary fix for memory usage blowup during save_to_disk
#ifndef LIBCARLA_SEQUENTIAL_CALLBACKS
// Now that we know the size of the coming buffer, we can allocate our
// buffer and start putting data into it.
boost::asio::async_read(
_socket,
message->buffer(),
boost::asio::bind_executor(_strand, handle_read_data));
boost::asio::bind_executor(_strand, handle_read_data));
#else
boost::asio::read(
_socket,
message->buffer());
handle_read_data(
boost::system::error_code(),
message->size());
#endif
} else if (!_done) {
log_debug("streaming client: failed to read header:", ec.message());
DEBUG_ONLY(log_debug("size = ", message->size()));
@ -214,12 +226,21 @@ namespace tcp {
}
};
#ifndef LIBCARLA_SEQUENTIAL_CALLBACKS
// Read the size of the buffer that is coming.
boost::asio::async_read(
_socket,
message->size_as_buffer(),
boost::asio::bind_executor(_strand, handle_read_header));
});
#else
boost::asio::read(
_socket,
message->size_as_buffer());
handle_read_header(
boost::system::error_code(),
sizeof(message_size_type));
#endif
//});
}
} // namespace tcp

View File

@ -79,43 +79,39 @@ namespace tcp {
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
auto self = shared_from_this();
boost::asio::post(_strand, [=]() {
if (!_socket.is_open()) {
if (!_socket.is_open()) {
return;
}
if (_is_writing) {
if (_server.IsSynchronousMode()) {
// wait until previous message has been sent
while (_is_writing) {
std::this_thread::yield();
}
} else {
// ignore this message
log_debug("session", _session_id, ": connection too slow: message discarded");
return;
}
if (_is_writing) {
if (_server.IsSynchronousMode()) {
// wait until previous message has been sent
while (_is_writing) {
std::this_thread::yield();
}
} else {
// ignore this message
log_debug("session", _session_id, ": connection too slow: message discarded");
return;
}
}
_is_writing = true;
auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
_is_writing = false;
if (ec) {
log_info("session", _session_id, ": error sending data :", ec.message());
CloseNow(ec);
} else {
DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
}
_is_writing = true;
};
auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
_is_writing = false;
if (ec) {
log_info("session", _session_id, ": error sending data :", ec.message());
CloseNow(ec);
} else {
DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
}
};
log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
_deadline.expires_from_now(_timeout);
boost::asio::async_write(
_socket,
message->GetBufferSequence(),
handle_sent);
});
_deadline.expires_from_now(_timeout);
boost::asio::async_write(_socket, message->GetBufferSequence(),
boost::asio::bind_executor(_strand, handle_sent));
}
void ServerSession::Close() {

View File

@ -1,2 +1 @@
carla.so
carla.pyd
_out/

View File

@ -226,7 +226,16 @@ namespace ImageUtil
static void ReadImageDataAsyncCommand(
struct ReadImageDataContext
{
EPixelFormat Format;
FIntPoint Size;
ReadImageDataAsyncCallback Callback;
TUniquePtr<FRHIGPUTextureReadback> Readback;
};
static void ReadImageDataBegin(
ReadImageDataContext& Self,
UTextureRenderTarget2D& RenderTarget,
ReadImageDataAsyncCallback&& Callback)
{
@ -239,12 +248,13 @@ namespace ImageUtil
auto Texture = Resource->GetRenderTargetTexture();
if (Texture == nullptr)
return;
auto Readback = MakeUnique<FRHIGPUTextureReadback>(
Self.Callback = std::move(Callback);
Self.Readback = MakeUnique<FRHIGPUTextureReadback>(
TEXT("ReadImageData-Readback"));
auto Size = Texture->GetSizeXY();
auto Format = Texture->GetFormat();
Self.Size = Texture->GetSizeXY();
Self.Format = Texture->GetFormat();
auto ResolveRect = FResolveRect();
Readback->EnqueueCopy(CmdList, Texture, ResolveRect);
Self.Readback->EnqueueCopy(CmdList, Texture, ResolveRect);
auto Query = RenderQueryPool->AllocateQuery();
CmdList.EndRenderQuery(Query.GetQuery());
@ -252,26 +262,42 @@ namespace ImageUtil
uint64 DeltaTime;
RHIGetRenderQueryResult(Query.GetQuery(), DeltaTime, true);
Query.ReleaseQuery();
}
static void ReadImageDataEnd(
ReadImageDataContext& Self)
{
int32 RowPitch, BufferHeight;
auto MappedPtr = Self.Readback->Lock(RowPitch, &BufferHeight);
if (MappedPtr != nullptr)
{
ScopedCallback Unlock = [&] { Self.Readback->Unlock(); };
Self.Callback(MappedPtr, RowPitch, BufferHeight, Self.Format, Self.Size);
}
}
static void ReadImageDataEndAsync(
ReadImageDataContext&& Self)
{
AsyncTask(
ENamedThreads::HighTaskPriority, [
Readback = MoveTemp(Readback),
Callback = std::move(Callback),
Size,
Format]
Self = std::move(Self)]() mutable
{
while (!Readback->IsReady())
while (!Self.Readback->IsReady())
std::this_thread::yield();
int32 RowPitch, BufferHeight;
auto MappedPtr = Readback->Lock(RowPitch, &BufferHeight);
if (MappedPtr != nullptr)
{
ScopedCallback Unlock = [&] { Readback->Unlock(); };
Callback(MappedPtr, RowPitch, BufferHeight, Format, Size);
}
ReadImageDataEnd(Self);
});
}
static void ReadImageDataAsyncCommand(
UTextureRenderTarget2D& RenderTarget,
ReadImageDataAsyncCallback&& Callback)
{
ReadImageDataContext Context = { };
ReadImageDataBegin(Context, RenderTarget, std::move(Callback));
ReadImageDataEndAsync(std::move(Context));
}
bool ReadImageDataAsync(
@ -287,14 +313,14 @@ namespace ImageUtil
else
{
ENQUEUE_RENDER_COMMAND(ReadImageDataAsyncCmd)([
&RenderTarget,
Callback = std::move(Callback)
&RenderTarget, Callback = std::move(Callback)
](auto& CmdList) mutable
{
ReadImageDataAsyncCommand(
RenderTarget,
std::move(Callback));
ReadImageDataContext Context = { };
ReadImageDataBegin(Context, RenderTarget, std::move(Callback));
ReadImageDataEnd(Context);
});
FlushRenderingCommands();
}
return true;
}