diff --git a/LibCarla/source/carla/streaming/Server.h b/LibCarla/source/carla/streaming/Server.h index b99bbe666..245d00557 100644 --- a/LibCarla/source/carla/streaming/Server.h +++ b/LibCarla/source/carla/streaming/Server.h @@ -53,6 +53,10 @@ namespace streaming { return _server.MakeStream(); } + void CloseStream(carla::streaming::detail::stream_id_type id) { + return _server.CloseStream(id); + } + void Run() { _pool.Run(); } diff --git a/LibCarla/source/carla/streaming/detail/Dispatcher.cpp b/LibCarla/source/carla/streaming/detail/Dispatcher.cpp index f10e0de6b..a524ad97d 100644 --- a/LibCarla/source/carla/streaming/detail/Dispatcher.cpp +++ b/LibCarla/source/carla/streaming/detail/Dispatcher.cpp @@ -57,15 +57,32 @@ namespace detail { } } + void Dispatcher::CloseStream(carla::streaming::detail::stream_id_type id) { + std::lock_guard lock(_mutex); + log_info("Calling CloseStream for ", id); + auto search = _stream_map.find(id); + if (search != _stream_map.end()) { + auto stream_state = search->second.lock(); + if (stream_state != nullptr) { + log_info("Disconnecting all sessions (stream ", id, ")"); + stream_state->ClearSessions(); + } + _stream_map.erase(search); + } + } + bool Dispatcher::RegisterSession(std::shared_ptr session) { DEBUG_ASSERT(session != nullptr); std::lock_guard lock(_mutex); auto search = _stream_map.find(session->get_stream_id()); if (search != _stream_map.end()) { - auto stream_state = search->second; - log_info("Connecting session (stream ", session->get_stream_id(), ")"); - stream_state->ConnectSession(std::move(session)); - return true; + auto stream_state = search->second.lock(); + if (stream_state != nullptr) { + log_info("Connecting session (stream ", session->get_stream_id(), ")"); + stream_state->ConnectSession(std::move(session)); + log_info("Current streams: ", _stream_map.size()); + return true; + } } log_error("Invalid session: no stream available with id", session->get_stream_id()); return false; @@ -74,14 +91,18 @@ namespace detail { void Dispatcher::DeregisterSession(std::shared_ptr session) { DEBUG_ASSERT(session != nullptr); std::lock_guard lock(_mutex); + log_info("Calling DeregisterSession for ", session->get_stream_id()); auto search = _stream_map.find(session->get_stream_id()); if (search != _stream_map.end()) { - auto stream_state = search->second; - log_info("Disconnecting session (stream ", session->get_stream_id(), ")"); - stream_state->DisconnectSession(session); + auto stream_state = search->second.lock(); + if (stream_state != nullptr) { + log_info("Disconnecting session (stream ", session->get_stream_id(), ")"); + stream_state->DisconnectSession(session); + log_info("Current streams: ", _stream_map.size()); + } } } - + token_type Dispatcher::GetToken(stream_id_type sensor_id) { std::lock_guard lock(_mutex); log_info("Searching sensor id: ", sensor_id); diff --git a/LibCarla/source/carla/streaming/detail/Dispatcher.h b/LibCarla/source/carla/streaming/detail/Dispatcher.h index 491ea2f8d..bfbf75893 100644 --- a/LibCarla/source/carla/streaming/detail/Dispatcher.h +++ b/LibCarla/source/carla/streaming/detail/Dispatcher.h @@ -35,6 +35,8 @@ namespace detail { carla::streaming::Stream MakeStream(); + void CloseStream(carla::streaming::detail::stream_id_type id); + bool RegisterSession(std::shared_ptr session); void DeregisterSession(std::shared_ptr session); diff --git a/LibCarla/source/carla/streaming/detail/MultiStreamState.h b/LibCarla/source/carla/streaming/detail/MultiStreamState.h index 0f1800aee..532a753a6 100644 --- a/LibCarla/source/carla/streaming/detail/MultiStreamState.h +++ b/LibCarla/source/carla/streaming/detail/MultiStreamState.h @@ -39,6 +39,8 @@ namespace detail { if (session != nullptr) { auto message = Session::MakeMessage(std::move(buffers)...); session->Write(std::move(message)); + // log_info("sensor ", session->get_stream_id()," data sent ", message->size(), " by"); + log_info("sensor ", session->get_stream_id()," data sent"); // Return here, _session is only valid if we have a // single session. return; @@ -46,10 +48,14 @@ namespace detail { // try write multiple stream std::lock_guard lock(_mutex); - auto message = Session::MakeMessage(std::move(buffers)...); - for (auto &s : _sessions) { - if (s != nullptr) { - s->Write(message); + if (_sessions.size() > 0) { + auto message = Session::MakeMessage(std::move(buffers)...); + for (auto &s : _sessions) { + if (s != nullptr) { + s->Write(message); + // log_info("sensor ", s->get_stream_id()," data sent ", message->size(), " by"); + log_info("sensor ", s->get_stream_id()," data sent "); + } } } } @@ -74,6 +80,7 @@ namespace detail { void DisconnectSession(std::shared_ptr session) final { DEBUG_ASSERT(session != nullptr); std::lock_guard lock(_mutex); + log_info("Calling DisconnectSession for ", session->get_stream_id()); if (_sessions.size() == 0) return; if (_sessions.size() == 1) { DEBUG_ASSERT(session == _session.load()); @@ -96,6 +103,11 @@ namespace detail { void ClearSessions() final { std::lock_guard lock(_mutex); + for (auto &s : _sessions) { + if (s != nullptr) { + s->Close(); + } + } _sessions.clear(); _session.store(nullptr); log_debug("Disconnecting all multistream sessions"); diff --git a/LibCarla/source/carla/streaming/low_level/Server.h b/LibCarla/source/carla/streaming/low_level/Server.h index f859bc425..ef9e11110 100644 --- a/LibCarla/source/carla/streaming/low_level/Server.h +++ b/LibCarla/source/carla/streaming/low_level/Server.h @@ -65,6 +65,10 @@ namespace low_level { return _dispatcher.MakeStream(); } + void CloseStream(carla::streaming::detail::stream_id_type id) { + return _dispatcher.CloseStream(id); + } + void SetSynchronousMode(bool is_synchro) { _server.SetSynchronousMode(is_synchro); } @@ -82,6 +86,7 @@ namespace low_level { } }; auto on_session_closed = [this](auto session) { + log_info("on_session_closed called"); _dispatcher.DeregisterSession(session); }; _server.Listen(on_session_opened, on_session_closed); diff --git a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.cpp b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.cpp index f7e236b28..37ff7dd0c 100644 --- a/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.cpp +++ b/Unreal/CarlaUE4/Plugins/Carla/Source/Carla/Sensor/Sensor.cpp @@ -10,6 +10,7 @@ #include "Carla/Actor/ActorDescription.h" #include "Carla/Actor/ActorBlueprintFunctionLibrary.h" +#include "Carla/Game/CarlaStatics.h" ASensor::ASensor(const FObjectInitializer &ObjectInitializer) : Super(ObjectInitializer) @@ -97,6 +98,10 @@ void ASensor::PostActorCreated() void ASensor::EndPlay(EEndPlayReason::Type EndPlayReason) { Super::EndPlay(EndPlayReason); + + // close all sessions associated to the sensor stream + UCarlaStatics::GetGameInstance(GetEpisode().GetWorld())->GetServer().GetStreamingServer().CloseStream(carla::streaming::detail::token_type(Stream.GetToken()).get_stream_id()); + Stream = FDataStream(); UCarlaEpisode* Episode = UCarlaStatics::GetCurrentEpisode(GetWorld());