From ce05c87974afaab2eacda5ea399a42c340abe4ba Mon Sep 17 00:00:00 2001 From: Xisco Bosch Date: Thu, 25 May 2017 15:18:19 +0200 Subject: [PATCH] Problem with threads solved --- .../carla/server/CarlaCommunication.cpp | 25 +++++++++++-------- .../source/carla/server/TCPServer.cpp | 12 ++++++--- .../carla/thread/AsyncReadWriteJobQueue.h | 2 ++ .../source/carla/thread/AsyncWriterJobQueue.h | 5 ++-- .../source/carla/thread/ThreadSafeQueue.h | 13 ++++++++++ 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp index 9e2b9ade0..dae9080a7 100644 --- a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp +++ b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp @@ -54,8 +54,6 @@ namespace server { auto message = std::make_unique(); bool success = false; - do { - if (!thr.getRestart()) { TCPServer::error_code error; @@ -68,13 +66,16 @@ namespace server { if (!server.Connected()) { thr.reconnect(); - break; + return nullptr; } } - } while (!success); - - return message; + if (!success){ + return nullptr; + } + else{ + return message; + } } // This is the thread that listens a string over the TCP world socket. @@ -82,7 +83,6 @@ namespace server { auto message = std::make_unique(); bool success = false; - do { if (!thr.getRestart()) { TCPServer::error_code error; @@ -95,13 +95,16 @@ namespace server { if (!server.Connected()) { thr.reconnect(); - break; + return nullptr; } } - } while (!success); - - return message; + if (!success) { + return nullptr; + } + else { + return message; + } } // This is the thread that sends a string over the TCP world socket. diff --git a/Source/CarlaServer/source/carla/server/TCPServer.cpp b/Source/CarlaServer/source/carla/server/TCPServer.cpp index d8d6ac60b..2999ce480 100644 --- a/Source/CarlaServer/source/carla/server/TCPServer.cpp +++ b/Source/CarlaServer/source/carla/server/TCPServer.cpp @@ -79,13 +79,14 @@ namespace carla { bool end = false, readedBytes = false; int readedSize = 0, sizeToRead = -1; - if (_socket.available() > 0){ + std::cout << "Try to read " << std::endl; + //if (_socket.available() > 0){ do { std::array buf; - size_t len = _socket.read_some(boost::asio::buffer(buf), error); + size_t len = _socket.read_some(boost::asio::buffer(buf), error); if (error) { @@ -109,9 +110,12 @@ namespace carla { } } while ((!readedBytes || sizeToRead > readedSize) && _connected); + + std::cout << "End read" << std::endl; + return true; - } - else return false; + //} + //else return false; } diff --git a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h index 83d13b7bf..82ee5cfb1 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h @@ -88,7 +88,9 @@ namespace thread { if (value != nullptr) { _readJob(*value); } + if (!_restart){ + //_writeQueue.wait_and_push(_writeJob); _writeQueue.push(std::move(_writeJob())); } diff --git a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h index 313e25cc9..c23479cb8 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h @@ -17,7 +17,7 @@ namespace thread { class AsyncWriterJobQueue { public: - using Job = std::function()>; + using Job = std::function()>; using ConnectJob = std::function; using ReconnectJob = std::function; @@ -65,7 +65,8 @@ namespace thread { _restart = false; _queue.canWait(true); while (!_restart && !_done) { - _queue.push(std::move(_job())); + //_queue.wait_and_push(_job); + _queue.push(std::move(_job())); //Sleep(10); } } diff --git a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h index 9e27ca4af..3418447c8 100644 --- a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h +++ b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h @@ -12,8 +12,11 @@ namespace thread { /// A thread safe buffer. template class ThreadSafeQueue { + public: + using Job = std::function()>; + ThreadSafeQueue() : _canWait(true) {} ThreadSafeQueue(const ThreadSafeQueue &) = delete; @@ -25,6 +28,16 @@ namespace thread { _condition.notify_one(); } + + void wait_and_push(Job job){ + std::unique_lock lock(_mutex); + _condition.wait(lock, [this, job]() { + _value = std::move(job()); + return _value != nullptr || !_canWait; + }); + _condition.notify_one(); + } + void canWait(bool wait){ std::lock_guard lock(_mutex); _canWait = wait;