Merge branch 'xisco' into nestor

This commit is contained in:
nsubiron 2017-03-21 18:20:09 +01:00
commit 8a769bf60a
16 changed files with 704 additions and 403 deletions

View File

@ -31,16 +31,38 @@ namespace carla {
return _pimpl->tryReadControl(steer, throttle);
}
void CarlaServer::sendReward(const Reward_Values &values) {
bool CarlaServer::sendReward(const Reward_Values &values) {
if (needRestart()) return false;
_pimpl->sendReward(values);
return true;
}
void CarlaServer::sendSceneValues(const Scene_Values &values) {
bool CarlaServer::sendSceneValues(const Scene_Values &values) {
if (needRestart()) return false;
_pimpl->sendSceneValues(values);
return true;
}
void CarlaServer::sendEndReset() {
bool CarlaServer::sendEndReset() {
if (needRestart()) return false;
_pimpl->sendEndReset();
return true;
}
bool CarlaServer::worldConnected(){
return _pimpl->worldConnected();
}
bool CarlaServer::clientConnected(){
return _pimpl->clientConnected();
}
bool CarlaServer::serverConnected(){
return _pimpl->serverConnected();
}
bool CarlaServer::needRestart() {
return _pimpl->needRestart();
}
} // namespace carla

View File

@ -109,13 +109,25 @@ namespace carla {
bool tryReadControl(float &steer, float &throttle);
/// Send values of the current player status.
void sendReward(const Reward_Values &values);
bool sendReward(const Reward_Values &values);
/// Send the values of the generated scene.
void sendSceneValues(const Scene_Values &values);
bool sendSceneValues(const Scene_Values &values);
/// Send a signal to the client to notify that the car is ready.
void sendEndReset();
bool sendEndReset();
/// return true if client thread is connected
bool clientConnected();
/// return true if server thread is connected
bool serverConnected();
/// return true if world thread is connected
bool worldConnected();
// Returns true if the server needs to restart the communication protocol
bool needRestart();
private:

View File

@ -2,6 +2,7 @@
#include "lodepng.h"
#include "carla_protocol.pb.h"
#include "../CarlaServer.h"
#include <iostream>
@ -10,140 +11,347 @@ namespace server {
// -- Static methods ---------------------------------------------------------
std::mutex _generalMutex;
static Mode getMode(int modeInt) {
switch (modeInt) {
case 0: return Mode::MONO;
case 1: return Mode::STEREO;
default: return Mode::INVALID;
}
}
template<typename ERROR_CODE>
static void logTCPError(const std::string &text, const ERROR_CODE &errorCode) {
std::cerr << "CarlaConnection - TCP Server: " << text << ": " << errorCode.message() << std::endl;
}
// This is the thread that sends a string over the TCP socket.
static void serverWorkerThread(TCPServer &server, const Reward &rwd) {
static void serverWorkerThread(
TCPServer &server,
thread::AsyncReaderJobQueue<Reward_Values> &thr,
const std::unique_ptr<Protocol> &proto ,
const Reward_Values &rwd
) {
if (!thr.getRestart()){
std::string message;
bool correctSerialize = rwd.SerializeToString(&message);
Reward reward;
proto->LoadReward(reward, rwd);
bool correctSerialize = reward.SerializeToString(&message);
TCPServer::error_code error;
if (correctSerialize) {
server.writeString(message, error);
if (error) { logTCPError("Failed to send", error); }
if (error) {
logTCPError("Failed to send", error);
}
if (!server.Connected() && !thr.getRestart()) {
thr.reconnect();
}
} else {
logTCPError("Falied to serialize", error);
}
}
}
//TODO:
// Sortida amb google protocol
// This is the thread that listens for string over the TCP socket.
static std::string clientWorkerThread(TCPServer &server) {
static std::string clientWorkerThread(TCPServer &server, thread::AsyncWriterJobQueue<std::string> &thr) {
//if (!server.Connected()) server.AcceptSocket();
std::string message;
bool success = false;
do{
if (!thr.getRestart()){
TCPServer::error_code error;
server.readString(message, error);
success = server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read", error);
return std::string();
}
if (!server.Connected() && !thr.getRestart()) {
thr.reconnect();
break;
}
}
} while (!success);
return message;
}
// This is the thread that listens & sends a string over the TCP world socket.
static std::string worldReceiveThread(TCPServer &server) {
static std::string worldReceiveThread(TCPServer &server, thread::AsyncReadWriteJobQueue<std::string, std::string> &thr) {
//std::lock_guard<std::mutex> lock(server.getMutex());
std::string message;
bool success = false;
do{
if (!thr.getRestart()){
TCPServer::error_code error;
//if (!server.Connected()) server.AcceptSocket();
server.readString(message, error);
success = server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read world", error);
return std::string();
}
if (!server.Connected() && !thr.getRestart()) {
thr.reconnect();
break;
}
}
}while (!success);
return message;
}
static void worldSendThread(TCPServer &server, const std::string &message) {
TCPServer::error_code error;
//message = message.size + message;
static void worldSendThread(TCPServer &server, thread::AsyncReadWriteJobQueue<std::string, std::string> &thr, const std::string &message) {
//if (!server.Connected()) server.AcceptSocket();
if (!thr.getRestart()){
TCPServer::error_code error;
server.writeString(message, error);
if (error) {
logTCPError("Failed to send world", error);
}
if (!server.Connected() && !thr.getRestart()) {
thr.reconnect();
}
}
}
static void Connect(TCPServer &server) {
if (!server.Connected()) { server.AcceptSocket(); }
std::cout << "Waiting... port: " << server.port << std::endl;
server.AcceptSocket();
}
static void ReconnectAll(CarlaCommunication &communication){
std::lock_guard<std::mutex> lock(_generalMutex);
if (!communication.NeedRestart()){
std::cout << " ---- RECONNECT ALL ...." << std::endl;
if (!communication.getWorldThread().getRestart()){
std::cout << " ---- RESTART WORLD ...." << std::endl;
communication.restartWorld();
communication.getWorldThread().restart();
}
if (!communication.getServerThread().getRestart()){
std::cout << " ---- RESTART SERVER ...." << std::endl;
communication.restartServer();
communication.getServerThread().restart();
}
if (!communication.getClientThread().getRestart()){
std::cout << " ---- RESTART CLIENT ...." << std::endl;
communication.restartClient();
communication.getClientThread().restart();
}
communication.Restart();
}
}
CarlaCommunication::CarlaCommunication(int worldPort, int writePort, int readPort) :
_serverPort(writePort),
_clientPort(readPort),
_worldPort(worldPort),
_world(worldPort),
_server(writePort),
_client(readPort),
_needRestart(false),
_proto(std::make_unique<Protocol>(this)),
_worldThread {
[this]() { return worldReceiveThread(this->_world); },
[this](const std::string & msg) { worldSendThread(this->_world, msg); },
[this]() { Connect(this->_world); }
[this]() { return worldReceiveThread(this->_world, this->_worldThread); },
[this](const std::string & msg) { worldSendThread(this->_world, this->_worldThread, msg); },
[this]() { Connect(this->_world); },
[this]() { ReconnectAll(*this);}
},
_serverThread {
[this](const Reward &rwd) { serverWorkerThread(this->_server, rwd); },
[this]() { Connect(this->_server); }
[this](const Reward_Values &rwd) { serverWorkerThread(this->_server, this->_serverThread, this->_proto, rwd); },
[this]() { Connect(this->_server); },
[this]() { ReconnectAll(*this);}
},
_clientThread {
[this]() { return clientWorkerThread(this->_client); },
[this]() { Connect(this->_client); }
[this]() { return clientWorkerThread(this->_client, this->_clientThread); },
[this]() { Connect(this->_client); },
[this]() { ReconnectAll(*this);}
}
{
_mode = Mode::MONO;
/*std::cout << "WorldPort: " << worldPort << std::endl;
std::cout << "writePort: " << writePort << std::endl;
std::cout << "readPort: " << readPort << std::endl;*/
}
void CarlaCommunication::sendReward(const Reward &reward) {
_serverThread.push(reward);
std::cout << "Send Reward" << std::endl;
void CarlaCommunication::sendReward(const Reward_Values &values) {
_serverThread.push(values);
}
bool CarlaCommunication::tryReadControl(std::string &control) {
return _clientThread.tryPop(control);
bool CarlaCommunication::tryReadControl(float &steer, float &gas) {
steer = 0.0f;
gas = 0.0f;
std::string controlMessage;
if (!_clientThread.tryPop(controlMessage)) return false;
Control control;
if (!control.ParseFromString(controlMessage)) return false;
steer = control.steer();
gas = control.gas();
return true;
}
void CarlaCommunication::sendWorld(const World &world) {
void CarlaCommunication::sendWorld(const uint32_t modes,const uint32_t scenes) {
_needRestart = false;
World world;
_proto->LoadWorld(world, modes, scenes);
std::string message;
bool error = !world.SerializeToString(&message);
_worldThread.push(message);
}
void CarlaCommunication::sendScene(const Scene &scene) {
void CarlaCommunication::sendScene(const Scene_Values &values) {
Scene scene;
_proto -> LoadScene(scene, values);
std::string message;
bool error = !scene.SerializeToString(&message);
_worldThread.push(message);
}
void CarlaCommunication::sendReset(const EpisodeReady &ready) {
void CarlaCommunication::sendReset() {
EpisodeReady eReady;
eReady.set_ready(true);
std::string message;
bool error = !ready.SerializeToString(&message);
if (!error) {
//std::cout << "Send End Reset" << std::endl;
if (eReady.SerializeToString(&message)) {
_worldThread.push(message);
} else {
std::cout << " >> SEND RESET ERROR <<" << std::endl;
}
}
bool CarlaCommunication::tryReadWorldInfo(std::string &info) {
return _worldThread.tryPop(info);
bool CarlaCommunication::tryReadSceneInit(Mode &mode, uint32_t &scene) {
mode = Mode::INVALID;
scene = 0u;
std::string info;
if (!_worldThread.tryPop(info)) return false;
SceneInit sceneInit;
if (!sceneInit.ParseFromString(info)) return false;
mode = getMode(sceneInit.mode());
scene = sceneInit.scene();
_mode = mode;
return true;
}
bool CarlaCommunication::tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index){
start_index = 0;
end_index = 0;
std::string startData;
if (!_worldThread.tryPop(startData)) return false;
EpisodeStart episodeStart;
if(!episodeStart.ParseFromString(startData)) return false;
start_index = episodeStart.start_index();
end_index = episodeStart.end_index();
return true;
}
void CarlaCommunication::restartServer(){
_server.close();
//_server = TCPServer(_serverPort);
}
void CarlaCommunication::restartWorld(){
_world.close();
//_world = TCPServer(_worldPort);
}
void CarlaCommunication::restartClient(){
_client.close();
//_client = TCPServer(_clientPort);
}
thread::AsyncReaderJobQueue<Reward_Values>& CarlaCommunication::getServerThread(){
return _serverThread;
}
thread::AsyncWriterJobQueue<std::string>& CarlaCommunication::getClientThread(){
return _clientThread;
}
thread::AsyncReadWriteJobQueue<std::string, std::string>& CarlaCommunication::getWorldThread(){
return _worldThread;
}
bool CarlaCommunication::worldConnected(){
return _world.Connected();
}
bool CarlaCommunication::clientConnected(){
return _client.Connected();
}
bool CarlaCommunication::serverConnected(){
return _server.Connected();
}
Mode CarlaCommunication::GetMode(){
return _mode;
}
bool CarlaCommunication::NeedRestart(){
return _needRestart;
}
void CarlaCommunication::Restart(){
_needRestart = true;
}
}
}

View File

@ -14,6 +14,11 @@
class EpisodeReady;
namespace carla {
struct Scene_Values;
struct Reward_Values;
enum class Mode : int8_t;
namespace server {
class CarlaCommunication : private NonCopyable {
@ -21,31 +26,64 @@ namespace server {
explicit CarlaCommunication(int worldPort, int writePort, int readPort);
void sendReward(const Reward &values);
void sendReward(const Reward_Values &values);
bool tryReadControl(std::string &control);
bool tryReadControl(float &steer, float &gas);
void sendScene(const Scene &scene);
void sendScene(const Scene_Values &scene);
void sendReset(const EpisodeReady &ready);
void sendReset();
void sendWorld(const World &world);
//void sendWorld(const World &world);
void sendWorld(const uint32_t modes, const uint32_t scenes);
bool tryReadWorldInfo(std::string &info);
bool tryReadSceneInit(Mode &mode, uint32_t &scene);
bool tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index);
void restartServer();
void restartWorld();
void restartClient();
thread::AsyncReaderJobQueue<Reward_Values>& getServerThread();
thread::AsyncWriterJobQueue<std::string>& getClientThread();
thread::AsyncReadWriteJobQueue<std::string, std::string>& getWorldThread();
bool worldConnected();
bool clientConnected();
bool serverConnected();
std::mutex getGeneralMutex();
Mode GetMode();
bool NeedRestart();
void Restart();
private:
TCPServer _server;
TCPServer _client;
TCPServer _world;
thread::AsyncReaderJobQueue<Reward> _serverThread;
int _worldPort, _clientPort, _serverPort;
thread::AsyncReaderJobQueue<Reward_Values> _serverThread;
thread::AsyncWriterJobQueue<std::string> _clientThread;
thread::AsyncReadWriteJobQueue<std::string, std::string> _worldThread;
std::atomic_bool _needRestart;
std::atomic<Mode> _mode;
const std::unique_ptr<Protocol> _proto;
};
}

View File

@ -1,6 +1,7 @@
#include "Protocol.h"
#include "Server.h"
#include "CarlaCommunication.h"
#include "carla/CarlaServer.h"
#include "lodepng.h"
#include "carla_protocol.pb.h"
@ -28,7 +29,7 @@ namespace server {
color_img.emplace_back(color.R);
color_img.emplace_back(color.G);
color_img.emplace_back(color.B);
color_img.emplace_back(color.A);
color_img.emplace_back(255u);
}
// Compress to png.
lodepng::State state;
@ -41,8 +42,8 @@ namespace server {
return true;
}
Protocol::Protocol(carla::server::Server *server) {
_server = server;
Protocol::Protocol(carla::server::CarlaCommunication *communication) {
_communication = communication;
}
Protocol::~Protocol() {}
@ -71,13 +72,13 @@ namespace server {
}
}
// auto depths = {values.image_depth_0, values.image_depth_1};
// for (const std::vector<Color> &image : depths) {
// std::vector<unsigned char> png_image;
// if (getPNGImage(image, values.image_width, values.image_height, png_image)) {
// reward.add_depth(std::string(png_image.begin(), png_image.end()));
// }
// }
auto depths = {values.image_depth_0, values.image_depth_1};
for (const std::vector<Color> &image : depths) {
std::vector<unsigned char> png_image;
if (getPNGImage(image, values.image_width, values.image_height, png_image)) {
reward.set_depth(std::string(png_image.begin(), png_image.end()));
}
}
}
void Protocol::LoadScene(Scene &scene, const Scene_Values &values) {
@ -89,7 +90,7 @@ namespace server {
point->set_pos_y(positions[i].y);
}
if (_server->GetMode() == Mode::STEREO) {
if (_communication->GetMode() == Mode::STEREO) {
Scene::Projection_Matrix* matrix;
std::vector<std::array<float,16>> projection_matrix = values.projection_matrices;
for (int i = 0; i < projection_matrix.size(); ++i) {

View File

@ -11,12 +11,12 @@ namespace carla {
namespace server {
class Server;
class CarlaCommunication;
class Protocol {
public:
explicit Protocol(Server *server);
explicit Protocol(CarlaCommunication *communication);
~Protocol();
@ -28,7 +28,7 @@ namespace server {
private:
carla::server::Server *_server;
carla::server::CarlaCommunication *_communication;
};
}

View File

@ -11,133 +11,76 @@
namespace carla {
namespace server {
static Mode getMode(int modeInt) {
switch (modeInt) {
case 1: return Mode::MONO;
case 2: return Mode::STEREO;
default: return Mode::INVALID;
}
}
// -- CarlaServer ------------------------------------------------------------
Server::Server(uint32_t worldPort, uint32_t writePort, uint32_t readPort) :
_communication(std::make_unique<CarlaCommunication>(worldPort, writePort, readPort)),
_proto(std::make_unique<Protocol>(this)){}
_communication(std::make_unique<CarlaCommunication>(worldPort, writePort, readPort)){}
Server::~Server() {
}
void Server::sendReward(const Reward_Values &values) {
Reward reward;
_proto->LoadReward(reward, values);
_communication->sendReward(reward);
//Reward reward;
//_proto->LoadReward(reward, values);
//_communication->sendReward(reward);
_communication->sendReward(values);
}
void Server::sendSceneValues(const Scene_Values &values) {
Scene *scene = new Scene;
_proto->LoadScene(*scene, values);
_communication->sendScene(*scene);
//Scene scene;
//_proto->LoadScene(scene, values);
//_communication->sendScene(scene);
_communication->sendScene(values);
}
void Server::sendEndReset() {
EpisodeReady eReady;
eReady.set_ready(true);
_communication->sendReset(eReady);
_communication->sendReset();
}
void Server::sendWorld(const uint32_t modes, const uint32_t scenes) {
World world;
_proto->LoadWorld(world, modes, scenes);
_communication->sendWorld(world);
//World world;
//_proto->LoadWorld(world, modes, scenes);
//_communication->sendWorld(world);
_communication->sendWorld(modes, scenes);
}
bool Server::tryReadControl(float &steer, float &gas) {
std::string controlMessage;
bool success = _communication->tryReadControl(controlMessage);
Control control;
if (success) {
success &= control.ParseFromString(controlMessage);
}
steer = control.steer();
gas = control.gas();
if (!success) {
steer = 0.0f;
gas = 0.0f;
}
else {
steer = control.steer();
gas = control.gas();
//std::cout << "Steer: " << steer << " Gas: " << gas << std::endl;
}
return success;
return _communication->tryReadControl(steer, gas);
}
bool Server::tryReadSceneInit(Mode &mode, uint32_t &scene) {
std::string initMessage;
bool success = _communication->tryReadWorldInfo(initMessage);
SceneInit sceneInit;
if (success) {
success &= sceneInit.ParseFromString(initMessage);
}
if (!success) {
mode = Mode::INVALID;
scene = 0u;
}
else {
mode = getMode(sceneInit.mode());
scene = sceneInit.scene();
//std::cout << "Mode: " << mode << " Scene: " << scene << std::endl;
}
return success;
return _communication->tryReadSceneInit(mode, scene);
}
bool Server::tryReadEpisodeStart(uint32_t &start_index, uint32_t &end_index) {
std::string startData;
bool success = _communication->tryReadWorldInfo(startData);
EpisodeStart episodeStart;
success &= episodeStart.ParseFromString(startData);
if (!success) {
start_index = 0.0;
end_index = 0.0;
}
else {
start_index = episodeStart.start_index();
end_index = episodeStart.end_index();
//std::cout << "Start: " << start_index << " End: " << end_index << std::endl;
}
return success;
}
void Server::setMode(Mode mode) {
_mode = mode;
return _communication->tryReadEpisodeStart(start_index, end_index);
}
Mode Server::GetMode() const {
return _mode;
return _communication->GetMode();
}
void Server::SetScene(int scene) {
_scene = scene;
}
int Server::GetScene() const {
return _scene;
bool Server::worldConnected() const{
return _communication->worldConnected();
}
void Server::SetReset(bool reset) {
_reset = reset;
bool Server::clientConnected() const{
return _communication->clientConnected();
}
bool Server::Reset() const {
return _reset;
bool Server::serverConnected() const{
return _communication->serverConnected();
}
bool Server::needRestart() const {
return _communication->NeedRestart();
}
} // namespace server

View File

@ -15,7 +15,6 @@ namespace carla {
namespace server {
class CarlaCommunication;
class Protocol;
/// Asynchronous TCP server. Uses two ports, one for sending messages (write)
/// and one for receiving messages (read).
@ -62,21 +61,24 @@ namespace server {
int GetScene() const;
void SetReset(bool reset);
bool worldConnected() const;
bool Reset() const;
bool clientConnected() const;
bool serverConnected() const;
bool needRestart() const;
private:
//std::mutex _mutex;
std::atomic<Mode> _mode { Mode::MONO };
std::atomic_int _scene;
std::atomic_bool _reset;
const std::unique_ptr<CarlaCommunication> _communication;
const std::unique_ptr<Protocol> _proto;
//const std::unique_ptr<Protocol> _proto;
};
} // namespace server

View File

@ -34,47 +34,23 @@ namespace carla {
TCPServer::TCPServer(int port) :
_service(),
_acceptor(_service, tcp::endpoint(tcp::v4(), port)),
_port(port),
port(port),
_socket(_service),
_connected(false){
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt");
myfile << " INIT TCP_SERVER " << std::to_string(_port) << std::endl;
myfile.close();*/
}
_connected(false){}
TCPServer::~TCPServer() {
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt");
myfile << " Y VOLO " << std::to_string(_port) << std::endl;
myfile.close();*/
}
TCPServer::~TCPServer() {}
void TCPServer::AcceptSocket() {
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " Create socket " << std::to_string(_port) <<std::endl;
myfile.close();*/
try {
_acceptor.accept(_socket);
boost::asio::ip::tcp::acceptor acceptor(_service, tcp::endpoint(tcp::v4(), port));
acceptor.accept(_socket);
_connected = true;
_service.run();
std::cout << "Connected port " << port << std::endl;
}
catch (boost::system::system_error) {
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " >>>>> ACCEPT ERROR <<<<<" << std::endl;
myfile.close();*/
};
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << " CONNECTED... " << std::to_string(_port) << std::endl;
myfile.close();*/
catch (boost::system::system_error) {std::cerr<<"Socket System error"<<std::endl;};
}
bool TCPServer::Connected() {
@ -83,46 +59,34 @@ namespace carla {
void TCPServer::writeString(const std::string &message, error_code &error) {
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> WRITE <--" << std::endl;
myfile << " Message: " << message << " // length: " << message.length() << " // byte: " << GetBytes(message.length()) << std::endl;
myfile.close();*/
std::string outMessage(GetBytes(message.length()) + message);
std::cout << message.length() << std::endl;
boost::asio::write(_socket, boost::asio::buffer(outMessage), error);
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "------- DONE ------" << std::endl;
myfile.close();*/
if (error)
{
_connected = false;
}
}
void TCPServer::readString(std::string &message, error_code &error) {
/*tcp::socket socket(_service);
_acceptor.accept(socket);*/
/*std::ofstream myfile;
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> READ <--" << std::endl;
myfile << " Create socket " << std::endl;
myfile.close();
myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "--> READ <--" << std::endl;
myfile.close();*/
bool TCPServer::readString(std::string &message, error_code &error) {
bool end = false, readedBytes = false;
int readedSize = 0, sizeToRead = -1;
if (_socket.available() > 0){
do {
std::array<char, 128> buf;
size_t len = _socket.read_some(boost::asio::buffer(buf), error);
if (boost::asio::error::connection_reset == error)
{
_connected = false;
}
else if (!error){
// @todo find a better way.
for (size_t i = 0; i < len && !end; ++i) {
if (!readedBytes) {
@ -136,15 +100,35 @@ namespace carla {
}
}
}
} while (!readedBytes || sizeToRead > readedSize);
} while ((!readedBytes || sizeToRead > readedSize) && _connected);
return true;
}
else return false;
/*myfile.open("TCP" + std::to_string(_port) + ".txt", std::ios::app);
myfile << "Receive Message: " << message << std::endl;
myfile << "------ DONE ------" << message << std::endl;
myfile.close();*/
}
void TCPServer::close(){
_connected = false;
// flush the socket buffer
std::string message;
TCPServer::error_code error;
readString(message, error);
_service.stop();
_socket.close();
/*_socket.cancel();
_socket.close();
_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
_acceptor.cancel();*/
}
} // namespace server
} // namespace carla

View File

@ -14,7 +14,7 @@ namespace server {
/// { TCP server.
///
/// A new socket is created for every connection (every write and read).
class TCPServer : private NonCopyable {
class TCPServer {
public:
using error_code = boost::system::error_code;
@ -25,23 +25,24 @@ namespace server {
void writeString(const std::string &message, error_code &error);
void readString(std::string &message, error_code &error);
bool readString(std::string &message, error_code &error);
void AcceptSocket();
bool Connected();
const int _port;
void close();
const int port;
private:
boost::asio::io_service _service;
boost::asio::ip::tcp::acceptor _acceptor;
boost::asio::ip::tcp::socket _socket;
bool _connected;
std::atomic_bool _connected;
};
} // namespace server

View File

@ -7,6 +7,7 @@
#include <atomic>
#include <functional>
#include <iostream>
namespace carla {
namespace thread {
@ -20,15 +21,19 @@ namespace thread {
using WritingJob = std::function<W()>;
using ReadingJob = std::function<void(R)>;
using ConnectJob = std::function<void()>;
using ReconnectJob = std::function<void()>;
explicit AsyncReadWriteJobQueue(
WritingJob && writingJob,
ReadingJob && readingJob,
ConnectJob && connectJob) :
ConnectJob && connectJob,
ReconnectJob && reconnectJob) :
_done(false),
_restart(true),
_writeJob(std::move(writingJob)),
_readJob(std::move(readingJob)),
_connectJob(std::move(connectJob)),
_reconnectJob(std::move(reconnectJob)),
_writeQueue(),
_thread(new std::thread(&AsyncReadWriteJobQueue::workerThread, this)) {}
@ -44,28 +49,54 @@ namespace thread {
_readQueue.push(item);
}
void reconnect(){
_reconnectJob();
}
void restart(){
_restart = true;
_readQueue.canWait(false);
_writeQueue.canWait(false);
//_thread->detach();
//_thread = ThreadUniquePointer(new std::thread(&AsyncReadWriteJobQueue::workerThread, this));
}
bool getRestart(){
return _restart;
}
private:
void workerThread() {
while(!_done){
_connectJob();
while (!_done) {
_restart = false;
_readQueue.canWait(true);
while (!_restart && !_done) {
R value;
_readQueue.wait_and_pop(value);
if (_readQueue.wait_and_pop(value)) {
_readJob(value);
}
if (!_restart){
_writeQueue.push(_writeJob());
}
}
}
}
std::atomic_bool _done;
std::atomic_bool _restart;
WritingJob _writeJob;
ReadingJob _readJob;
ConnectJob _connectJob;
ReconnectJob _reconnectJob;
ThreadSafeQueue<W> _writeQueue;
ThreadSafeQueue<R> _readQueue;
const ThreadUniquePointer _thread;
ThreadUniquePointer _thread;
};
} // namespace thread

View File

@ -21,11 +21,14 @@ namespace thread {
using Job = std::function<void(T)>;
using ConnectJob = std::function<void()>;
using ReconnectJob = std::function<void()>;
explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob) :
explicit AsyncReaderJobQueue(Job &&job, ConnectJob &&connectionJob, ReconnectJob &&reconnectJob) :
_done(false),
_restart(true),
_job(std::move(job)),
_connectionJob(std::move(connectionJob)),
_reconnectJob(std::move(reconnectJob)),
_queue(),
_thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {}
@ -40,27 +43,44 @@ namespace thread {
_queue.push(item);
}
private:
void restart(){
_restart = true;
_queue.canWait(false);
}
bool getRestart(){
return _restart;
}
void reconnect(){
_reconnectJob();
}
private:
void workerThread() {
while (!_done){
_connectionJob();
while (!_done) {
_restart = false;
_queue.canWait(true);
while (!_restart && !_done) {
T value;
_queue.wait_and_pop(value);
_job(value);
if (_queue.wait_and_pop(value)) _job(value);
//Sleep(10);
}
}
}
std::atomic_bool _done;
std::atomic_bool _restart;
Job _job;
ConnectJob _connectionJob;
ReconnectJob _reconnectJob;
ThreadSafeQueue<T> _queue;
const ThreadUniquePointer _thread;
ThreadUniquePointer _thread;
};
} // namespace thread

View File

@ -19,11 +19,14 @@ namespace thread {
using Job = std::function<T()>;
using ConnectJob = std::function<void()>;
using ReconnectJob = std::function<void()>;
explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob) :
explicit AsyncWriterJobQueue(Job &&job, ConnectJob &&connectJob, ReconnectJob &&reconnectJob) :
_done(false),
_restart(true),
_job(std::move(job)),
_connectJob(std::move(connectJob)),
_reconnectJob(std::move(reconnectJob)),
_queue(),
_thread(new std::thread(&AsyncWriterJobQueue::workerThread, this))
{}
@ -36,24 +39,43 @@ namespace thread {
return _queue.try_pop(value);
}
void restart(){
_restart = true;
_queue.canWait(false);
}
bool getRestart(){
return _restart;
}
void reconnect(){
_reconnectJob();
}
private:
void workerThread() {
while (!_done){
_connectJob();
while (!_done) {
_restart = false;
_queue.canWait(true);
while (!_restart && !_done) {
_queue.push(_job());
//Sleep(10);
}
}
}
std::atomic_bool _done;
std::atomic_bool _restart;
Job _job;
ConnectJob _connectJob;
ReconnectJob _reconnectJob;
ThreadSafeQueue<T> _queue;
const ThreadUniquePointer _thread;
ThreadUniquePointer _thread;
};

View File

@ -7,6 +7,8 @@
#include <mutex>
#include <queue>
#include <iostream>
namespace carla {
namespace thread {
@ -22,6 +24,7 @@ namespace thread {
ThreadSafeQueue(const ThreadSafeQueue &other) {
std::lock_guard<std::mutex> lock(other._mutex);
_queue = other._queue;
_canWait = true;
}
void push(T new_value) {
@ -30,20 +33,28 @@ namespace thread {
_condition.notify_one();
}
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] {return !_queue.empty(); });
value = _queue.front();
_queue.pop();
void canWait(bool wait){
//std::lock_guard<std::mutex> lock(_mutex);
_canWait = wait;
_condition.notify_one();
}
// std::shared_ptr<T> wait_and_pop() {
// std::unique_lock<std::mutex> lock(_mutex);
// _condition.wait(lock, [this] {return !_queue.empty(); });
// std::shared_ptr<T> res(std::make_shared<T>(_queue.front()));
// _queue.pop();
// return res;
// }
bool wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] {
return !_queue.empty() || !_canWait;
});
//while(_queue.empty() && _canWait);
if (!_queue.empty() && _canWait) {
value = _queue.front();
_queue.pop();
return true;
}
else return false;
}
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock(_mutex);
@ -57,28 +68,26 @@ namespace thread {
}
}
// std::shared_ptr<T> try_pop() {
// std::lock_guard<std::mutex> lock(_mutex);
// if (_queue.empty()) {
// return std::shared_ptr<T>();
// }
// std::shared_ptr<T> res(std::make_shared<T>(_queue.front()));
// _queue.pop();
// return res;
// }
bool empty() const {
std::lock_guard<std::mutex> lock(_mutex);
return _queue.empty();
}
private:
mutable std::mutex _mutex;
std::atomic_bool _canWait;
std::queue<T> _queue;
std::condition_variable _condition;
};
} // namespace thread

View File

@ -38,6 +38,7 @@ static std::vector<carla::Color> makeImage(uint32_t width, uint32_t height) {
img[4 * width * i + 4 * e + 3] = 255;
}
}
std::vector<carla::Color> image(width * height);
size_t i = 0u;
for (carla::Color &color : image) {
@ -55,7 +56,6 @@ int main(int argc, char *argv[]) {
std::cerr << "Usage: server <world-port> <write-port> <read-port>" << std::endl;
return InvalidArguments;
}
const uint32_t worldPort = toInt(argv[1u]);
const uint32_t writePort = toInt(argv[2u]);
const uint32_t readPort = toInt(argv[3u]);
@ -87,19 +87,25 @@ int main(int argc, char *argv[]) {
reward.image_depth_0 = makeImage(imageWidth, imageHeight);
reward.image_depth_1 = makeImage(imageWidth, imageHeight);
std::cout << "Server send World" << std::endl;
for (;;){
if (server.worldConnected()){
server.init(1u);
{
std::cout << "Server wait scene init" << std::endl;
carla::Mode mode;
uint32_t scene;
while (!server.tryReadSceneInit(mode, scene)) {}
while(!server.needRestart() && !server.tryReadSceneInit(mode, scene));
std::cout << "Received: mode = "
<< (mode == carla::Mode::MONO ? "MONO" : "STEREO")
<< ", scene = "
<< scene << std::endl;
}
{
carla::Scene_Values sceneValues;
sceneValues.possible_positions.push_back({0.0f, 0.0f});
@ -108,36 +114,38 @@ int main(int argc, char *argv[]) {
const std::array<float, 16u> pMatrix = {{ 10.0 }};
sceneValues.projection_matrices.push_back(pMatrix);
std::cout << "Server send positions" << std::endl;
server.sendSceneValues(sceneValues);
std::cout << "Server wait new episode" << std::endl;
std::cout << "New episode" << std::endl;
uint32_t start, end;
while (!server.tryReadEpisodeStart(start, end)) {}
while (!server.needRestart() && !server.tryReadEpisodeStart(start, end));
std::cout << "Received: startIndex = " << start
<< ", endIndex = " << end << std::endl;
}
std::cout << "Server send end reset" << std::endl;
server.sendEndReset();
for (;;) {
while (!server.needRestart()) {
if (server.clientConnected() && server.serverConnected()){
float steer, gas;
uint32_t startPoint, endPoint;
if (server.tryReadEpisodeStart(startPoint, endPoint)) {
std::cout << "------> RESET <------" << std::endl;
std::cout << " --> Start: " << startPoint << " End: " << endPoint << std::endl;
std::cout << "-------- RESET --------" << std::endl;
std::cout << "--> Start: " << startPoint << " End: " << endPoint << " <--" << std::endl;
server.sendEndReset();
} else {
if (server.tryReadControl(steer, gas)) {
std::cout << "Steer: " << steer << "Gas: " << gas << std::endl;
std::cout << "Steer: " << steer << " Gas: " << gas << std::endl;
}
static decltype(carla::Reward_Values::timestamp) timestamp = 0u;
reward.timestamp = timestamp++;
server.sendReward(reward);
}
}
}
std::cout << " ----- RESTARTING -----" << std::endl;
}
}
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return STLException;