diff --git a/Source/CarlaServer/source/carla/CarlaServer.cpp b/Source/CarlaServer/source/carla/CarlaServer.cpp index ce1af9381..da0c9e6d2 100644 --- a/Source/CarlaServer/source/carla/CarlaServer.cpp +++ b/Source/CarlaServer/source/carla/CarlaServer.cpp @@ -14,7 +14,7 @@ namespace carla { _pimpl(std::make_unique(worldPort, writePort, readPort)) {} CarlaServer::~CarlaServer() {} - + void CarlaServer::init(uint32_t LevelCount) { _pimpl->sendWorld(static_cast(Mode::NUMBER_OF_MODES), LevelCount); } @@ -31,16 +31,38 @@ namespace carla { return _pimpl->tryReadControl(steer, throttle); } - void CarlaServer::sendReward(const Reward_Values &values) { + bool CarlaServer::sendReward(const Reward_Values &values) { + if (needRestart()) return false; _pimpl->sendReward(values); + return true; } - void CarlaServer::sendSceneValues(const Scene_Values &values) { + bool CarlaServer::sendSceneValues(const Scene_Values &values) { + if (needRestart()) return false; _pimpl->sendSceneValues(values); + return true; } - void CarlaServer::sendEndReset() { + bool CarlaServer::sendEndReset() { + if (needRestart()) return false; _pimpl->sendEndReset(); + return true; + } + + bool CarlaServer::worldConnected(){ + return _pimpl->worldConnected(); + } + + bool CarlaServer::clientConnected(){ + return _pimpl->clientConnected(); + } + + bool CarlaServer::serverConnected(){ + return _pimpl->serverConnected(); + } + + bool CarlaServer::needRestart() { + return _pimpl->needRestart(); } } // namespace carla diff --git a/Source/CarlaServer/source/carla/CarlaServer.h b/Source/CarlaServer/source/carla/CarlaServer.h index 144c2ef58..8f660d49c 100644 --- a/Source/CarlaServer/source/carla/CarlaServer.h +++ b/Source/CarlaServer/source/carla/CarlaServer.h @@ -109,13 +109,25 @@ namespace carla { bool tryReadControl(float &steer, float &throttle); /// Send values of the current player status. - void sendReward(const Reward_Values &values); + bool sendReward(const Reward_Values &values); /// Send the values of the generated scene. - void sendSceneValues(const Scene_Values &values); + bool sendSceneValues(const Scene_Values &values); /// Send a signal to the client to notify that the car is ready. - void sendEndReset(); + bool sendEndReset(); + + /// return true if client thread is connected + bool clientConnected(); + + /// return true if server thread is connected + bool serverConnected(); + + /// return true if world thread is connected + bool worldConnected(); + + // Returns true if the server needs to restart the communication protocol + bool needRestart(); private: diff --git a/Source/CarlaServer/source/carla/Protocol/carlaProtocol/protoc.exe b/Source/CarlaServer/source/carla/Protocol/carlaProtocol/protoc.exe new file mode 100644 index 000000000..5162f7c35 Binary files /dev/null and b/Source/CarlaServer/source/carla/Protocol/carlaProtocol/protoc.exe differ diff --git a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp index 5b916b2af..5fe763c93 100644 --- a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp +++ b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp @@ -2,6 +2,7 @@ #include "lodepng.h" #include "carla_protocol.pb.h" +#include "../CarlaServer.h" #include @@ -10,140 +11,347 @@ namespace server { // -- Static methods --------------------------------------------------------- + std::mutex _generalMutex; + + static Mode getMode(int modeInt) { + switch (modeInt) { + case 0: return Mode::MONO; + case 1: return Mode::STEREO; + default: return Mode::INVALID; + } + } + + template static void logTCPError(const std::string &text, const ERROR_CODE &errorCode) { std::cerr << "CarlaConnection - TCP Server: " << text << ": " << errorCode.message() << std::endl; } // This is the thread that sends a string over the TCP socket. - static void serverWorkerThread(TCPServer &server, const Reward &rwd) { + static void serverWorkerThread( + TCPServer &server, + thread::AsyncReaderJobQueue &thr, + const std::unique_ptr &proto , + const Reward_Values &rwd + ) { - std::string message; - bool correctSerialize = rwd.SerializeToString(&message); + if (!thr.getRestart()){ + std::string message; - TCPServer::error_code error; + Reward reward; + proto->LoadReward(reward, rwd); - if (correctSerialize) { - server.writeString(message, error); - if (error) { logTCPError("Failed to send", error); } - } else { - logTCPError("Falied to serialize", error); + bool correctSerialize = reward.SerializeToString(&message); + + TCPServer::error_code error; + + if (correctSerialize) { + server.writeString(message, error); + if (error) { + logTCPError("Failed to send", error); + } + + if (!server.Connected() && !thr.getRestart()) { + thr.reconnect(); + } + + } else { + logTCPError("Falied to serialize", error); + } + + } - } //TODO: // Sortida amb google protocol // This is the thread that listens for string over the TCP socket. - static std::string clientWorkerThread(TCPServer &server) { + static std::string clientWorkerThread(TCPServer &server, thread::AsyncWriterJobQueue &thr) { //if (!server.Connected()) server.AcceptSocket(); std::string message; - TCPServer::error_code error; + bool success = false; - server.readString(message, error); - if (error && (error != boost::asio::error::eof)) { // eof is expected. - logTCPError("Failed to read", error); - return std::string(); - } + do{ + + if (!thr.getRestart()){ + TCPServer::error_code error; + + success = server.readString(message, error); + + if (error && (error != boost::asio::error::eof)) { // eof is expected. + logTCPError("Failed to read", error); + return std::string(); + } + + if (!server.Connected() && !thr.getRestart()) { + thr.reconnect(); + break; + } + } + + } while (!success); return message; } // This is the thread that listens & sends a string over the TCP world socket. - static std::string worldReceiveThread(TCPServer &server) { + static std::string worldReceiveThread(TCPServer &server, thread::AsyncReadWriteJobQueue &thr) { + //std::lock_guard lock(server.getMutex()); std::string message; - TCPServer::error_code error; + bool success = false; - //if (!server.Connected()) server.AcceptSocket(); + do{ + + if (!thr.getRestart()){ + TCPServer::error_code error; + + success = server.readString(message, error); + if (error && (error != boost::asio::error::eof)) { // eof is expected. + logTCPError("Failed to read world", error); + return std::string(); + } + + if (!server.Connected() && !thr.getRestart()) { + thr.reconnect(); + break; + } + } + + }while (!success); - server.readString(message, error); - if (error && (error != boost::asio::error::eof)) { // eof is expected. - logTCPError("Failed to read world", error); - return std::string(); - } return message; } - static void worldSendThread(TCPServer &server, const std::string &message) { - TCPServer::error_code error; - //message = message.size + message; + static void worldSendThread(TCPServer &server, thread::AsyncReadWriteJobQueue &thr, const std::string &message) { - //if (!server.Connected()) server.AcceptSocket(); + if (!thr.getRestart()){ + TCPServer::error_code error; - server.writeString(message, error); - if (error) { - logTCPError("Failed to send world", error); + server.writeString(message, error); + if (error) { + logTCPError("Failed to send world", error); + } + + if (!server.Connected() && !thr.getRestart()) { + thr.reconnect(); + } } - } static void Connect(TCPServer &server) { - if (!server.Connected()) { server.AcceptSocket(); } + std::cout << "Waiting... port: " << server.port << std::endl; + server.AcceptSocket(); + } + + static void ReconnectAll(CarlaCommunication &communication){ + + std::lock_guard lock(_generalMutex); + + + + if (!communication.NeedRestart()){ + + std::cout << " ---- RECONNECT ALL ...." << std::endl; + + if (!communication.getWorldThread().getRestart()){ + std::cout << " ---- RESTART WORLD ...." << std::endl; + communication.restartWorld(); + communication.getWorldThread().restart(); + } + + if (!communication.getServerThread().getRestart()){ + std::cout << " ---- RESTART SERVER ...." << std::endl; + communication.restartServer(); + communication.getServerThread().restart(); + } + + if (!communication.getClientThread().getRestart()){ + std::cout << " ---- RESTART CLIENT ...." << std::endl; + communication.restartClient(); + communication.getClientThread().restart(); + } + + communication.Restart(); + + } } CarlaCommunication::CarlaCommunication(int worldPort, int writePort, int readPort) : + _serverPort(writePort), + _clientPort(readPort), + _worldPort(worldPort), _world(worldPort), _server(writePort), _client(readPort), + _needRestart(false), + _proto(std::make_unique(this)), _worldThread { - [this]() { return worldReceiveThread(this->_world); }, - [this](const std::string & msg) { worldSendThread(this->_world, msg); }, - [this]() { Connect(this->_world); } + [this]() { return worldReceiveThread(this->_world, this->_worldThread); }, + [this](const std::string & msg) { worldSendThread(this->_world, this->_worldThread, msg); }, + [this]() { Connect(this->_world); }, + [this]() { ReconnectAll(*this);} }, _serverThread { - [this](const Reward &rwd) { serverWorkerThread(this->_server, rwd); }, - [this]() { Connect(this->_server); } + [this](const Reward_Values &rwd) { serverWorkerThread(this->_server, this->_serverThread, this->_proto, rwd); }, + [this]() { Connect(this->_server); }, + [this]() { ReconnectAll(*this);} }, _clientThread { - [this]() { return clientWorkerThread(this->_client); }, - [this]() { Connect(this->_client); } + [this]() { return clientWorkerThread(this->_client, this->_clientThread); }, + [this]() { Connect(this->_client); }, + [this]() { ReconnectAll(*this);} } { - + _mode = Mode::MONO; /*std::cout << "WorldPort: " << worldPort << std::endl; std::cout << "writePort: " << writePort << std::endl; std::cout << "readPort: " << readPort << std::endl;*/ } - void CarlaCommunication::sendReward(const Reward &reward) { - _serverThread.push(reward); - std::cout << "Send Reward" << std::endl; + void CarlaCommunication::sendReward(const Reward_Values &values) { + _serverThread.push(values); } - bool CarlaCommunication::tryReadControl(std::string &control) { - return _clientThread.tryPop(control); + bool CarlaCommunication::tryReadControl(float &steer, float &gas) { + + steer = 0.0f; + gas = 0.0f; + std::string controlMessage; + + if (!_clientThread.tryPop(controlMessage)) return false; + + Control control; + if (!control.ParseFromString(controlMessage)) return false; + + steer = control.steer(); + gas = control.gas(); + + return true; + } - void CarlaCommunication::sendWorld(const World &world) { + void CarlaCommunication::sendWorld(const uint32_t modes,const uint32_t scenes) { + + _needRestart = false; + + World world; + _proto->LoadWorld(world, modes, scenes); + std::string message; bool error = !world.SerializeToString(&message); + _worldThread.push(message); } - void CarlaCommunication::sendScene(const Scene &scene) { + void CarlaCommunication::sendScene(const Scene_Values &values) { + Scene scene; + _proto -> LoadScene(scene, values); + std::string message; bool error = !scene.SerializeToString(&message); _worldThread.push(message); } - void CarlaCommunication::sendReset(const EpisodeReady &ready) { + void CarlaCommunication::sendReset() { + + EpisodeReady eReady; + eReady.set_ready(true); + std::string message; - bool error = !ready.SerializeToString(&message); - if (!error) { - //std::cout << "Send End Reset" << std::endl; + if (eReady.SerializeToString(&message)) { _worldThread.push(message); - } else { - std::cout << " >> SEND RESET ERROR <<" << std::endl; } } - bool CarlaCommunication::tryReadWorldInfo(std::string &info) { - return _worldThread.tryPop(info); + bool CarlaCommunication::tryReadSceneInit(Mode &mode, uint32_t &scene) { + + mode = Mode::INVALID; + scene = 0u; + + std::string info; + if (!_worldThread.tryPop(info)) return false; + + SceneInit sceneInit; + + if (!sceneInit.ParseFromString(info)) return false; + + mode = getMode(sceneInit.mode()); + scene = sceneInit.scene(); + + _mode = mode; + + return true; } + bool CarlaCommunication::tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index){ + start_index = 0; + end_index = 0; + + std::string startData; + if (!_worldThread.tryPop(startData)) return false; + + EpisodeStart episodeStart; + if(!episodeStart.ParseFromString(startData)) return false; + + start_index = episodeStart.start_index(); + end_index = episodeStart.end_index(); + + return true; + } + + void CarlaCommunication::restartServer(){ + _server.close(); + //_server = TCPServer(_serverPort); + } + + void CarlaCommunication::restartWorld(){ + _world.close(); + //_world = TCPServer(_worldPort); + } + + void CarlaCommunication::restartClient(){ + _client.close(); + //_client = TCPServer(_clientPort); + } + + thread::AsyncReaderJobQueue& CarlaCommunication::getServerThread(){ + return _serverThread; + } + + thread::AsyncWriterJobQueue& CarlaCommunication::getClientThread(){ + return _clientThread; + } + + thread::AsyncReadWriteJobQueue& CarlaCommunication::getWorldThread(){ + return _worldThread; + } + + bool CarlaCommunication::worldConnected(){ + return _world.Connected(); + } + + bool CarlaCommunication::clientConnected(){ + return _client.Connected(); + } + + bool CarlaCommunication::serverConnected(){ + return _server.Connected(); + } + + Mode CarlaCommunication::GetMode(){ + return _mode; + } + + bool CarlaCommunication::NeedRestart(){ + return _needRestart; + } + + void CarlaCommunication::Restart(){ + _needRestart = true; + } } } diff --git a/Source/CarlaServer/source/carla/server/CarlaCommunication.h b/Source/CarlaServer/source/carla/server/CarlaCommunication.h index d40702ef5..fb0f9e6f6 100644 --- a/Source/CarlaServer/source/carla/server/CarlaCommunication.h +++ b/Source/CarlaServer/source/carla/server/CarlaCommunication.h @@ -14,6 +14,11 @@ class EpisodeReady; namespace carla { + + struct Scene_Values; + struct Reward_Values; + enum class Mode : int8_t; + namespace server { class CarlaCommunication : private NonCopyable { @@ -21,31 +26,64 @@ namespace server { explicit CarlaCommunication(int worldPort, int writePort, int readPort); - void sendReward(const Reward &values); + void sendReward(const Reward_Values &values); - bool tryReadControl(std::string &control); + bool tryReadControl(float &steer, float &gas); - void sendScene(const Scene &scene); + void sendScene(const Scene_Values &scene); - void sendReset(const EpisodeReady &ready); + void sendReset(); - void sendWorld(const World &world); + //void sendWorld(const World &world); + void sendWorld(const uint32_t modes, const uint32_t scenes); - bool tryReadWorldInfo(std::string &info); + bool tryReadSceneInit(Mode &mode, uint32_t &scene); + + bool tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index); + + void restartServer(); + + void restartWorld(); + + void restartClient(); + + thread::AsyncReaderJobQueue& getServerThread(); + thread::AsyncWriterJobQueue& getClientThread(); + thread::AsyncReadWriteJobQueue& getWorldThread(); + + bool worldConnected(); + bool clientConnected(); + bool serverConnected(); + + std::mutex getGeneralMutex(); + + Mode GetMode(); + + bool NeedRestart(); + void Restart(); private: + TCPServer _server; TCPServer _client; TCPServer _world; - thread::AsyncReaderJobQueue _serverThread; + int _worldPort, _clientPort, _serverPort; + + thread::AsyncReaderJobQueue _serverThread; thread::AsyncWriterJobQueue _clientThread; thread::AsyncReadWriteJobQueue _worldThread; + + std::atomic_bool _needRestart; + + std::atomic _mode; + + const std::unique_ptr _proto; }; } diff --git a/Source/CarlaServer/source/carla/server/Protocol.cpp b/Source/CarlaServer/source/carla/server/Protocol.cpp index 4c43b4571..3c364bef8 100644 --- a/Source/CarlaServer/source/carla/server/Protocol.cpp +++ b/Source/CarlaServer/source/carla/server/Protocol.cpp @@ -1,6 +1,7 @@ #include "Protocol.h" -#include "Server.h" +#include "CarlaCommunication.h" +#include "carla/CarlaServer.h" #include "lodepng.h" #include "carla_protocol.pb.h" @@ -28,7 +29,7 @@ namespace server { color_img.emplace_back(color.R); color_img.emplace_back(color.G); color_img.emplace_back(color.B); - color_img.emplace_back(color.A); + color_img.emplace_back(255u); } // Compress to png. lodepng::State state; @@ -41,8 +42,8 @@ namespace server { return true; } - Protocol::Protocol(carla::server::Server *server) { - _server = server; + Protocol::Protocol(carla::server::CarlaCommunication *communication) { + _communication = communication; } Protocol::~Protocol() {} @@ -71,13 +72,13 @@ namespace server { } } - // auto depths = {values.image_depth_0, values.image_depth_1}; - // for (const std::vector &image : depths) { - // std::vector png_image; - // if (getPNGImage(image, values.image_width, values.image_height, png_image)) { - // reward.add_depth(std::string(png_image.begin(), png_image.end())); - // } - // } + auto depths = {values.image_depth_0, values.image_depth_1}; + for (const std::vector &image : depths) { + std::vector png_image; + if (getPNGImage(image, values.image_width, values.image_height, png_image)) { + reward.set_depth(std::string(png_image.begin(), png_image.end())); + } + } } void Protocol::LoadScene(Scene &scene, const Scene_Values &values) { @@ -89,7 +90,7 @@ namespace server { point->set_pos_y(positions[i].y); } - if (_server->GetMode() == Mode::STEREO) { + if (_communication->GetMode() == Mode::STEREO) { Scene::Projection_Matrix* matrix; std::vector> projection_matrix = values.projection_matrices; for (int i = 0; i < projection_matrix.size(); ++i) { diff --git a/Source/CarlaServer/source/carla/server/Protocol.h b/Source/CarlaServer/source/carla/server/Protocol.h index 7918bd60e..ffa63b0a0 100644 --- a/Source/CarlaServer/source/carla/server/Protocol.h +++ b/Source/CarlaServer/source/carla/server/Protocol.h @@ -11,12 +11,12 @@ namespace carla { namespace server { - class Server; + class CarlaCommunication; class Protocol { public: - explicit Protocol(Server *server); + explicit Protocol(CarlaCommunication *communication); ~Protocol(); @@ -28,7 +28,7 @@ namespace server { private: - carla::server::Server *_server; + carla::server::CarlaCommunication *_communication; }; } diff --git a/Source/CarlaServer/source/carla/server/Server.cpp b/Source/CarlaServer/source/carla/server/Server.cpp index 2c4089450..d3e7100a9 100644 --- a/Source/CarlaServer/source/carla/server/Server.cpp +++ b/Source/CarlaServer/source/carla/server/Server.cpp @@ -11,133 +11,76 @@ namespace carla { namespace server { - static Mode getMode(int modeInt) { - switch (modeInt) { - case 1: return Mode::MONO; - case 2: return Mode::STEREO; - default: return Mode::INVALID; - } - } + // -- CarlaServer ------------------------------------------------------------ Server::Server(uint32_t worldPort, uint32_t writePort, uint32_t readPort) : - _communication(std::make_unique(worldPort, writePort, readPort)), - _proto(std::make_unique(this)){} + _communication(std::make_unique(worldPort, writePort, readPort)){} Server::~Server() { } void Server::sendReward(const Reward_Values &values) { - Reward reward; - _proto->LoadReward(reward, values); - _communication->sendReward(reward); + //Reward reward; + //_proto->LoadReward(reward, values); + //_communication->sendReward(reward); + _communication->sendReward(values); } void Server::sendSceneValues(const Scene_Values &values) { - Scene *scene = new Scene; - _proto->LoadScene(*scene, values); - _communication->sendScene(*scene); + //Scene scene; + //_proto->LoadScene(scene, values); + //_communication->sendScene(scene); + _communication->sendScene(values); } void Server::sendEndReset() { - EpisodeReady eReady; - eReady.set_ready(true); - _communication->sendReset(eReady); + + _communication->sendReset(); } void Server::sendWorld(const uint32_t modes, const uint32_t scenes) { - World world; - _proto->LoadWorld(world, modes, scenes); - _communication->sendWorld(world); + //World world; + //_proto->LoadWorld(world, modes, scenes); + //_communication->sendWorld(world); + _communication->sendWorld(modes, scenes); } bool Server::tryReadControl(float &steer, float &gas) { - std::string controlMessage; - bool success = _communication->tryReadControl(controlMessage); - Control control; - if (success) { - success &= control.ParseFromString(controlMessage); - } - steer = control.steer(); - gas = control.gas(); - - if (!success) { - steer = 0.0f; - gas = 0.0f; - } - else { - steer = control.steer(); - gas = control.gas(); - //std::cout << "Steer: " << steer << " Gas: " << gas << std::endl; - } - - return success; + return _communication->tryReadControl(steer, gas); } bool Server::tryReadSceneInit(Mode &mode, uint32_t &scene) { - std::string initMessage; - bool success = _communication->tryReadWorldInfo(initMessage); - SceneInit sceneInit; - - if (success) { - success &= sceneInit.ParseFromString(initMessage); - } - - if (!success) { - mode = Mode::INVALID; - scene = 0u; - } - else { - mode = getMode(sceneInit.mode()); - scene = sceneInit.scene(); - //std::cout << "Mode: " << mode << " Scene: " << scene << std::endl; - } - - return success; + return _communication->tryReadSceneInit(mode, scene); } bool Server::tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index) { - std::string startData; - bool success = _communication->tryReadWorldInfo(startData); - EpisodeStart episodeStart; - success &= episodeStart.ParseFromString(startData); - - if (!success) { - start_index = 0.0; - end_index = 0.0; - } - else { - start_index = episodeStart.start_index(); - end_index = episodeStart.end_index(); - //std::cout << "Start: " << start_index << " End: " << end_index << std::endl; - } - - return success; - } - - void Server::setMode(Mode mode) { - _mode = mode; + return _communication->tryReadEpisodeStart(start_index, end_index); } Mode Server::GetMode() const { - return _mode; + return _communication->GetMode(); } void Server::SetScene(int scene) { _scene = scene; } - int Server::GetScene() const { - return _scene; + bool Server::worldConnected() const{ + return _communication->worldConnected(); } - void Server::SetReset(bool reset) { - _reset = reset; + bool Server::clientConnected() const{ + return _communication->clientConnected(); } - bool Server::Reset() const { - return _reset; + bool Server::serverConnected() const{ + return _communication->serverConnected(); + } + + bool Server::needRestart() const { + return _communication->NeedRestart(); } } // namespace server diff --git a/Source/CarlaServer/source/carla/server/Server.h b/Source/CarlaServer/source/carla/server/Server.h index a9457dfea..4f5687fc2 100644 --- a/Source/CarlaServer/source/carla/server/Server.h +++ b/Source/CarlaServer/source/carla/server/Server.h @@ -15,7 +15,6 @@ namespace carla { namespace server { class CarlaCommunication; - class Protocol; /// Asynchronous TCP server. Uses two ports, one for sending messages (write) /// and one for receiving messages (read). @@ -62,21 +61,24 @@ namespace server { int GetScene() const; - void SetReset(bool reset); + bool worldConnected() const; - bool Reset() const; + bool clientConnected() const; + + bool serverConnected() const; + + bool needRestart() const; private: //std::mutex _mutex; - std::atomic _mode { Mode::MONO }; + std::atomic_int _scene; - std::atomic_bool _reset; const std::unique_ptr _communication; - const std::unique_ptr _proto; + //const std::unique_ptr _proto; }; } // namespace server diff --git a/Source/CarlaServer/source/carla/server/TCPServer.cpp b/Source/CarlaServer/source/carla/server/TCPServer.cpp index 1a2f055e4..8e15a586b 100644 --- a/Source/CarlaServer/source/carla/server/TCPServer.cpp +++ b/Source/CarlaServer/source/carla/server/TCPServer.cpp @@ -6,145 +6,129 @@ #include namespace carla { - namespace server { + namespace server { - using boost::asio::ip::tcp; + using boost::asio::ip::tcp; - std::string GetBytes(int n) { - std::string bytes; + std::string GetBytes(int n) { + std::string bytes; - bytes = (n >> 24) & 0xFF; - bytes += (n >> 16) & 0xFF; - bytes += (n >> 8) & 0xFF; - bytes += n & 0xFF; + bytes = (n >> 24) & 0xFF; + bytes += (n >> 16) & 0xFF; + bytes += (n >> 8) & 0xFF; + bytes += n & 0xFF; - return bytes; - } + return bytes; + } - int GetInt(char b1, char b2, char b3, char b4) { - int result = 0; - result = (result << 8) + b1; - result = (result << 8) + b2; - result = (result << 8) + b3; - result = (result << 8) + b4; + int GetInt(char b1, char b2, char b3, char b4) { + int result = 0; + result = (result << 8) + b1; + result = (result << 8) + b2; + result = (result << 8) + b3; + result = (result << 8) + b4; - return result; - } + return result; + } - TCPServer::TCPServer(int port) : - _service(), - _acceptor(_service, tcp::endpoint(tcp::v4(), port)), - _port(port), - _socket(_service), - _connected(false){ - /*std::ofstream myfile; - myfile.open("TCP" + std::to_string(_port) + ".txt"); - myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl; - myfile.close();*/ - } + TCPServer::TCPServer(int port) : + _service(), + port(port), + _socket(_service), + _connected(false){} - TCPServer::~TCPServer() { + TCPServer::~TCPServer() {} - /*std::ofstream myfile; - myfile.open("TCP" + std::to_string(_port) + ".txt"); - myfile << " Y VOLO " << std::to_string(_port) << std::endl; - myfile.close();*/ + void TCPServer::AcceptSocket() { - } + try { + boost::asio::ip::tcp::acceptor acceptor(_service, tcp::endpoint(tcp::v4(), port)); + acceptor.accept(_socket); + _connected = true; + _service.run(); + std::cout << "Connected port " << port << std::endl; + } - void TCPServer::AcceptSocket() { - /*std::ofstream myfile; - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << " Create socket " << std::to_string(_port) <>>>> ACCEPT ERROR <<<<<" << std::endl; - myfile.close();*/ - }; + std::string outMessage(GetBytes(message.length()) + message); + + boost::asio::write(_socket, boost::asio::buffer(outMessage), error); + + if (error) + { + _connected = false; + } + } + + bool TCPServer::readString(std::string &message, error_code &error) { + + bool end = false, readedBytes = false; + int readedSize = 0, sizeToRead = -1; + + if (_socket.available() > 0){ + do { + + std::array buf; + + size_t len = _socket.read_some(boost::asio::buffer(buf), error); - /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << " CONNECTED... " << std::to_string(_port) << std::endl; - myfile.close();*/ - } + if (boost::asio::error::connection_reset == error) + { + _connected = false; + } + else if (!error){ + // @todo find a better way. + for (size_t i = 0; i < len && !end; ++i) { + if (!readedBytes) { + sizeToRead = GetInt(buf[0], buf[1], buf[2], buf[3]); + i = 3; + readedBytes = true; + } + else { + message += buf[i]; + ++readedSize; + } - bool TCPServer::Connected() { - return _connected; - } + } + } - void TCPServer::writeString(const std::string &message, error_code &error) { + } while ((!readedBytes || sizeToRead > readedSize) && _connected); + return true; + } + else return false; - /*std::ofstream myfile; - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "--> WRITE <--" << std::endl; - myfile << " Message: " << message << " // length: " << message.length() << " // byte: " << GetBytes(message.length()) << std::endl; - myfile.close();*/ + } - std::string outMessage(GetBytes(message.length()) + message); + void TCPServer::close(){ - std::cout << message.length() << std::endl; + _connected = false; + // flush the socket buffer + std::string message; + TCPServer::error_code error; + readString(message, error); + _service.stop(); + _socket.close(); + /*_socket.cancel(); + _socket.close(); + _socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both); + _acceptor.cancel();*/ + + } - boost::asio::write(_socket, boost::asio::buffer(outMessage), error); - - /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "------- DONE ------" << std::endl; - myfile.close();*/ - } - - void TCPServer::readString(std::string &message, error_code &error) { - - /*tcp::socket socket(_service); - _acceptor.accept(socket);*/ - - /*std::ofstream myfile; - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "--> READ <--" << std::endl; - myfile << " Create socket " << std::endl; - myfile.close(); - - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "--> READ <--" << std::endl; - myfile.close();*/ - - bool end = false, readedBytes = false; - int readedSize = 0, sizeToRead = -1; - do { - - std::array buf; - - size_t len = _socket.read_some(boost::asio::buffer(buf), error); - - // @todo find a better way. - for (size_t i = 0; i < len && !end; ++i) { - if (!readedBytes) { - sizeToRead = GetInt(buf[0], buf[1], buf[2], buf[3]); - i = 3; - readedBytes = true; - } - else { - message += buf[i]; - ++readedSize; - } - - } - - } while (!readedBytes || sizeToRead > readedSize); - - /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "Receive Message: " << message << std::endl; - myfile << "------ DONE ------" << message << std::endl; - myfile.close();*/ - - } - - } // namespace server + } // namespace server } // namespace carla + + + + + diff --git a/Source/CarlaServer/source/carla/server/TCPServer.h b/Source/CarlaServer/source/carla/server/TCPServer.h index 5b2824261..c7653e06e 100644 --- a/Source/CarlaServer/source/carla/server/TCPServer.h +++ b/Source/CarlaServer/source/carla/server/TCPServer.h @@ -14,7 +14,7 @@ namespace server { /// { TCP server. /// /// A new socket is created for every connection (every write and read). - class TCPServer : private NonCopyable { + class TCPServer { public: using error_code = boost::system::error_code; @@ -25,23 +25,24 @@ namespace server { void writeString(const std::string &message, error_code &error); - void readString(std::string &message, error_code &error); + bool readString(std::string &message, error_code &error); void AcceptSocket(); bool Connected(); - const int _port; + void close(); + + const int port; private: + + boost::asio::io_service _service; - boost::asio::io_service _service; - boost::asio::ip::tcp::acceptor _acceptor; + boost::asio::ip::tcp::socket _socket; - boost::asio::ip::tcp::socket _socket; - - bool _connected; + std::atomic_bool _connected; }; } // namespace server diff --git a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h index e688fd00f..fdf8891e6 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h @@ -7,6 +7,7 @@ #include #include +#include namespace carla { namespace thread { @@ -20,15 +21,19 @@ namespace thread { using WritingJob = std::function; using ReadingJob = std::function; using ConnectJob = std::function; + using ReconnectJob = std::function; explicit AsyncReadWriteJobQueue( WritingJob && writingJob, ReadingJob && readingJob, - ConnectJob && connectJob) : + ConnectJob && connectJob, + ReconnectJob && reconnectJob) : _done(false), + _restart(true), _writeJob(std::move(writingJob)), _readJob(std::move(readingJob)), _connectJob(std::move(connectJob)), + _reconnectJob(std::move(reconnectJob)), _writeQueue(), _thread(new std::thread(&AsyncReadWriteJobQueue::workerThread, this)) {} @@ -44,28 +49,54 @@ namespace thread { _readQueue.push(item); } + void reconnect(){ + _reconnectJob(); + } + + void restart(){ + _restart = true; + _readQueue.canWait(false); + _writeQueue.canWait(false); + //_thread->detach(); + //_thread = ThreadUniquePointer(new std::thread(&AsyncReadWriteJobQueue::workerThread, this)); + } + + bool getRestart(){ + return _restart; + } + private: void workerThread() { - _connectJob(); - while (!_done) { - R value; - _readQueue.wait_and_pop(value); - _readJob(value); - _writeQueue.push(_writeJob()); + while(!_done){ + _connectJob(); + _restart = false; + _readQueue.canWait(true); + while (!_restart && !_done) { + R value; + if (_readQueue.wait_and_pop(value)) { + _readJob(value); + } + if (!_restart){ + _writeQueue.push(_writeJob()); + } + + } } } std::atomic_bool _done; + std::atomic_bool _restart; WritingJob _writeJob; ReadingJob _readJob; ConnectJob _connectJob; + ReconnectJob _reconnectJob; ThreadSafeQueue _writeQueue; ThreadSafeQueue _readQueue; - const ThreadUniquePointer _thread; + ThreadUniquePointer _thread; }; } // namespace thread diff --git a/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h index 4ff116dc6..b2dee939c 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h @@ -21,11 +21,14 @@ namespace thread { using Job = std::function; using ConnectJob = std::function; + using ReconnectJob = std::function; - explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) : + explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob, ReconnectJob &&reconnectJob) : _done(false), + _restart(true), _job(std::move(job)), _connectionJob(std::move(connectionJob)), + _reconnectJob(std::move(reconnectJob)), _queue(), _thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {} @@ -40,27 +43,44 @@ namespace thread { _queue.push(item); } - private: + void restart(){ + _restart = true; + _queue.canWait(false); + } + bool getRestart(){ + return _restart; + } + + void reconnect(){ + _reconnectJob(); + } + + private: void workerThread() { - _connectionJob(); - while (!_done) { - T value; - _queue.wait_and_pop(value); - _job(value); - //Sleep(10); + while (!_done){ + _connectionJob(); + _restart = false; + _queue.canWait(true); + while (!_restart && !_done) { + T value; + if (_queue.wait_and_pop(value)) _job(value); + //Sleep(10); + } } } std::atomic_bool _done; + std::atomic_bool _restart; Job _job; - ConnectJob _connectionJob; + ConnectJob _connectionJob; + ReconnectJob _reconnectJob; ThreadSafeQueue _queue; - const ThreadUniquePointer _thread; + ThreadUniquePointer _thread; }; } // namespace thread diff --git a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h index 9ce650681..ca1006e6c 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h @@ -19,11 +19,14 @@ namespace thread { using Job = std::function; using ConnectJob = std::function; + using ReconnectJob = std::function; - explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) : + explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob, ReconnectJob &&reconnectJob) : _done(false), + _restart(true), _job(std::move(job)), _connectJob(std::move(connectJob)), + _reconnectJob(std::move(reconnectJob)), _queue(), _thread(new std::thread(&AsyncWriterJobQueue::workerThread, this)) {} @@ -36,24 +39,43 @@ namespace thread { return _queue.try_pop(value); } + void restart(){ + _restart = true; + _queue.canWait(false); + } + + bool getRestart(){ + return _restart; + } + + void reconnect(){ + _reconnectJob(); + } + private: void workerThread() { - _connectJob(); - while (!_done) { - _queue.push(_job()); - //Sleep(10); + while (!_done){ + _connectJob(); + _restart = false; + _queue.canWait(true); + while (!_restart && !_done) { + _queue.push(_job()); + //Sleep(10); + } } } std::atomic_bool _done; + std::atomic_bool _restart; Job _job; - ConnectJob _connectJob; + ConnectJob _connectJob; + ReconnectJob _reconnectJob; ThreadSafeQueue _queue; - const ThreadUniquePointer _thread; + ThreadUniquePointer _thread; }; diff --git a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h index b1e9eed15..fd9644e65 100644 --- a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h +++ b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h @@ -7,6 +7,8 @@ #include #include +#include + namespace carla { namespace thread { @@ -22,6 +24,7 @@ namespace thread { ThreadSafeQueue(const ThreadSafeQueue &other) { std::lock_guard lock(other._mutex); _queue = other._queue; + _canWait = true; } void push(T new_value) { @@ -30,55 +33,61 @@ namespace thread { _condition.notify_one(); } - void wait_and_pop(T &value) { - std::unique_lock lock(_mutex); - _condition.wait(lock, [this] {return !_queue.empty(); }); - value = _queue.front(); - _queue.pop(); + void canWait(bool wait){ + //std::lock_guard lock(_mutex); + _canWait = wait; + _condition.notify_one(); } - // std::shared_ptr wait_and_pop() { - // std::unique_lock lock(_mutex); - // _condition.wait(lock, [this] {return !_queue.empty(); }); - // std::shared_ptr res(std::make_shared(_queue.front())); - // _queue.pop(); - // return res; - // } + bool wait_and_pop(T &value) { + std::unique_lock lock(_mutex); + _condition.wait(lock, [this] { + return !_queue.empty() || !_canWait; + }); + + //while(_queue.empty() && _canWait); + + if (!_queue.empty() && _canWait) { + value = _queue.front(); + _queue.pop(); + return true; + } + + else return false; + } bool try_pop(T &value) { std::lock_guard lock(_mutex); if (_queue.empty()) { return false; } - else { - value = _queue.front(); - _queue.pop(); - return true; - } + else { + value = _queue.front(); + _queue.pop(); + return true; + } } - - // std::shared_ptr try_pop() { - // std::lock_guard lock(_mutex); - // if (_queue.empty()) { - // return std::shared_ptr(); - // } - // std::shared_ptr res(std::make_shared(_queue.front())); - // _queue.pop(); - // return res; - // } - + bool empty() const { std::lock_guard lock(_mutex); return _queue.empty(); } + + + + + private: mutable std::mutex _mutex; + std::atomic_bool _canWait; + std::queue _queue; std::condition_variable _condition; + }; } // namespace thread diff --git a/Source/CarlaServer/source/test/async_server.cpp b/Source/CarlaServer/source/test/async_server.cpp index 14b0d6c27..e81bd1fe1 100644 --- a/Source/CarlaServer/source/test/async_server.cpp +++ b/Source/CarlaServer/source/test/async_server.cpp @@ -38,6 +38,7 @@ static std::vector makeImage(uint32_t width, uint32_t height) { img[4 * width * i + 4 * e + 3] = 255; } } + std::vector image(width * height); size_t i = 0u; for (carla::Color &color : image) { @@ -55,7 +56,6 @@ int main(int argc, char *argv[]) { std::cerr << "Usage: server " << std::endl; return InvalidArguments; } - const uint32_t worldPort = toInt(argv[1u]); const uint32_t writePort = toInt(argv[2u]); const uint32_t readPort = toInt(argv[3u]); @@ -87,57 +87,65 @@ int main(int argc, char *argv[]) { reward.image_depth_0 = makeImage(imageWidth, imageHeight); reward.image_depth_1 = makeImage(imageWidth, imageHeight); - std::cout << "Server send World" << std::endl; - server.init(1u); - { - std::cout << "Server wait scene init" << std::endl; - carla::Mode mode; - uint32_t scene; - while (!server.tryReadSceneInit(mode, scene)) {} - std::cout << "Received: mode = " - << (mode == carla::Mode::MONO ? "MONO" : "STEREO") - << ", scene = " - << scene << std::endl; - } - { - carla::Scene_Values sceneValues; - sceneValues.possible_positions.push_back({0.0f, 0.0f}); - sceneValues.possible_positions.push_back({1.0f, 2.0f}); - sceneValues.possible_positions.push_back({3.0f, 4.0f}); - const std::array pMatrix = {{ 10.0 }}; - sceneValues.projection_matrices.push_back(pMatrix); + for (;;){ + if (server.worldConnected()){ - std::cout << "Server send positions" << std::endl; - server.sendSceneValues(sceneValues); + server.init(1u); - std::cout << "Server wait new episode" << std::endl; - uint32_t start, end; - while (!server.tryReadEpisodeStart(start, end)) {} - std::cout << "Received: startIndex = " << start - << ", endIndex = " << end << std::endl; - } + { + carla::Mode mode; + uint32_t scene; + + while(!server.needRestart() && !server.tryReadSceneInit(mode, scene)); - std::cout << "Server send end reset" << std::endl; - server.sendEndReset(); - - for (;;) { - float steer, gas; - uint32_t startPoint, endPoint; - if (server.tryReadEpisodeStart(startPoint, endPoint)) { - std::cout << "------> RESET <------" << std::endl; - std::cout << " --> Start: " << startPoint << " End: " << endPoint << std::endl; - server.sendEndReset(); - } else { - if (server.tryReadControl(steer, gas)) { - std::cout << "Steer: " << steer << "Gas: " << gas << std::endl; + std::cout << "Received: mode = " + << (mode == carla::Mode::MONO ? "MONO" : "STEREO") + << ", scene = " + << scene << std::endl; } - static decltype(carla::Reward_Values::timestamp) timestamp = 0u; - reward.timestamp = timestamp++; - server.sendReward(reward); + + + { + carla::Scene_Values sceneValues; + sceneValues.possible_positions.push_back({0.0f, 0.0f}); + sceneValues.possible_positions.push_back({1.0f, 2.0f}); + sceneValues.possible_positions.push_back({3.0f, 4.0f}); + const std::array pMatrix = {{ 10.0 }}; + sceneValues.projection_matrices.push_back(pMatrix); + + server.sendSceneValues(sceneValues); + + std::cout << "New episode" << std::endl; + uint32_t start, end; + while (!server.needRestart() && !server.tryReadEpisodeStart(start, end)); + std::cout << "Received: startIndex = " << start + << ", endIndex = " << end << std::endl; + } + + server.sendEndReset(); + while (!server.needRestart()) { + if (server.clientConnected() && server.serverConnected()){ + float steer, gas; + uint32_t startPoint, endPoint; + if (server.tryReadEpisodeStart(startPoint, endPoint)) { + std::cout << "-------- RESET --------" << std::endl; + std::cout << "--> Start: " << startPoint << " End: " << endPoint << " <--" << std::endl; + server.sendEndReset(); + } else { + if (server.tryReadControl(steer, gas)) { + std::cout << "Steer: " << steer << " Gas: " << gas << std::endl; + } + static decltype(carla::Reward_Values::timestamp) timestamp = 0u; + reward.timestamp = timestamp++; + server.sendReward(reward); + } + } + } + + std::cout << " ----- RESTARTING -----" << std::endl; } } - } catch (const std::exception &e) { std::cerr << e.what() << std::endl; return STLException;