diff --git a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp index 890c8414a..c3808cbe1 100644 --- a/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp +++ b/Source/CarlaServer/source/carla/server/CarlaCommunication.cpp @@ -11,13 +11,16 @@ namespace carla { std::cerr << "CarlaConnection - TCP Server: " << text << ": " << errorCode.message() << std::endl; } + // This is the thread that sends a string over the TCP socket. static void serverWorkerThread(TCPServer &server, std::string &message) { - TCPServer::error_code error; //message = message.size.c_ + message; - server.writeString(message, error); + //if (!server.Connected()) server.AcceptSocket(); + + server.writeString(message, error); + if (error) logTCPError("Failed to send", error); @@ -28,8 +31,12 @@ namespace carla { // Sortida amb google protocol // This is the thread that listens for string over the TCP socket. static std::string clientWorkerThread(TCPServer &server) { + + //if (!server.Connected()) server.AcceptSocket(); + std::string message; TCPServer::error_code error; + server.readString(message, error); if (error && (error != boost::asio::error::eof)) { // eof is expected. logTCPError("Failed to read", error); @@ -39,16 +46,17 @@ namespace carla { return message; } - // TODO: - //2threads para el mundo ? uno para leer y uno para enviar + // This is the thread that listens & sends a string over the TCP world socket. static std::string worldReceiveThread(TCPServer &server) { - std::string message; TCPServer::error_code error; + + //if (!server.Connected()) server.AcceptSocket(); + server.readString(message, error); if (error && (error != boost::asio::error::eof)) { // eof is expected. - logTCPError("Failed to read", error); + logTCPError("Failed to read world", error); return std::string(); } return message; @@ -56,29 +64,37 @@ namespace carla { } static void worldSendThread(TCPServer &server, std::string &message) { - TCPServer::error_code error; //message = message.size + message; + + //if (!server.Connected()) server.AcceptSocket(); + server.writeString(message, error); if (error) - logTCPError("Failed to send", error); + logTCPError("Failed to send world", error); } + static void Connect(TCPServer &server) { + if (!server.Connected()) server.AcceptSocket(); + } + CarlaCommunication::CarlaCommunication(int worldPort, int writePort, int readPort) : + _world(worldPort), _server(writePort), _client(readPort), - _world(worldPort), _worldThread([this]() {return worldReceiveThread(this->_world); }, - [this](std::string &msg) { worldSendThread(this->_world, msg); }), - _serverThread([this](std::string &str) { serverWorkerThread(this->_server, str); }), - _clientThread([this]() { return clientWorkerThread(this->_client); }) + [this](std::string &msg) { worldSendThread(this->_world, msg); }, + [this]() {Connect(this->_world); }), + _serverThread([this](std::string &str) { serverWorkerThread(this->_server, str); }, + [this]() {Connect(this->_server); }), + _clientThread([this]() { return clientWorkerThread(this->_client); }, + [this]() {Connect(this->_client); }) { - - std::cout << "WorldPort: " << worldPort << std::endl; + /*std::cout << "WorldPort: " << worldPort << std::endl; std::cout << "writePort: " << writePort << std::endl; - std::cout << "readPort: " << readPort << std::endl; + std::cout << "readPort: " << readPort << std::endl;*/ } @@ -111,7 +127,7 @@ namespace carla { std::string message; bool error = !ready.SerializeToString(&message); if (!error) { - std::cout << "Send End Reset" << std::endl; + //std::cout << "Send End Reset" << std::endl; _worldThread.push(message); } else { diff --git a/Source/CarlaServer/source/carla/server/CarlaServer.cpp b/Source/CarlaServer/source/carla/server/CarlaServer.cpp index 4425faa0d..ad4a183e5 100644 --- a/Source/CarlaServer/source/carla/server/CarlaServer.cpp +++ b/Source/CarlaServer/source/carla/server/CarlaServer.cpp @@ -19,7 +19,8 @@ namespace server { } - CarlaServer::~CarlaServer() {} + CarlaServer::~CarlaServer() { + } void CarlaServer::sendReward(const Reward_Values &values) { Reward reward = _proto->LoadReward(values); @@ -27,8 +28,6 @@ namespace server { } void CarlaServer::sendSceneValues(const Scene_Values &values) { - - std::cout << "Send Scene Values" << std::endl; Scene scene = _proto->LoadScene(values); _communication->sendScene(scene); } @@ -40,73 +39,72 @@ namespace server { } void CarlaServer::sendWorld() { - std::cout << "Send World" << std::endl; World world = _proto->LoadWorld(); _communication->sendWorld(world); } bool CarlaServer::tryReadControl(float &steer, float &gas) { std::string controlMessage; - bool error = !_communication->tryReadControl(controlMessage); + bool success = _communication->tryReadControl(controlMessage); Control control; - if (!error) { - error &= !control.ParseFromString(controlMessage); + if (success) { + success &= control.ParseFromString(controlMessage); } steer = control.steer(); gas = control.gas(); - if (error) { + if (!success) { steer = 0.0f; gas = 0.0f; } else { steer = control.steer(); gas = control.gas(); - std::cout << "Steer: " << steer << " Gas: " << gas << std::endl; + //std::cout << "Steer: " << steer << " Gas: " << gas << std::endl; } - return !error; + return success; } bool CarlaServer::tryReadSceneInit(int &mode, int &scene) { std::string initMessage; - bool error = !_communication->tryReadWorldInfo(initMessage); + bool success = _communication->tryReadWorldInfo(initMessage); SceneInit sceneInit; - if (!error) { - error &= !sceneInit.ParseFromString(initMessage); + if (success) { + success &= sceneInit.ParseFromString(initMessage); } - if (error) { + if (!success) { mode = -1; scene = -1; } else { mode = sceneInit.mode(); scene = sceneInit.scene(); - std::cout << "Mode: " << mode << " Scene: " << scene << std::endl; + //std::cout << "Mode: " << mode << " Scene: " << scene << std::endl; } - return !error; + return success; } bool CarlaServer::tryReadEpisodeStart(float &start_index, float &end_index) { std::string startData; - bool error = !_communication->tryReadWorldInfo(startData); + bool success = _communication->tryReadWorldInfo(startData); EpisodeStart episodeStart; - error &= !episodeStart.ParseFromString(startData); + success &= episodeStart.ParseFromString(startData); - if (error) { - start_index = 0.0f; - end_index = 0.0f; + if (!success) { + start_index = -1.0f; + end_index = -1.0f; } else { start_index = episodeStart.start_index(); end_index = episodeStart.end_index(); - std::cout << "Start: " << start_index << " End: " << end_index << std::endl; + //std::cout << "Start: " << start_index << " End: " << end_index << std::endl; } - return !error; + return success; } void CarlaServer::setMode(Mode mode) { diff --git a/Source/CarlaServer/source/carla/server/TCPServer.cpp b/Source/CarlaServer/source/carla/server/TCPServer.cpp index c2895c056..6f0cabcd6 100644 --- a/Source/CarlaServer/source/carla/server/TCPServer.cpp +++ b/Source/CarlaServer/source/carla/server/TCPServer.cpp @@ -6,104 +6,143 @@ #include namespace carla { -namespace server { + namespace server { - using boost::asio::ip::tcp; + using boost::asio::ip::tcp; - std::ofstream myfile; + std::string GetBytes(int n) { + std::string bytes; + + bytes = (n >> 24) & 0xFF; + bytes += (n >> 16) & 0xFF; + bytes += (n >> 8) & 0xFF; + bytes += n & 0xFF; + + return bytes; + } + + int GetInt(char b1, char b2, char b3, char b4) { + int result = 0; + result = (result << 8) + b1; + result = (result << 8) + b2; + result = (result << 8) + b3; + result = (result << 8) + b4; + + return result; + } - std::string GetBytes(int n) { - std::string bytes; + TCPServer::TCPServer(int port) : + _service(), + _acceptor(_service, tcp::endpoint(tcp::v4(), port)), + _port(port), + _socket(_service), + _connected(false){ + /*std::ofstream myfile; + myfile.open("TCP" + std::to_string(_port) + ".txt"); + myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl; + myfile.close();*/ + } - bytes = (n >> 24) & 0xFF; - bytes += (n >> 16) & 0xFF; - bytes += (n >> 8) & 0xFF; - bytes += n & 0xFF; + TCPServer::~TCPServer() { + + /*std::ofstream myfile; + myfile.open("TCP" + std::to_string(_port) + ".txt"); + myfile << " Y VOLO " << std::to_string(_port) << std::endl; + myfile.close();*/ - return bytes; - } + } + + void TCPServer::AcceptSocket() { + /*std::ofstream myfile; + myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << " Create socket " << std::to_string(_port) <>>>> ACCEPT ERROR <<<<<" << std::endl; + myfile.close();*/ + }; - TCPServer::TCPServer(int port) : - _service(), - _acceptor(_service, tcp::endpoint(tcp::v4(), port)), - _port(port){ + /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << " CONNECTED... " << std::to_string(_port) << std::endl; + myfile.close();*/ + } - myfile.open("TCP" + std::to_string(_port) + ".txt"); - myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl; - myfile.close(); + bool TCPServer::Connected() { + return _connected; + } - } + void TCPServer::writeString(const std::string &message, error_code &error) { - TCPServer::~TCPServer() {} + /*std::ofstream myfile; + myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << "--> WRITE <--" << std::endl; + myfile << " Message: " << message << " // length: " << message.length() << " // byte: " << GetBytes(message.length()) << std::endl; + myfile.close();*/ - void TCPServer::writeString(const std::string &message, error_code &error) { + std::string outMessage(GetBytes(message.length()) + message); - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "--> WRITE <--" << std::endl; - myfile << " Create socket " << std::endl; - myfile.close(); + boost::asio::write(_socket, boost::asio::buffer(outMessage), error); - //Sleep(500); + /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << "------- DONE ------" << std::endl; + myfile.close();*/ + } - tcp::socket socket(_service); - _acceptor.accept(socket); + void TCPServer::readString(std::string &message, error_code &error) { - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << " Connected " << std::endl; - myfile << " Message: " << message << " // length: " << message.length() << " // byte: "<< GetBytes(message.length()) < READ <--" << std::endl; + myfile << " Create socket " << std::endl; + myfile.close(); - boost::asio::write(socket, boost::asio::buffer(outMessage), error); + myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << "--> READ <--" << std::endl; + myfile.close();*/ - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "------- DONE ------" << std::endl; - myfile.close(); + bool end = false, readedBytes = false; + int readedSize = 0, sizeToRead = -1; + do { - //Sleep(500); - //std::cout << _port << ": DONE " << std::endl; - } + std::array buf; - void TCPServer::readString(std::string &message, error_code &error) { - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "--> READ <--" << std::endl; - myfile << " Create socket " << std::endl; - myfile.close(); + size_t len = _socket.read_some(boost::asio::buffer(buf), error); - tcp::socket socket(_service); - _acceptor.accept(socket); + // @todo find a better way. + for (size_t i = 0; i < len && !end; ++i) { + if (!readedBytes) { + sizeToRead = GetInt(buf[0], buf[1], buf[2], buf[3]); + i = 3; + readedBytes = true; + } + else { + message += buf[i]; + ++readedSize; + } - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << " Connected " << std::endl; - myfile.close(); + } - for (;; ) { - std::array buf; + } while (!readedBytes || sizeToRead > readedSize); - size_t len = socket.read_some(boost::asio::buffer(buf), error); + /*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); + myfile << "Receive Message: " << message << std::endl; + myfile << "------ DONE ------" << message << std::endl; + myfile.close();*/ - if (error == boost::asio::error::eof) { - break; // Connection closed cleanly by peer. - } else if (error) { - return; - } + } - // @todo find a better way. - for (size_t i = 0u; i < len; ++i) { - message += buf[i]; - } - } - - myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app); - myfile << "Receive Message: " << message << std::endl; - myfile << "------ DONE ------" << message << std::endl; - myfile.close(); - } - -} // namespace server + } // namespace server } // namespace carla diff --git a/Source/CarlaServer/source/carla/server/TCPServer.h b/Source/CarlaServer/source/carla/server/TCPServer.h index 4ef639f2d..fcda3077e 100644 --- a/Source/CarlaServer/source/carla/server/TCPServer.h +++ b/Source/CarlaServer/source/carla/server/TCPServer.h @@ -3,7 +3,7 @@ #pragma once #include - +#include #include namespace carla { @@ -21,19 +21,25 @@ namespace server { ~TCPServer(); - void startAccept(); - void writeString(const std::string &message, error_code &error); void readString(std::string &message, error_code &error); + void AcceptSocket(); + + bool Connected(); + + const int _port; + private: boost::asio::io_service _service; boost::asio::ip::tcp::acceptor _acceptor; - const int _port; + boost::asio::ip::tcp::socket _socket; + + bool _connected; }; } // namespace server diff --git a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h index f16fadbf3..59b816819 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncReadWriteJobQueue.h @@ -19,11 +19,13 @@ namespace thread { using WritingJob = std::function; using ReadingJob = std::function; + using ConnectJob = std::function; - explicit AsyncReadWriteJobQueue(WritingJob &&writingJob, ReadingJob &&readingJob) : + explicit AsyncReadWriteJobQueue(WritingJob &&writingJob, ReadingJob &&readingJob, ConnectJob &&connectJob) : _done(false), _writeJob(std::move(writingJob)), _readJob(std::move(readingJob)), + _connectJob(std::move(connectJob)), _writeQueue(), _thread(new std::thread(&AsyncReadWriteJobQueue::workerThread, this)) {} @@ -42,12 +44,12 @@ namespace thread { private: void workerThread() { + _connectJob(); while (!_done) { R value; _readQueue.wait_and_pop(value); _readJob(value); _writeQueue.push(_writeJob()); - Sleep(10); } } @@ -55,6 +57,7 @@ namespace thread { WritingJob _writeJob; ReadingJob _readJob; + ConnectJob _connectJob; ThreadSafeQueue _writeQueue; ThreadSafeQueue _readQueue; diff --git a/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h index 846b853af..a9da61229 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h @@ -20,10 +20,12 @@ namespace thread { public: using Job = std::function; + using ConnectJob = std::function; - explicit AsyncReaderJobQueue(Job &&job) : - _done(false), - _job(std::move(job)), + explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) : + _done(false), + _job(std::move(job)), + _connectionJob(std::move(connectionJob)), _queue(), _thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {} @@ -38,6 +40,7 @@ namespace thread { private: void workerThread() { + _connectionJob(); while (!_done) { T value; _queue.wait_and_pop(value); @@ -49,6 +52,8 @@ namespace thread { std::atomic_bool _done; Job _job; + ConnectJob _connectionJob; + ThreadSafeQueue _queue; diff --git a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h index af4887a44..fe8c8d139 100644 --- a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h +++ b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h @@ -18,10 +18,12 @@ namespace thread { public: using Job = std::function; + using ConnectJob = std::function; - explicit AsyncWriterJobQueue(Job &&job) : - _done(false), - _job(std::move(job)), + explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) : + _done(false), + _job(std::move(job)), + _connectJob(std::move(connectJob)), _queue(), _thread(new std::thread(&AsyncWriterJobQueue::workerThread, this)) {} @@ -37,6 +39,7 @@ namespace thread { private: void workerThread() { + _connectJob(); while (!_done) { _queue.push(_job()); Sleep(10); @@ -46,6 +49,7 @@ namespace thread { std::atomic_bool _done; Job _job; + ConnectJob _connectJob; ThreadSafeQueue _queue; diff --git a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h index 446a9b802..45398e6f7 100644 --- a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h +++ b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h @@ -34,7 +34,7 @@ namespace thread { std::unique_lock lock(_mutex); _condition.wait(lock, [this] {return !_queue.empty(); }); value = _queue.front(); - _queue.pop(); + _queue.pop(); } // std::shared_ptr wait_and_pop() { diff --git a/Source/CarlaServer/source/test/async_server.cpp b/Source/CarlaServer/source/test/async_server.cpp index 60041ba26..d57908243 100644 --- a/Source/CarlaServer/source/test/async_server.cpp +++ b/Source/CarlaServer/source/test/async_server.cpp @@ -100,12 +100,14 @@ int main(int argc, char* argv[]) { testData.depth_1 = depth_1; testData.depth_2 = depth_2; - + std::cout << "Server send World" << std::endl; server.sendWorld(); int mode, scene; bool end = false; + + std::cout << "Server wait scene init" << std::endl; do { end = server.tryReadSceneInit(mode, scene); }while (!end); @@ -126,33 +128,49 @@ int main(int argc, char* argv[]) { pMatrix, }; + std::cout << "Server send positions" << std::endl; + server.sendSceneValues(sceneVal); end = false; float startPoint, endPoint; + + std::cout << "Server wait new episode" << std::endl; + do { + end = server.tryReadEpisodeStart(startPoint, endPoint); } while (!end); + std::cout << "Server send end reset" << std::endl; + server.sendEndReset(); + float steer, gas; + bool wait_control = false; for (;;) { - Sleep(50); - //std::cout << "Sending..." << std::endl; - auto time = daytimeString(); - // server.reward = testData; - server.sendReward(testData); - using namespace std::chrono_literals; + if (server.tryReadEpisodeStart(startPoint, endPoint)) { + std::cout << "------> RESET <------" << std::endl; + std::cout << " --> Start: " << startPoint << " End: " << endPoint << std::endl; + server.sendEndReset(); + } + else { + + if (wait_control && server.tryReadControl(steer, gas)) { + std::cout << "Steer: " << steer << "Gas: " << gas << std::endl; + wait_control = false; + } + else if (!wait_control) { + server.sendReward(testData); + wait_control = true; + } + + } + + Sleep(100); - Sleep(50); - //std::cout << "Listening..." << std::endl; - float steer, gas; - if (server.tryReadControl(steer, gas)) { - /*if ((message == "q") || (message == "quit")) - break;*/ - } } } catch (const std::exception &e) {