Stream is removed when the sensor is destroyed

This commit is contained in:
bernatx 2022-05-05 10:43:02 +02:00 committed by bernat
parent cb0ae25111
commit f10b83c4fe
6 changed files with 61 additions and 12 deletions

View File

@ -53,6 +53,10 @@ namespace streaming {
return _server.MakeStream(); return _server.MakeStream();
} }
void CloseStream(carla::streaming::detail::stream_id_type id) {
return _server.CloseStream(id);
}
void Run() { void Run() {
_pool.Run(); _pool.Run();
} }

View File

@ -57,15 +57,32 @@ namespace detail {
} }
} }
void Dispatcher::CloseStream(carla::streaming::detail::stream_id_type id) {
std::lock_guard<std::mutex> lock(_mutex);
log_info("Calling CloseStream for ", id);
auto search = _stream_map.find(id);
if (search != _stream_map.end()) {
auto stream_state = search->second.lock();
if (stream_state != nullptr) {
log_info("Disconnecting all sessions (stream ", id, ")");
stream_state->ClearSessions();
}
_stream_map.erase(search);
}
}
bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) { bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
DEBUG_ASSERT(session != nullptr); DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
auto search = _stream_map.find(session->get_stream_id()); auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) { if (search != _stream_map.end()) {
auto stream_state = search->second; auto stream_state = search->second.lock();
log_info("Connecting session (stream ", session->get_stream_id(), ")"); if (stream_state != nullptr) {
stream_state->ConnectSession(std::move(session)); log_info("Connecting session (stream ", session->get_stream_id(), ")");
return true; stream_state->ConnectSession(std::move(session));
log_info("Current streams: ", _stream_map.size());
return true;
}
} }
log_error("Invalid session: no stream available with id", session->get_stream_id()); log_error("Invalid session: no stream available with id", session->get_stream_id());
return false; return false;
@ -74,11 +91,15 @@ namespace detail {
void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) { void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
DEBUG_ASSERT(session != nullptr); DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
log_info("Calling DeregisterSession for ", session->get_stream_id());
auto search = _stream_map.find(session->get_stream_id()); auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) { if (search != _stream_map.end()) {
auto stream_state = search->second; auto stream_state = search->second.lock();
log_info("Disconnecting session (stream ", session->get_stream_id(), ")"); if (stream_state != nullptr) {
stream_state->DisconnectSession(session); log_info("Disconnecting session (stream ", session->get_stream_id(), ")");
stream_state->DisconnectSession(session);
log_info("Current streams: ", _stream_map.size());
}
} }
} }

View File

@ -35,6 +35,8 @@ namespace detail {
carla::streaming::Stream MakeStream(); carla::streaming::Stream MakeStream();
void CloseStream(carla::streaming::detail::stream_id_type id);
bool RegisterSession(std::shared_ptr<Session> session); bool RegisterSession(std::shared_ptr<Session> session);
void DeregisterSession(std::shared_ptr<Session> session); void DeregisterSession(std::shared_ptr<Session> session);

View File

@ -39,6 +39,8 @@ namespace detail {
if (session != nullptr) { if (session != nullptr) {
auto message = Session::MakeMessage(std::move(buffers)...); auto message = Session::MakeMessage(std::move(buffers)...);
session->Write(std::move(message)); session->Write(std::move(message));
// log_info("sensor ", session->get_stream_id()," data sent ", message->size(), " by");
log_info("sensor ", session->get_stream_id()," data sent");
// Return here, _session is only valid if we have a // Return here, _session is only valid if we have a
// single session. // single session.
return; return;
@ -46,10 +48,14 @@ namespace detail {
// try write multiple stream // try write multiple stream
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
auto message = Session::MakeMessage(std::move(buffers)...); if (_sessions.size() > 0) {
for (auto &s : _sessions) { auto message = Session::MakeMessage(std::move(buffers)...);
if (s != nullptr) { for (auto &s : _sessions) {
s->Write(message); if (s != nullptr) {
s->Write(message);
// log_info("sensor ", s->get_stream_id()," data sent ", message->size(), " by");
log_info("sensor ", s->get_stream_id()," data sent ");
}
} }
} }
} }
@ -74,6 +80,7 @@ namespace detail {
void DisconnectSession(std::shared_ptr<Session> session) final { void DisconnectSession(std::shared_ptr<Session> session) final {
DEBUG_ASSERT(session != nullptr); DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
log_info("Calling DisconnectSession for ", session->get_stream_id());
if (_sessions.size() == 0) return; if (_sessions.size() == 0) return;
if (_sessions.size() == 1) { if (_sessions.size() == 1) {
DEBUG_ASSERT(session == _session.load()); DEBUG_ASSERT(session == _session.load());
@ -96,6 +103,11 @@ namespace detail {
void ClearSessions() final { void ClearSessions() final {
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(_mutex);
for (auto &s : _sessions) {
if (s != nullptr) {
s->Close();
}
}
_sessions.clear(); _sessions.clear();
_session.store(nullptr); _session.store(nullptr);
log_debug("Disconnecting all multistream sessions"); log_debug("Disconnecting all multistream sessions");

View File

@ -65,6 +65,10 @@ namespace low_level {
return _dispatcher.MakeStream(); return _dispatcher.MakeStream();
} }
void CloseStream(carla::streaming::detail::stream_id_type id) {
return _dispatcher.CloseStream(id);
}
void SetSynchronousMode(bool is_synchro) { void SetSynchronousMode(bool is_synchro) {
_server.SetSynchronousMode(is_synchro); _server.SetSynchronousMode(is_synchro);
} }
@ -82,6 +86,7 @@ namespace low_level {
} }
}; };
auto on_session_closed = [this](auto session) { auto on_session_closed = [this](auto session) {
log_info("on_session_closed called");
_dispatcher.DeregisterSession(session); _dispatcher.DeregisterSession(session);
}; };
_server.Listen(on_session_opened, on_session_closed); _server.Listen(on_session_opened, on_session_closed);

View File

@ -10,6 +10,7 @@
#include "Carla/Actor/ActorDescription.h" #include "Carla/Actor/ActorDescription.h"
#include "Carla/Actor/ActorBlueprintFunctionLibrary.h" #include "Carla/Actor/ActorBlueprintFunctionLibrary.h"
#include "Carla/Game/CarlaStatics.h"
ASensor::ASensor(const FObjectInitializer &ObjectInitializer) ASensor::ASensor(const FObjectInitializer &ObjectInitializer)
: Super(ObjectInitializer) : Super(ObjectInitializer)
@ -97,6 +98,10 @@ void ASensor::PostActorCreated()
void ASensor::EndPlay(EEndPlayReason::Type EndPlayReason) void ASensor::EndPlay(EEndPlayReason::Type EndPlayReason)
{ {
Super::EndPlay(EndPlayReason); Super::EndPlay(EndPlayReason);
// close all sessions associated to the sensor stream
UCarlaStatics::GetGameInstance(GetEpisode().GetWorld())->GetServer().GetStreamingServer().CloseStream(carla::streaming::detail::token_type(Stream.GetToken()).get_stream_id());
Stream = FDataStream(); Stream = FDataStream();
UCarlaEpisode* Episode = UCarlaStatics::GetCurrentEpisode(GetWorld()); UCarlaEpisode* Episode = UCarlaStatics::GetCurrentEpisode(GetWorld());