reconnect client

This commit is contained in:
Xisco Bosch 2017-03-16 18:56:51 +01:00
parent 33e00a7090
commit a907f6bd0a
8 changed files with 76 additions and 41 deletions

View File

@ -14,7 +14,7 @@ namespace carla {
_pimpl(std::make_unique<Pimpl>(worldPort, writePort, readPort)) {} _pimpl(std::make_unique<Pimpl>(worldPort, writePort, readPort)) {}
CarlaServer::~CarlaServer() {} CarlaServer::~CarlaServer() {}
void CarlaServer::init(uint32_t LevelCount) { void CarlaServer::init(uint32_t LevelCount) {
_pimpl->sendWorld(static_cast<uint32_t>(Mode::NUMBER_OF_MODES), LevelCount); _pimpl->sendWorld(static_cast<uint32_t>(Mode::NUMBER_OF_MODES), LevelCount);
} }

View File

@ -16,26 +16,27 @@ namespace server {
} }
// This is the thread that sends a string over the TCP socket. // This is the thread that sends a string over the TCP socket.
static void serverWorkerThread(TCPServer &server, const Reward &rwd) { static void serverWorkerThread(TCPServer &server, thread::AsyncReaderJobQueue<Reward> &thr, const Reward &rwd) {
std::string message;
bool correctSerialize = rwd.SerializeToString(&message);
std::string message; TCPServer::error_code error;
bool correctSerialize = rwd.SerializeToString(&message);
TCPServer::error_code error; if (correctSerialize) {
server.writeString(message, error);
if (error) { logTCPError("Failed to send", error); }
} else {
logTCPError("Falied to serialize", error);
}
if (correctSerialize) { if (!server.Connected()) thr.restart();
server.writeString(message, error);
if (error) { logTCPError("Failed to send", error); }
} else {
logTCPError("Falied to serialize", error);
}
} }
//TODO: //TODO:
// Sortida amb google protocol // Sortida amb google protocol
// This is the thread that listens for string over the TCP socket. // This is the thread that listens for string over the TCP socket.
static std::string clientWorkerThread(TCPServer &server) { static std::string clientWorkerThread(TCPServer &server, thread::AsyncWriterJobQueue<std::string> &thr) {
//if (!server.Connected()) server.AcceptSocket(); //if (!server.Connected()) server.AcceptSocket();
@ -43,16 +44,19 @@ namespace server {
TCPServer::error_code error; TCPServer::error_code error;
server.readString(message, error); server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected. if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read", error); logTCPError("Failed to read", error);
return std::string(); return std::string();
} }
if (!server.Connected()) thr.restart();
return message; return message;
} }
// This is the thread that listens & sends a string over the TCP world socket. // This is the thread that listens & sends a string over the TCP world socket.
static std::string worldReceiveThread(TCPServer &server) { static std::string worldReceiveThread(TCPServer &server, thread::AsyncReadWriteJobQueue<std::string, std::string> &thr) {
std::string message; std::string message;
TCPServer::error_code error; TCPServer::error_code error;
@ -63,11 +67,14 @@ namespace server {
logTCPError("Failed to read world", error); logTCPError("Failed to read world", error);
return std::string(); return std::string();
} }
if (!server.Connected()) thr.restart();
return message; return message;
} }
static void worldSendThread(TCPServer &server, const std::string &message) { static void worldSendThread(TCPServer &server, thread::AsyncReadWriteJobQueue<std::string, std::string> &thr, const std::string &message) {
TCPServer::error_code error; TCPServer::error_code error;
//message = message.size + message; //message = message.size + message;
@ -78,6 +85,8 @@ namespace server {
logTCPError("Failed to send world", error); logTCPError("Failed to send world", error);
} }
if (!server.Connected()) thr.restart();
} }
static void Connect(TCPServer &server) { static void Connect(TCPServer &server) {
@ -89,16 +98,16 @@ namespace server {
_server(writePort), _server(writePort),
_client(readPort), _client(readPort),
_worldThread { _worldThread {
[this]() { return worldReceiveThread(this->_world); }, [this]() { return worldReceiveThread(this->_world, this->_worldThread); },
[this](const std::string & msg) { worldSendThread(this->_world, msg); }, [this](const std::string & msg) { worldSendThread(this->_world, this->_worldThread, msg); },
[this]() { Connect(this->_world); } [this]() { Connect(this->_world); }
}, },
_serverThread { _serverThread {
[this](const Reward &rwd) { serverWorkerThread(this->_server, rwd); }, [this](const Reward &rwd) { serverWorkerThread(this->_server, this->_serverThread, rwd); },
[this]() { Connect(this->_server); } [this]() { Connect(this->_server); }
}, },
_clientThread { _clientThread {
[this]() { return clientWorkerThread(this->_client); }, [this]() { return clientWorkerThread(this->_client, this->_clientThread); },
[this]() { Connect(this->_client); } [this]() { Connect(this->_client); }
} }
{ {

View File

@ -44,6 +44,8 @@ namespace server {
thread::AsyncWriterJobQueue<std::string> _clientThread; thread::AsyncWriterJobQueue<std::string> _clientThread;
thread::AsyncReadWriteJobQueue<std::string, std::string> _worldThread; thread::AsyncReadWriteJobQueue<std::string, std::string> _worldThread;
bool communicationEnabled;
}; };
} }

View File

@ -67,7 +67,6 @@ namespace carla {
if ((boost::asio::error::eof == error) || if ((boost::asio::error::eof == error) ||
(boost::asio::error::connection_reset == error)) (boost::asio::error::connection_reset == error))
{ {
std::cout<<"Client disconected"<<std::endl;
_connected = false; _connected = false;
} }
} }
@ -86,7 +85,6 @@ namespace carla {
if ((boost::asio::error::eof == error) || if ((boost::asio::error::eof == error) ||
(boost::asio::error::connection_reset == error)) (boost::asio::error::connection_reset == error))
{ {
std::cerr<<"Client disconected"<<std::endl;
_connected = false; _connected = false;
} }
else else

View File

@ -35,13 +35,13 @@ namespace server {
private: private:
boost::asio::io_service _service; boost::asio::io_service _service;
boost::asio::ip::tcp::acceptor _acceptor; boost::asio::ip::tcp::acceptor _acceptor;
boost::asio::ip::tcp::socket _socket; boost::asio::ip::tcp::socket _socket;
bool _connected; bool _connected;
}; };
} // namespace server } // namespace server

