fix closing streams if an error happens

This commit is contained in:
bernatx 2023-06-09 15:57:03 +02:00 committed by bernat
parent 2b78e09db2
commit d5fb48eb5f
4 changed files with 29 additions and 22 deletions

View File

@ -81,7 +81,7 @@ namespace multigpu {
if (!self) return; if (!self) return;
if (ec) { if (ec) {
log_error("session ", self->_session_id, ": error sending data: ", ec.message()); log_error("session ", self->_session_id, ": error sending data: ", ec.message());
self->CloseNow(); self->CloseNow(ec);
} else { } else {
// DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size()); // DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
} }
@ -94,7 +94,7 @@ namespace multigpu {
boost::asio::bind_executor(self->_strand, handle_sent)); boost::asio::bind_executor(self->_strand, handle_sent));
}); });
} }
void Primary::Write(std::string text) { void Primary::Write(std::string text) {
std::weak_ptr<Primary> weak = shared_from_this(); std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [=]() { boost::asio::post(_strand, [=]() {
@ -175,10 +175,10 @@ namespace multigpu {
void Primary::Close() { void Primary::Close() {
std::weak_ptr<Primary> weak = shared_from_this(); std::weak_ptr<Primary> weak = shared_from_this();
boost::asio::post(_strand, [weak]() { boost::asio::post(_strand, [weak]() {
auto self = weak.lock(); auto self = weak.lock();
if (!self) return; if (!self) return;
self->CloseNow(); self->CloseNow();
}); });
} }
@ -200,14 +200,18 @@ namespace multigpu {
} }
} }
void Primary::CloseNow() { void Primary::CloseNow(boost::system::error_code ec) {
_deadline.cancel(); _deadline.cancel();
if (_socket.is_open()) { if (!ec)
boost::system::error_code ec; {
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); if (_socket.is_open()) {
_socket.close(); boost::system::error_code ec2;
_on_closed(shared_from_this()); _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
_socket.close();
}
} }
_on_closed(shared_from_this());
log_debug("session", _session_id, "closed");
} }
} // namespace multigpu } // namespace multigpu

View File

@ -42,7 +42,7 @@ namespace multigpu {
Listener &server); Listener &server);
~Primary(); ~Primary();
/// Starts the session and calls @a on_opened after successfully reading the /// Starts the session and calls @a on_opened after successfully reading the
/// stream id, and @a on_closed once the session is closed. /// stream id, and @a on_closed once the session is closed.
void Open( void Open(
@ -60,7 +60,7 @@ namespace multigpu {
/// Writes some data to the socket. /// Writes some data to the socket.
void Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message); void Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message);
/// Writes a string /// Writes a string
void Write(std::string text); void Write(std::string text);
@ -80,7 +80,7 @@ namespace multigpu {
void StartTimer(); void StartTimer();
void CloseNow(); void CloseNow(boost::system::error_code ec = boost::system::error_code());
friend class Listener; friend class Listener;

View File

@ -62,7 +62,7 @@ namespace tcp {
boost::asio::post(_strand.context(), [=]() { callback(self); }); boost::asio::post(_strand.context(), [=]() { callback(self); });
} else { } else {
log_error("session", _session_id, ": error retrieving stream id :", ec.message()); log_error("session", _session_id, ": error retrieving stream id :", ec.message());
CloseNow(); CloseNow(ec);
} }
}; };
@ -93,7 +93,7 @@ namespace tcp {
// ignore this message // ignore this message
log_debug("session", _session_id, ": connection too slow: message discarded"); log_debug("session", _session_id, ": connection too slow: message discarded");
return; return;
} }
} }
_is_writing = true; _is_writing = true;
@ -101,7 +101,7 @@ namespace tcp {
_is_writing = false; _is_writing = false;
if (ec) { if (ec) {
log_info("session", _session_id, ": error sending data :", ec.message()); log_info("session", _session_id, ": error sending data :", ec.message());
CloseNow(); CloseNow(ec);
} else { } else {
DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes")); DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size()); DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
@ -137,12 +137,15 @@ namespace tcp {
} }
} }
void ServerSession::CloseNow() { void ServerSession::CloseNow(boost::system::error_code ec) {
_deadline.cancel(); _deadline.cancel();
if (_socket.is_open()) { if (!ec)
boost::system::error_code ec; {
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); if (_socket.is_open()) {
_socket.close(); boost::system::error_code ec2;
_socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
_socket.close();
}
} }
_on_closed(shared_from_this()); _on_closed(shared_from_this());
log_debug("session", _session_id, "closed"); log_debug("session", _session_id, "closed");

View File

@ -88,7 +88,7 @@ namespace tcp {
void StartTimer(); void StartTimer();
void CloseNow(); void CloseNow(boost::system::error_code ec = boost::system::error_code());
friend class Server; friend class Server;