Problem with threads solved

This commit is contained in:
Xisco Bosch 2017-05-25 15:18:19 +02:00
parent dce1075eab
commit ce05c87974
5 changed files with 40 additions and 17 deletions

View File

@ -54,8 +54,6 @@ namespace server {
auto message = std::make_unique<std::string>();
bool success = false;
do {
if (!thr.getRestart()) {
TCPServer::error_code error;
@ -68,13 +66,16 @@ namespace server {
if (!server.Connected()) {
thr.reconnect();
break;
return nullptr;
}
}
} while (!success);
return message;
if (!success){
return nullptr;
}
else{
return message;
}
}
// This is the thread that listens a string over the TCP world socket.
@ -82,7 +83,6 @@ namespace server {
auto message = std::make_unique<std::string>();
bool success = false;
do {
if (!thr.getRestart()) {
TCPServer::error_code error;
@ -95,13 +95,16 @@ namespace server {
if (!server.Connected()) {
thr.reconnect();
break;
return nullptr;
}
}
} while (!success);
return message;
if (!success) {
return nullptr;
}
else {
return message;
}
}
// This is the thread that sends a string over the TCP world socket.

View File

@ -79,13 +79,14 @@ namespace carla {
bool end = false, readedBytes = false;
int readedSize = 0, sizeToRead = -1;
if (_socket.available() > 0){
std::cout << "Try to read " << std::endl;
//if (_socket.available() > 0){
do {
std::array<char, 128> buf;
size_t len = _socket.read_some(boost::asio::buffer(buf), error);
size_t len = _socket.read_some(boost::asio::buffer(buf), error);
if (error)
{
@ -109,9 +110,12 @@ namespace carla {
}
} while ((!readedBytes || sizeToRead > readedSize) && _connected);
std::cout << "End read" << std::endl;
return true;
}
else return false;
//}
//else return false;
}

View File

@ -88,7 +88,9 @@ namespace thread {
if (value != nullptr) {
_readJob(*value);
}
if (!_restart){
//_writeQueue.wait_and_push(_writeJob);
_writeQueue.push(std::move(_writeJob()));
}

View File

@ -17,7 +17,7 @@ namespace thread {
class AsyncWriterJobQueue {
public:
using Job = std::function<std::unique_ptr<T>()>;
using Job = std::function<std::unique_ptr<T>()>;
using ConnectJob = std::function<void()>;
using ReconnectJob = std::function<void()>;
@ -65,7 +65,8 @@ namespace thread {
_restart = false;
_queue.canWait(true);
while (!_restart && !_done) {
_queue.push(std::move(_job()));
//_queue.wait_and_push(_job);
_queue.push(std::move(_job()));
//Sleep(10);
}
}

View File

@ -12,8 +12,11 @@ namespace thread {
/// A thread safe buffer.
template<typename T>
class ThreadSafeQueue {
public:
using Job = std::function<std::unique_ptr<T>()>;
ThreadSafeQueue() : _canWait(true) {}
ThreadSafeQueue(const ThreadSafeQueue &) = delete;
@ -25,6 +28,16 @@ namespace thread {
_condition.notify_one();
}
void wait_and_push(Job job){
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this, job]() {
_value = std::move(job());
return _value != nullptr || !_canWait;
});
_condition.notify_one();
}
void canWait(bool wait){
std::lock_guard<std::mutex> lock(_mutex);
_canWait = wait;