View File

@ -26,6 +26,7 @@ namespace thread {
ReadingJob && readingJob, ReadingJob && readingJob,
ConnectJob && connectJob) : ConnectJob && connectJob) :
_done(false), _done(false),
_restart(true),
_writeJob(std::move(writingJob)), _writeJob(std::move(writingJob)),
_readJob(std::move(readingJob)), _readJob(std::move(readingJob)),
_connectJob(std::move(connectJob)), _connectJob(std::move(connectJob)),
@ -44,19 +45,27 @@ namespace thread {
_readQueue.push(item); _readQueue.push(item);
} }
void restart(){
_restart = true;
}
private: private:
void workerThread() { void workerThread() {
_connectJob(); while(!_done){
while (!_done) { _restart = false;
R value; _connectJob();
_readQueue.wait_and_pop(value); while (!_restart && !_done) {
_readJob(value); R value;
_writeQueue.push(_writeJob()); _readQueue.wait_and_pop(value);
_readJob(value);
_writeQueue.push(_writeJob());
}
} }
} }
std::atomic_bool _done; std::atomic_bool _done;
std::atomic_bool _restart;
WritingJob _writeJob; WritingJob _writeJob;
ReadingJob _readJob; ReadingJob _readJob;

View File

@ -24,6 +24,7 @@ namespace thread {
explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) : explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) :
_done(false), _done(false),
_restart(true),
_job(std::move(job)), _job(std::move(job)),
_connectionJob(std::move(connectionJob)), _connectionJob(std::move(connectionJob)),
_queue(), _queue(),
@ -40,19 +41,26 @@ namespace thread {
_queue.push(item); _queue.push(item);
} }
private: void restart(){
_restart = true;
}
private:
void workerThread() { void workerThread() {
_connectionJob(); while (!_done){
while (!_done) { _restart = false;
T value; _connectionJob();
_queue.wait_and_pop(value); while (!_restart && !_done) {
_job(value); T value;
//Sleep(10); _queue.wait_and_pop(value);
_job(value);
//Sleep(10);
}
} }
} }
std::atomic_bool _done; std::atomic_bool _done;
std::atomic_bool _restart;
Job _job; Job _job;
ConnectJob _connectionJob; ConnectJob _connectionJob;

View File

@ -22,6 +22,7 @@ namespace thread {
explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) : explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) :
_done(false), _done(false),
_restart(true),
_job(std::move(job)), _job(std::move(job)),
_connectJob(std::move(connectJob)), _connectJob(std::move(connectJob)),
_queue(), _queue(),
@ -36,17 +37,25 @@ namespace thread {
return _queue.try_pop(value); return _queue.try_pop(value);
} }
void restart(){
_restart = true;
}
private: private:
void workerThread() { void workerThread() {
_connectJob(); while (!_done){
while (!_done) { _restart = false;
_queue.push(_job()); _connectJob();
//Sleep(10); while (!_restart && !_done) {
_queue.push(_job());
//Sleep(10);
}
} }
} }
std::atomic_bool _done; std::atomic_bool _done;
std::atomic_bool _restart;
Job _job; Job _job;
ConnectJob _connectJob; ConnectJob _connectJob;