CarlaServer correct test 1

This commit is contained in:
Xisco Bosch 2017-03-13 18:57:04 +01:00 committed by nsubiron
parent 96ed50ece4
commit 2085ca8dde
9 changed files with 228 additions and 139 deletions

View File

@ -11,13 +11,16 @@ namespace carla {
std::cerr << "CarlaConnection - TCP Server: " << text << ": " << errorCode.message() << std::endl;
}
// This is the thread that sends a string over the TCP socket.
static void serverWorkerThread(TCPServer &server, std::string &message) {
TCPServer::error_code error;
//message = message.size.c_ + message;
server.writeString(message, error);
//if (!server.Connected()) server.AcceptSocket();
server.writeString(message, error);
if (error)
logTCPError("Failed to send", error);
@ -28,8 +31,12 @@ namespace carla {
// Sortida amb google protocol
// This is the thread that listens for string over the TCP socket.
static std::string clientWorkerThread(TCPServer &server) {
//if (!server.Connected()) server.AcceptSocket();
std::string message;
TCPServer::error_code error;
server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read", error);
@ -39,16 +46,17 @@ namespace carla {
return message;
}
// TODO:
//2threads para el mundo ? uno para leer y uno para enviar
// This is the thread that listens & sends a string over the TCP world socket.
static std::string worldReceiveThread(TCPServer &server) {
std::string message;
TCPServer::error_code error;
//if (!server.Connected()) server.AcceptSocket();
server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read", error);
logTCPError("Failed to read world", error);
return std::string();
}
return message;
@ -56,29 +64,37 @@ namespace carla {
}
static void worldSendThread(TCPServer &server, std::string &message) {
TCPServer::error_code error;
//message = message.size + message;
//if (!server.Connected()) server.AcceptSocket();
server.writeString(message, error);
if (error)
logTCPError("Failed to send", error);
logTCPError("Failed to send world", error);
}
static void Connect(TCPServer &server) {
if (!server.Connected()) server.AcceptSocket();
}
CarlaCommunication::CarlaCommunication(int worldPort, int writePort, int readPort) :
_world(worldPort),
_server(writePort),
_client(readPort),
_world(worldPort),
_worldThread([this]() {return worldReceiveThread(this->_world); },
[this](std::string &msg) { worldSendThread(this->_world, msg); }),
_serverThread([this](std::string &str) { serverWorkerThread(this->_server, str); }),
_clientThread([this]() { return clientWorkerThread(this->_client); })
[this](std::string &msg) { worldSendThread(this->_world, msg); },
[this]() {Connect(this->_world); }),
_serverThread([this](std::string &str) { serverWorkerThread(this->_server, str); },
[this]() {Connect(this->_server); }),
_clientThread([this]() { return clientWorkerThread(this->_client); },
[this]() {Connect(this->_client); })
{
std::cout << "WorldPort: " << worldPort << std::endl;
/*std::cout << "WorldPort: " << worldPort << std::endl;
std::cout << "writePort: " << writePort << std::endl;
std::cout << "readPort: " << readPort << std::endl;
std::cout << "readPort: " << readPort << std::endl;*/
}
@ -111,7 +127,7 @@ namespace carla {
std::string message;
bool error = !ready.SerializeToString(&message);
if (!error) {
std::cout << "Send End Reset" << std::endl;
//std::cout << "Send End Reset" << std::endl;
_worldThread.push(message);
}
else {

View File

@ -19,7 +19,8 @@ namespace server {
}
CarlaServer::~CarlaServer() {}
CarlaServer::~CarlaServer() {
}
void CarlaServer::sendReward(const Reward_Values &values) {
Reward reward = _proto->LoadReward(values);
@ -27,8 +28,6 @@ namespace server {
}
void CarlaServer::sendSceneValues(const Scene_Values &values) {
std::cout << "Send Scene Values" << std::endl;
Scene scene = _proto->LoadScene(values);
_communication->sendScene(scene);
}
@ -40,73 +39,72 @@ namespace server {
}
void CarlaServer::sendWorld() {
std::cout << "Send World" << std::endl;
World world = _proto->LoadWorld();
_communication->sendWorld(world);
}
bool CarlaServer::tryReadControl(float &steer, float &gas) {
std::string controlMessage;
bool error = !_communication->tryReadControl(controlMessage);
bool success = _communication->tryReadControl(controlMessage);
Control control;
if (!error) {
error &= !control.ParseFromString(controlMessage);
if (success) {
success &= control.ParseFromString(controlMessage);
}
steer = control.steer();
gas = control.gas();
if (error) {
if (!success) {
steer = 0.0f;
gas = 0.0f;
}
else {
steer = control.steer();
gas = control.gas();
std::cout << "Steer: " << steer << " Gas: " << gas << std::endl;
//std::cout << "Steer: " << steer << " Gas: " << gas << std::endl;
}
return !error;
return success;
}
bool CarlaServer::tryReadSceneInit(int &mode, int &scene) {
std::string initMessage;
bool error = !_communication->tryReadWorldInfo(initMessage);
bool success = _communication->tryReadWorldInfo(initMessage);
SceneInit sceneInit;
if (!error) {
error &= !sceneInit.ParseFromString(initMessage);
if (success) {
success &= sceneInit.ParseFromString(initMessage);
}
if (error) {
if (!success) {
mode = -1;
scene = -1;
}
else {
mode = sceneInit.mode();
scene = sceneInit.scene();
std::cout << "Mode: " << mode << " Scene: " << scene << std::endl;
//std::cout << "Mode: " << mode << " Scene: " << scene << std::endl;
}
return !error;
return success;
}
bool CarlaServer::tryReadEpisodeStart(float &start_index, float &end_index) {
std::string startData;
bool error = !_communication->tryReadWorldInfo(startData);
bool success = _communication->tryReadWorldInfo(startData);
EpisodeStart episodeStart;
error &= !episodeStart.ParseFromString(startData);
success &= episodeStart.ParseFromString(startData);
if (error) {
start_index = 0.0f;
end_index = 0.0f;
if (!success) {
start_index = -1.0f;
end_index = -1.0f;
}
else {
start_index = episodeStart.start_index();
end_index = episodeStart.end_index();
std::cout << "Start: " << start_index << " End: " << end_index << std::endl;
//std::cout << "Start: " << start_index << " End: " << end_index << std::endl;
}
return !error;
return success;
}
void CarlaServer::setMode(Mode mode) {

View File

@ -6,104 +6,143 @@
#include <fstream>
namespace carla {
namespace server {
namespace server {
using boost::asio::ip::tcp;
using boost::asio::ip::tcp;
std::ofstream myfile;
std::string GetBytes(int n) {
std::string bytes;
bytes = (n >> 24) & 0xFF;
bytes += (n >> 16) & 0xFF;
bytes += (n >> 8) & 0xFF;
bytes += n & 0xFF;
return bytes;
}
int GetInt(char b1, char b2, char b3, char b4) {
int result = 0;
result = (result << 8) + b1;
result = (result << 8) + b2;
result = (result << 8) + b3;
result = (result << 8) + b4;
return result;
}
std::string GetBytes(int n) {
std::string bytes;
TCPServer::TCPServer(int port) :
_service(),
_acceptor(_service, tcp::endpoint(tcp::v4(), port)),
_port(port),
_socket(_service),
_connected(false){
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt");
myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl;
myfile.close();*/
}
bytes = (n >> 24) & 0xFF;
bytes += (n >> 16) & 0xFF;
bytes += (n >> 8) & 0xFF;
bytes += n & 0xFF;
TCPServer::~TCPServer() {
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt");
myfile << " Y VOLO " << std::to_string(_port) << std::endl;
myfile.close();*/
return bytes;
}
}
void TCPServer::AcceptSocket() {
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " Create socket " << std::to_string(_port) <<std::endl;
myfile.close();*/
try {
_acceptor.accept(_socket);
_connected = true;
}
catch (boost::system::system_error) {
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " >>>>> ACCEPT ERROR <<<<<" << std::endl;
myfile.close();*/
};
TCPServer::TCPServer(int port) :
_service(),
_acceptor(_service, tcp::endpoint(tcp::v4(), port)),
_port(port){
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " CONNECTED... " << std::to_string(_port) << std::endl;
myfile.close();*/
}
myfile.open("TCP" + std::to_string(_port) + ".txt");
myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl;
myfile.close();
bool TCPServer::Connected() {
return _connected;
}
}
void TCPServer::writeString(const std::string &message, error_code &error) {
TCPServer::~TCPServer() {}
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> WRITE <--" << std::endl;
myfile << " Message: " << message << " // length: " << message.length() << " // byte: " << GetBytes(message.length()) << std::endl;
myfile.close();*/
void TCPServer::writeString(const std::string &message, error_code &error) {
std::string outMessage(GetBytes(message.length()) + message);
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> WRITE <--" << std::endl;
myfile << " Create socket " << std::endl;
myfile.close();
boost::asio::write(_socket, boost::asio::buffer(outMessage), error);
//Sleep(500);
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "------- DONE ------" << std::endl;
myfile.close();*/
}
tcp::socket socket(_service);
_acceptor.accept(socket);
void TCPServer::readString(std::string &message, error_code &error) {
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " Connected " << std::endl;
myfile << " Message: " << message << " // length: " << message.length() << " // byte: "<< GetBytes(message.length()) <<std::endl;
myfile.close();
/*tcp::socket socket(_service);
_acceptor.accept(socket);*/
//Sleep(500);
std::string outMessage (GetBytes(message.length()) + message);
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> READ <--" << std::endl;
myfile << " Create socket " << std::endl;
myfile.close();
boost::asio::write(socket, boost::asio::buffer(outMessage), error);
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> READ <--" << std::endl;
myfile.close();*/
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "------- DONE ------" << std::endl;
myfile.close();
bool end = false, readedBytes = false;
int readedSize = 0, sizeToRead = -1;
do {
//Sleep(500);
//std::cout << _port << ": DONE " << std::endl;
}
std::array<char, 128> buf;
void TCPServer::readString(std::string &message, error_code &error) {
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> READ <--" << std::endl;
myfile << " Create socket " << std::endl;
myfile.close();
size_t len = _socket.read_some(boost::asio::buffer(buf), error);
tcp::socket socket(_service);
_acceptor.accept(socket);
// @todo find a better way.
for (size_t i = 0; i < len && !end; ++i) {
if (!readedBytes) {
sizeToRead = GetInt(buf[0], buf[1], buf[2], buf[3]);
i = 3;
readedBytes = true;
}
else {
message += buf[i];
++readedSize;
}
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " Connected " << std::endl;
myfile.close();
}
for (;; ) {
std::array<char, 128> buf;
} while (!readedBytes || sizeToRead > readedSize);
size_t len = socket.read_some(boost::asio::buffer(buf), error);
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "Receive Message: " << message << std::endl;
myfile << "------ DONE ------" << message << std::endl;
myfile.close();*/
if (error == boost::asio::error::eof) {
break; // Connection closed cleanly by peer.
} else if (error) {
return;
}
}
// @todo find a better way.
for (size_t i = 0u; i < len; ++i) {
message += buf[i];
}
}
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "Receive Message: " << message << std::endl;
myfile << "------ DONE ------" << message << std::endl;
myfile.close();
}
} // namespace server
} // namespace server
} // namespace carla

View File

@ -3,7 +3,7 @@
#pragma once
#include <string>
#include <mutex>
#include <boost\asio.hpp>
namespace carla {
@ -21,19 +21,25 @@ namespace server {
~TCPServer();
void startAccept();
void writeString(const std::string &message, error_code &error);
void readString(std::string &message, error_code &error);
void AcceptSocket();
bool Connected();
const int _port;
private:
boost::asio::io_service _service;
boost::asio::ip::tcp::acceptor _acceptor;
const int _port;
boost::asio::ip::tcp::socket _socket;
bool _connected;
};
} // namespace server

View File

@ -19,11 +19,13 @@ namespace thread {
using WritingJob = std::function<W()>;
using ReadingJob = std::function<void(R)>;
using ConnectJob = std::function<void()>;
explicit AsyncReadWriteJobQueue(WritingJob &&writingJob, ReadingJob &&readingJob) :
explicit AsyncReadWriteJobQueue(WritingJob &&writingJob, ReadingJob &&readingJob, ConnectJob &&connectJob) :
_done(false),
_writeJob(std::move(writingJob)),
_readJob(std::move(readingJob)),
_connectJob(std::move(connectJob)),
_writeQueue(),
_thread(new std::thread(&AsyncReadWriteJobQueue::workerThread, this)) {}
@ -42,12 +44,12 @@ namespace thread {
private:
void workerThread() {
_connectJob();
while (!_done) {
R value;
_readQueue.wait_and_pop(value);
_readJob(value);
_writeQueue.push(_writeJob());
Sleep(10);
}
}
@ -55,6 +57,7 @@ namespace thread {
WritingJob _writeJob;
ReadingJob _readJob;
ConnectJob _connectJob;
ThreadSafeQueue<W> _writeQueue;
ThreadSafeQueue<R> _readQueue;

View File

@ -20,10 +20,12 @@ namespace thread {
public:
using Job = std::function<void(T)>;
using ConnectJob = std::function<void()>;
explicit AsyncReaderJobQueue(Job &&job) :
_done(false),
_job(std::move(job)),
explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) :
_done(false),
_job(std::move(job)),
_connectionJob(std::move(connectionJob)),
_queue(),
_thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {}
@ -38,6 +40,7 @@ namespace thread {
private:
void workerThread() {
_connectionJob();
while (!_done) {
T value;
_queue.wait_and_pop(value);
@ -49,6 +52,8 @@ namespace thread {
std::atomic_bool _done;
Job _job;
ConnectJob _connectionJob;
ThreadSafeQueue<T> _queue;

View File

@ -18,10 +18,12 @@ namespace thread {
public:
using Job = std::function<T()>;
using ConnectJob = std::function<void()>;
explicit AsyncWriterJobQueue(Job &&job) :
_done(false),
_job(std::move(job)),
explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) :
_done(false),
_job(std::move(job)),
_connectJob(std::move(connectJob)),
_queue(),
_thread(new std::thread(&AsyncWriterJobQueue::workerThread, this))
{}
@ -37,6 +39,7 @@ namespace thread {
private:
void workerThread() {
_connectJob();
while (!_done) {
_queue.push(_job());
Sleep(10);
@ -46,6 +49,7 @@ namespace thread {
std::atomic_bool _done;
Job _job;
ConnectJob _connectJob;
ThreadSafeQueue<T> _queue;

View File

@ -34,7 +34,7 @@ namespace thread {
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] {return !_queue.empty(); });
value = _queue.front();
_queue.pop();
_queue.pop();
}
// std::shared_ptr<T> wait_and_pop() {

View File

@ -100,12 +100,14 @@ int main(int argc, char* argv[]) {
testData.depth_1 = depth_1;
testData.depth_2 = depth_2;
std::cout << "Server send World" << std::endl;
server.sendWorld();
int mode, scene;
bool end = false;
std::cout << "Server wait scene init" << std::endl;
do {
end = server.tryReadSceneInit(mode, scene);
}while (!end);
@ -126,33 +128,49 @@ int main(int argc, char* argv[]) {
pMatrix,
};
std::cout << "Server send positions" << std::endl;
server.sendSceneValues(sceneVal);
end = false;
float startPoint, endPoint;
std::cout << "Server wait new episode" << std::endl;
do {
end = server.tryReadEpisodeStart(startPoint, endPoint);
}
while (!end);
std::cout << "Server send end reset" << std::endl;
server.sendEndReset();
float steer, gas;
bool wait_control = false;
for (;;) {
Sleep(50);
//std::cout << "Sending..." << std::endl;
auto time = daytimeString();
// server.reward = testData;
server.sendReward(testData);
using namespace std::chrono_literals;
if (server.tryReadEpisodeStart(startPoint, endPoint)) {
std::cout << "------> RESET <------" << std::endl;
std::cout << " --> Start: " << startPoint << " End: " << endPoint << std::endl;
server.sendEndReset();
}
else {
if (wait_control && server.tryReadControl(steer, gas)) {
std::cout << "Steer: " << steer << "Gas: " << gas << std::endl;
wait_control = false;
}
else if (!wait_control) {
server.sendReward(testData);
wait_control = true;
}
}
Sleep(100);
Sleep(50);
//std::cout << "Listening..." << std::endl;
float steer, gas;
if (server.tryReadControl(steer, gas)) {
/*if ((message == "q") || (message == "quit"))
break;*/
}
}
} catch (const std::exception &e) {