Let skip messages in async, and block on sync mode

This commit is contained in:
bernat 2020-10-04 18:43:25 +02:00
parent 1d91e187c3
commit 4550573b5e
8 changed files with 48 additions and 6 deletions

View File

@ -1,3 +1,7 @@
## Latest
* Fixed random dead-lock on synchronous mode at high frame rate
## CARLA 0.9.10
* Added retrieval of bounding boxes for all the elements of the level

View File

@ -60,6 +60,10 @@ namespace streaming {
_pool.AsyncRun(worker_threads);
}
void SetSynchronousMode(bool is_synchro) {
_server.SetSynchronousMode(is_synchro);
}
private:
// The order of these two arguments is very important.

View File

@ -20,7 +20,8 @@ namespace tcp {
Server::Server(boost::asio::io_context &io_context, endpoint ep)
: _io_context(io_context),
_acceptor(_io_context, std::move(ep)),
_timeout(time_duration::seconds(10u)) {}
_timeout(time_duration::seconds(10u)),
_synchronous(false) {}
void Server::OpenSession(
time_duration timeout,
@ -28,7 +29,7 @@ namespace tcp {
ServerSession::callback_function_type on_closed) {
using boost::system::error_code;
auto session = std::make_shared<ServerSession>(_io_context, timeout);
auto session = std::make_shared<ServerSession>(_io_context, timeout, *this);
auto handle_query = [on_opened, on_closed, session](const error_code &ec) {
if (!ec) {

View File

@ -54,6 +54,14 @@ namespace tcp {
});
}
void SetSynchronousMode(bool is_synchro) {
_synchronous = is_synchro;
}
bool IsSynchronousMode(void) {
return _synchronous;
}
private:
void OpenSession(
@ -66,6 +74,8 @@ namespace tcp {
boost::asio::ip::tcp::acceptor _acceptor;
std::atomic<time_duration> _timeout;
bool _synchronous;
};
} // namespace tcp

View File

@ -5,6 +5,7 @@
// For a copy, see <https://opensource.org/licenses/MIT>.
#include "carla/streaming/detail/tcp/ServerSession.h"
#include "carla/streaming/detail/tcp/Server.h"
#include "carla/Debug.h"
#include "carla/Logging.h"
@ -15,6 +16,7 @@
#include <boost/asio/post.hpp>
#include <atomic>
#include <thread>
namespace carla {
namespace streaming {
@ -25,9 +27,11 @@ namespace tcp {
ServerSession::ServerSession(
boost::asio::io_context &io_context,
const time_duration timeout)
const time_duration timeout,
Server &server)
: LIBCARLA_INITIALIZE_LIFETIME_PROFILER(
std::string("tcp server session ") + std::to_string(SESSION_COUNTER)),
_server(server),
_session_id(SESSION_COUNTER++),
_socket(io_context),
_timeout(timeout),
@ -79,9 +83,18 @@ namespace tcp {
if (!_socket.is_open()) {
return;
}
if (_is_writing) {
if (_server.IsSynchronousMode()) {
// wait until previous message has been sent
while (_is_writing) {
std::this_thread::yield();
}
} else {
// ignore this message
log_debug("session", _session_id, ": connection too slow: message discarded");
return;
}
}
_is_writing = true;
auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {

View File

@ -26,6 +26,8 @@ namespace streaming {
namespace detail {
namespace tcp {
class Server;
/// A TCP server session. When a session opens, it reads from the socket a
/// stream id object and passes itself to the callback functor. The session
/// closes itself after @a timeout of inactivity is met.
@ -40,7 +42,8 @@ namespace tcp {
explicit ServerSession(
boost::asio::io_context &io_context,
time_duration timeout);
time_duration timeout,
Server &server);
/// Starts the session and calls @a on_opened after successfully reading the
/// stream id, and @a on_closed once the session is closed.
@ -82,6 +85,8 @@ namespace tcp {
friend class Server;
Server &_server;
const size_t _session_id;
stream_id_type _stream_id = 0u;

View File

@ -64,6 +64,10 @@ namespace low_level {
return _dispatcher.MakeStream();
}
void SetSynchronousMode(bool is_synchro) {
_server.SetSynchronousMode(is_synchro);
}
private:
void StartServer() {

View File

@ -287,6 +287,7 @@ void FCarlaServer::FPimpl::BindActions()
{
REQUIRE_CARLA_EPISODE();
Episode->ApplySettings(settings);
StreamingServer.SetSynchronousMode(settings.synchronous_mode);
return GFrameCounter;
};