Allow to deregister sessions when closed

This commit is contained in:
nsubiron 2018-10-10 22:56:05 +02:00
parent 5de16e9eee
commit 75cec0b615
8 changed files with 73 additions and 24 deletions

View File

@ -34,15 +34,27 @@ namespace detail {
return ptr;
}
void Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
DEBUG_ASSERT(search->second != nullptr);
search->second->set_session(std::move(session));
return true;
} else {
log_error("Invalid session: no stream available with id", session->get_stream_id());
return false;
}
}
void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
DEBUG_ASSERT(session != nullptr);
std::lock_guard<std::mutex> lock(_mutex);
auto search = _stream_map.find(session->get_stream_id());
if (search != _stream_map.end()) {
DEBUG_ASSERT(search->second != nullptr);
search->second->set_session(nullptr);
}
}

View File

@ -32,7 +32,9 @@ namespace detail {
Stream MakeStream();
void RegisterSession(std::shared_ptr<Session> session);
bool RegisterSession(std::shared_ptr<Session> session);
void DeregisterSession(std::shared_ptr<Session> session);
private:

View File

@ -20,15 +20,16 @@ namespace tcp {
_timeout(time_duration::seconds(10u)) {}
void Server::OpenSession(
const time_duration timeout,
ServerSession::callback_function_type callback) {
time_duration timeout,
ServerSession::callback_function_type on_opened,
ServerSession::callback_function_type on_closed) {
using boost::system::error_code;
auto session = std::make_shared<ServerSession>(_acceptor.get_io_service(), timeout);
auto handle_query = [callback, session](const error_code &ec) {
auto handle_query = [on_opened, on_closed, session](const error_code &ec) {
if (!ec) {
session->Open(callback);
session->Open(std::move(on_opened), std::move(on_closed));
} else {
log_error("tcp accept error:", ec.message());
}
@ -37,7 +38,7 @@ namespace tcp {
_acceptor.async_accept(session->_socket, [=](error_code ec) {
// Handle query and open a new session immediately.
_acceptor.get_io_service().post([=]() { handle_query(ec); });
OpenSession(timeout, callback);
OpenSession(timeout, on_opened, on_closed);
});
}

View File

@ -36,16 +36,25 @@ namespace tcp {
_timeout = timeout;
}
/// Start listening for connections, on each new connection @a callback is
/// called.
template <typename Functor>
void Listen(Functor callback) {
_acceptor.get_io_service().post([=]() { OpenSession(_timeout, callback); });
/// Start listening for connections. On each new connection, @a
/// on_session_opened is called, and @a on_session_closed when the session
/// is closed.
template <typename FunctorT1, typename FunctorT2>
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed) {
_acceptor.get_io_service().post([=]() {
OpenSession(
_timeout,
std::move(on_session_opened),
std::move(on_session_closed));
});
}
private:
void OpenSession(time_duration timeout, ServerSession::callback_function_type callback);
void OpenSession(
time_duration timeout,
ServerSession::callback_function_type on_session_opened,
ServerSession::callback_function_type on_session_closed);
boost::asio::ip::tcp::acceptor _acceptor;

View File

@ -32,18 +32,22 @@ namespace tcp {
_deadline(io_service),
_strand(io_service) {}
void ServerSession::Open(callback_function_type callback) {
void ServerSession::Open(
callback_function_type on_opened,
callback_function_type on_closed) {
DEBUG_ASSERT(on_opened && on_closed);
_on_closed = std::move(on_closed);
StartTimer();
auto self = shared_from_this(); // To keep myself alive.
_strand.post([=]() {
auto handle_query = [this, self, cb=std::move(callback)](
auto handle_query = [this, self, callback=std::move(on_opened)](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes_received)) {
DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
if (!ec) {
log_debug("session", _session_id, "for stream", _stream_id, " started");
_socket.get_io_service().post([=]() { cb(self); });
_socket.get_io_service().post([=]() { callback(self); });
} else {
log_error("session", _session_id, ": error retrieving stream id :", ec.message());
CloseNow();
@ -59,6 +63,10 @@ namespace tcp {
});
}
void ServerSession::Close() {
_strand.post([self=shared_from_this()]() { self->CloseNow(); });
}
void ServerSession::Write(std::shared_ptr<const Message> message) {
DEBUG_ASSERT(message != nullptr);
DEBUG_ASSERT(!message->empty());
@ -97,7 +105,7 @@ namespace tcp {
void ServerSession::StartTimer() {
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
log_debug("session", _session_id, "timed out");
_strand.post([self=shared_from_this()]() { self->CloseNow(); });
Close();
} else {
_deadline.async_wait([this, self=shared_from_this()](boost::system::error_code ec) {
if (!ec) {
@ -115,6 +123,10 @@ namespace tcp {
if (_socket.is_open()) {
_socket.close();
}
_socket.get_io_service().post([self=shared_from_this()]() {
DEBUG_ASSERT(self->_on_closed);
self->_on_closed(self);
});
log_debug("session", _session_id, "closed");
}

View File

@ -40,9 +40,11 @@ namespace tcp {
explicit ServerSession(boost::asio::io_service &io_service, time_duration timeout);
/// Starts the session and calls @a callback after successfully reading the
/// stream id.
void Open(callback_function_type callback);
/// Starts the session and calls @a on_opened after successfully reading the
/// stream id, and @a on_closed once the session is closed.
void Open(
callback_function_type on_opened,
callback_function_type on_closed);
/// @warning This function should only be called after the session is
/// opened. It is safe to call this function from within the @a callback.
@ -59,6 +61,9 @@ namespace tcp {
Write(std::make_shared<const Message>(std::move(buffers)...));
}
/// Post a job to close the session.
void Close();
private:
void Write(std::shared_ptr<const Message> message);
@ -81,6 +86,8 @@ namespace tcp {
boost::asio::io_service::strand _strand;
callback_function_type _on_closed;
bool _is_writing = false;
};

View File

@ -36,9 +36,15 @@ namespace low_level {
detail::EndPoint<protocol_type, ExternalEPType> external_ep)
: _server(io_service, std::move(internal_ep)),
_dispatcher(std::move(external_ep)) {
_server.Listen([this](auto session) {
_dispatcher.RegisterSession(session);
});
auto on_session_opened = [this](auto session) {
if (!_dispatcher.RegisterSession(session)) {
session->Close();
}
};
auto on_session_closed = [this](auto session) {
_dispatcher.DeregisterSession(session);
};
_server.Listen(on_session_opened, on_session_closed);
}
template <typename InternalEPType>

View File

@ -139,7 +139,7 @@ TEST(streaming, low_level_tcp_small_message) {
std::this_thread::sleep_for(1ns);
}
std::cout << "done!\n";
});
}, [](std::shared_ptr<tcp::ServerSession>) { std::cout << "session closed!\n"; });
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(ep)};
auto stream = dispatcher.MakeStream();