diff --git a/Source/CarlaServer/.gitignore b/Source/CarlaServer/.gitignore new file mode 100644 index 000000000..640ec7f31 --- /dev/null +++ b/Source/CarlaServer/.gitignore @@ -0,0 +1,11 @@ +*.sln +*.vcxproj +*.vcxproj.filters +*.VC.opendb +*.VC.db + +build +bin +lib +CMakeCache.txt +CMakeFiles diff --git a/Source/CarlaServer/CMakeLists.txt b/Source/CarlaServer/CMakeLists.txt new file mode 100644 index 000000000..e03bbfc01 --- /dev/null +++ b/Source/CarlaServer/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required (VERSION 2.6) +project (CarlaServer) + +# Boost configuration +SET(Boost_USE_STATIC_LIBS ON) +find_package(Boost REQUIRED system date_time regex) +include_directories(${Boost_INCLUDE_DIRS}) + +set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib) +set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) + +include_directories("${PROJECT_SOURCE_DIR}/source") +include_directories("${PROJECT_SOURCE_DIR}/../Common") + +add_subdirectory(source/carla/server) +add_subdirectory(source/test) diff --git a/Source/CarlaServer/Makefile b/Source/CarlaServer/Makefile new file mode 100644 index 000000000..652eb8769 --- /dev/null +++ b/Source/CarlaServer/Makefile @@ -0,0 +1,10 @@ +BUILD_FOLDER=build + +vsproject: + cmake -H. -B$(BUILD_FOLDER) -G "Visual Studio 14 2015 Win64" + +clean: + rm -Rf build CMakeFiles + +clean-all: clean + rm -Rf bin lib diff --git a/Source/CarlaServer/README.md b/Source/CarlaServer/README.md new file mode 100644 index 000000000..64d7c689d --- /dev/null +++ b/Source/CarlaServer/README.md @@ -0,0 +1,15 @@ +CarlaServer +=========== + +Library for socket communications. + +Requires boost libraries installed. + +Building +-------- + +To generate the Visual Studio solution + + make vsproject + +The solution gets generated at `./build/CarlaServer.sln`. diff --git a/Source/CarlaServer/source/Carla.h b/Source/CarlaServer/source/Carla.h new file mode 100644 index 000000000..a9448ee4f --- /dev/null +++ b/Source/CarlaServer/source/Carla.h @@ -0,0 +1,5 @@ +#pragma once + +#define CARLA_API + +#include "../../Common/NonCopyable.h" diff --git a/Source/CarlaServer/source/carla/server/CMakeLists.txt b/Source/CarlaServer/source/carla/server/CMakeLists.txt new file mode 100644 index 000000000..658a5e485 --- /dev/null +++ b/Source/CarlaServer/source/carla/server/CMakeLists.txt @@ -0,0 +1,12 @@ +# add_library(carla_server +# CarlaServer.h +# CarlaServer.cpp +# TCPServer.h +# TCPServer.cpp +# ) +file(GLOB carla_server_SRC + "*.h" + "*.cpp" +) + +add_library(carla_server ${carla_server_SRC}) diff --git a/Source/CarlaServer/source/carla/server/CarlaServer.cpp b/Source/CarlaServer/source/carla/server/CarlaServer.cpp new file mode 100644 index 000000000..aac2aea76 --- /dev/null +++ b/Source/CarlaServer/source/carla/server/CarlaServer.cpp @@ -0,0 +1,89 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#include "Carla.h" +#include "CarlaServer.h" + +#include +#include +#include + +#include +#include + +namespace carla { +namespace server { + + template + static void logTCPError(const std::string &text, const ERROR_CODE &errorCode) { + std::cerr << "CarlaServer - TCP Server: " << text << ": " << errorCode.message() << std::endl; + } + + // -- Static methods --------------------------------------------------------- + + // This is the thread that sends a string over the TCP socket. + static void serverWorkerThread(TCPServer &server, const std::string &message) { + TCPServer::error_code error; + server.writeString(message, error); + if (error) + logTCPError("Failed to send", error); + } + + // This is the thread that listens for string over the TCP socket. + static std::string clientWorkerThread(TCPServer &server) { + std::string message; + TCPServer::error_code error; + server.readString(message, error); + if (error && (error != boost::asio::error::eof)) { // eof is expected. + logTCPError("Failed to read", error); + return std::string(); + } + return message; + } + + // -- CarlaServer::Pimpl ----------------------------------------------------- + + class CarlaServer::Pimpl : private NonCopyable { + public: + + explicit Pimpl(int writePort, int readPort) : + _server(writePort), + _client(readPort), + _serverThread([this](const std::string &str){ serverWorkerThread(this->_server, str); }), + _clientThread([this](){ return clientWorkerThread(this->_client); }) {} + + void writeString(const std::string &message) { + _serverThread.push(message); + } + + bool tryReadString(std::string &message) { + return _clientThread.tryPop(message); + } + + private: + + TCPServer _server; + + TCPServer _client; + + thread::AsyncReaderJobQueue _serverThread; + + thread::AsyncWriterJobQueue _clientThread; + }; + + // -- CarlaServer ------------------------------------------------------------ + + CarlaServer::CarlaServer(int writePort, int readPort) : + _pimpl(std::make_unique(writePort, readPort)) {} + + CarlaServer::~CarlaServer() {} + + void CarlaServer::writeString(const std::string &message) { + _pimpl->writeString(message); + } + + bool CarlaServer::tryReadString(std::string &message) { + return _pimpl->tryReadString(message); + } + +} // namespace server +} // namespace carla diff --git a/Source/CarlaServer/source/carla/server/CarlaServer.h b/Source/CarlaServer/source/carla/server/CarlaServer.h new file mode 100644 index 000000000..2a274aa37 --- /dev/null +++ b/Source/CarlaServer/source/carla/server/CarlaServer.h @@ -0,0 +1,41 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include +#include + +namespace carla { +namespace server { + + /// Asynchronous TCP server. Uses two ports, one for sending messages (write) + /// and one for receiving messages (read). + /// + /// Writing and reading are executed in two different threads. Each thread has + /// its own queue of messages. + /// + /// Note that a new socket is created for every connection (every write and + /// read). + class CARLA_API CarlaServer : private NonCopyable { + public: + + /// Starts two threads for writing and reading. + explicit CarlaServer(int writePort, int readPort); + + ~CarlaServer(); + + /// Push a string to the sending queue. + void writeString(const std::string &message); + + /// Try to read a string from the receiving queue. Return false if the queue + /// is empty. + bool tryReadString(std::string &message); + + private: + + class Pimpl; + const std::unique_ptr _pimpl; + }; + +} // namespace server +} // namespace carla diff --git a/Source/CarlaServer/source/carla/server/TCPServer.cpp b/Source/CarlaServer/source/carla/server/TCPServer.cpp new file mode 100644 index 000000000..0fe387dcb --- /dev/null +++ b/Source/CarlaServer/source/carla/server/TCPServer.cpp @@ -0,0 +1,47 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#include "Carla.h" +#include "TCPServer.h" + +namespace carla { +namespace server { + + using boost::asio::ip::tcp; + + TCPServer::TCPServer(int port) : + _service(), + _acceptor(_service, tcp::endpoint(tcp::v4(), port)) {} + + TCPServer::~TCPServer() {} + + void TCPServer::writeString(const std::string &message, error_code &error) { + tcp::socket socket(_service); + _acceptor.accept(socket); + + boost::asio::write(socket, boost::asio::buffer(message), error); + } + + void TCPServer::readString(std::string &message, error_code &error) { + tcp::socket socket(_service); + _acceptor.accept(socket); + + for (;; ) { + std::array buf; + + size_t len = socket.read_some(boost::asio::buffer(buf), error); + + if (error == boost::asio::error::eof) { + break; // Connection closed cleanly by peer. + } else if (error) { + return; + } + + // @todo find a better way. + for (size_t i = 0u; i < len; ++i) { + message += buf[i]; + } + } + } + +} // namespace server +} // namespace carla diff --git a/Source/CarlaServer/source/carla/server/TCPServer.h b/Source/CarlaServer/source/carla/server/TCPServer.h new file mode 100644 index 000000000..2c63cc59e --- /dev/null +++ b/Source/CarlaServer/source/carla/server/TCPServer.h @@ -0,0 +1,36 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include + +#include + +namespace carla { +namespace server { + + /// Synchronous TCP server. + /// + /// A new socket is created for every connection (every write and read). + class CARLA_API TCPServer : private NonCopyable { + public: + + using error_code = boost::system::error_code; + + explicit TCPServer(int port); + + ~TCPServer(); + + void writeString(const std::string &message, error_code &error); + + void readString(std::string &message, error_code &error); + + private: + + boost::asio::io_service _service; + + boost::asio::ip::tcp::acceptor _acceptor; + }; + +} // namespace server +} // namespace carla diff --git a/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h new file mode 100644 index 000000000..4fd75dcbf --- /dev/null +++ b/Source/CarlaServer/source/carla/thread/AsyncReaderJobQueue.h @@ -0,0 +1,58 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include +#include + +#include +#include + +namespace carla { +namespace thread { + + /// Executes the given job asynchronously for each item added to the queue. + /// + /// The job gets called each time an item is added to the queue, the item is + /// passed as argument. + template + class CARLA_API AsyncReaderJobQueue { + public: + + using Job = std::function; + + explicit AsyncReaderJobQueue(Job &&job) : + _done(false), + _job(std::move(job)), + _queue(), + _thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {} + + ~AsyncReaderJobQueue() { + _done = true; + } + + void push(T item) { + _queue.push(item); + } + + private: + + void workerThread() { + while (!_done) { + T value; + _queue.wait_and_pop(value); + _job(value); + } + } + + std::atomic_bool _done; + + Job _job; + + ThreadSafeQueue _queue; + + const ThreadUniquePointer _thread; + }; + +} // namespace thread +} // namespace carla diff --git a/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h new file mode 100644 index 000000000..3b359c59b --- /dev/null +++ b/Source/CarlaServer/source/carla/thread/AsyncWriterJobQueue.h @@ -0,0 +1,54 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include +#include + +#include +#include + +namespace carla { +namespace thread { + + /// Executes the given job asynchronously. Every item that the job returns is + /// added to the queue. + template + class CARLA_API AsyncWriterJobQueue { + public: + + using Job = std::function; + + explicit AsyncWriterJobQueue(Job &&job) : + _done(false), + _job(std::move(job)), + _queue(), + _thread(new std::thread(&AsyncWriterJobQueue::workerThread, this)) {} + + ~AsyncWriterJobQueue() { + _done = true; + } + + bool tryPop(T &value) { + return _queue.try_pop(value); + } + + private: + + void workerThread() { + while (!_done) { + _queue.push(_job()); + } + } + + std::atomic_bool _done; + + Job _job; + + ThreadSafeQueue _queue; + + const ThreadUniquePointer _thread; + }; + +} // namespace thread +} // namespace carla diff --git a/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h new file mode 100644 index 000000000..76cfb43c1 --- /dev/null +++ b/Source/CarlaServer/source/carla/thread/ThreadSafeQueue.h @@ -0,0 +1,83 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include +// #include +#include +#include + +namespace carla { +namespace thread { + + /// A thread safe queue. + /// + /// From "C++ Concurrency In Action", Anthony Williams, listing 4.5. + template + class CARLA_API ThreadSafeQueue { + public: + + ThreadSafeQueue() = default; + + ThreadSafeQueue(const ThreadSafeQueue &other) { + std::lock_guard lock(other._mutex); + _queue = other._queue; + } + + void push(T new_value) { + std::lock_guard lock(_mutex); + _queue.push(new_value); + _condition.notify_one(); + } + + void wait_and_pop(T &value) { + std::unique_lock lock(_mutex); + _condition.wait(lock, [this] {return !_queue.empty(); }); + value = _queue.front(); + _queue.pop(); + } + + // std::shared_ptr wait_and_pop() { + // std::unique_lock lock(_mutex); + // _condition.wait(lock, [this] {return !_queue.empty(); }); + // std::shared_ptr res(std::make_shared(_queue.front())); + // _queue.pop(); + // return res; + // } + + bool try_pop(T &value) { + std::lock_guard lock(_mutex); + if (_queue.empty()) { + return false; + } + value = _queue.front(); + _queue.pop(); + return true; + } + + // std::shared_ptr try_pop() { + // std::lock_guard lock(_mutex); + // if (_queue.empty()) { + // return std::shared_ptr(); + // } + // std::shared_ptr res(std::make_shared(_queue.front())); + // _queue.pop(); + // return res; + // } + + bool empty() const { + std::lock_guard lock(_mutex); + return _queue.empty(); + } + + private: + + mutable std::mutex _mutex; + + std::queue _queue; + + std::condition_variable _condition; + }; + +} // namespace thread +} // namespace carla diff --git a/Source/CarlaServer/source/carla/thread/ThreadUniquePointer.h b/Source/CarlaServer/source/carla/thread/ThreadUniquePointer.h new file mode 100644 index 000000000..104bc74a6 --- /dev/null +++ b/Source/CarlaServer/source/carla/thread/ThreadUniquePointer.h @@ -0,0 +1,30 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#pragma once + +#include +#include + +namespace carla { +namespace thread { + + template + class JoinAndDeletePointer { + public: + + void operator()(T *ptr) { + if (ptr) { + if (ptr->joinable()) { + ptr->join(); + } + delete ptr; + } + } + + }; + + using ThreadUniquePointer = + std::unique_ptr>; + +} // namespace thread +} // namespace carla diff --git a/Source/CarlaServer/source/test/CMakeLists.txt b/Source/CarlaServer/source/test/CMakeLists.txt new file mode 100644 index 000000000..55554eea4 --- /dev/null +++ b/Source/CarlaServer/source/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(test_async_server async_server.cpp) +target_link_libraries(test_async_server carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY}) + +add_executable(test_sync_server sync_server.cpp) +target_link_libraries(test_sync_server carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY}) + +add_executable(test_client client.cpp) +target_link_libraries(test_client carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY}) diff --git a/Source/CarlaServer/source/test/async_server.cpp b/Source/CarlaServer/source/test/async_server.cpp new file mode 100644 index 000000000..4d271050f --- /dev/null +++ b/Source/CarlaServer/source/test/async_server.cpp @@ -0,0 +1,66 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#include "Carla.h" + +#include + +#include +#include +#include +#include + +enum ErrorCodes { + InvalidArguments, + STLException, + UnknownException, + ErrorSending, + ErrorReading +}; + +static int toInt(const std::string &str) { + return std::stoi(str); +} + +static std::string daytimeString() { + using namespace std; + time_t now = time(0); + std::string str = ctime(&now); + return str; +} + +int main(int argc, char* argv[]) { + try { + if (argc != 3) { + std::cerr << "Usage: server " << std::endl; + return InvalidArguments; + } + + // This already starts the two threads. + carla::server::CarlaServer server(toInt(argv[1u]), toInt(argv[2u])); + + // Let's simulate the game loop. + for (;;) { + std::cout << "Sending..." << std::endl; + auto time = daytimeString(); + server.writeString(time); + + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + + std::cout << "Listening..." << std::endl; + std::string message; + if (server.tryReadString(message)) { + std::cout << "Received: " << message << std::endl; + if ((message == "q") || (message == "quit")) + break; + } + } + + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + return STLException; + } catch (...) { + std::cerr << "Unknown exception thrown" << std::endl; + return UnknownException; + } +} diff --git a/Source/CarlaServer/source/test/client.cpp b/Source/CarlaServer/source/test/client.cpp new file mode 100644 index 000000000..6e96d496b --- /dev/null +++ b/Source/CarlaServer/source/test/client.cpp @@ -0,0 +1,80 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#include "Carla.h" + +#include +#include + +#include + +enum ErrorCodes { + InvalidArguments, + STLException, + UnknownException +}; + +int main(int argc, char* argv[]) { + try { + if (argc != 4) { + std::cerr << "Usage: client " << std::endl; + return InvalidArguments; + } + + using boost::asio::ip::tcp; + + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query read_query(argv[1], argv[2]); + tcp::resolver::iterator read_endpoint_iterator = resolver.resolve(read_query); + tcp::resolver::query send_query(argv[1], argv[3]); + tcp::resolver::iterator send_endpoint_iterator = resolver.resolve(send_query); + + for (;;) { + // Read message. + { + tcp::socket socket(io_service); + boost::asio::connect(socket, read_endpoint_iterator); + + for (;;) + { + std::array buf; + boost::system::error_code error; + + size_t len = socket.read_some(boost::asio::buffer(buf), error); + + if (error == boost::asio::error::eof) + break; // Connection closed cleanly by peer. + else if (error) + throw boost::system::system_error(error); // Some other error. + + std::cout.write(buf.data(), len); + } + } + + // Send message. + { + std::cout << std::endl; + std::string reply; + std::cin >> reply; + + tcp::socket socket(io_service); + boost::asio::connect(socket, send_endpoint_iterator); + + boost::system::error_code error; + boost::asio::write(socket, boost::asio::buffer(reply), error); + if (error) + throw boost::system::system_error(error); + + if ((reply == "q") || (reply == "quit")) + break; + } + } + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + return STLException; + } catch (...) { + std::cerr << "Unknown exception thrown" << std::endl; + return UnknownException; + } +} diff --git a/Source/CarlaServer/source/test/sync_server.cpp b/Source/CarlaServer/source/test/sync_server.cpp new file mode 100644 index 000000000..09aed60ba --- /dev/null +++ b/Source/CarlaServer/source/test/sync_server.cpp @@ -0,0 +1,85 @@ +// CARLA, Copyright (C) 2017 Computer Vision Center (CVC) + +#include "Carla.h" + +#include + +#include +#include + +enum ErrorCodes { + InvalidArguments, + STLException, + UnknownException, + ErrorSending, + ErrorReading +}; + +static int toInt(const std::string &str) { + return std::stoi(str); +} + +int main(int argc, char* argv[]) { + try { + if (argc != 2) { + std::cerr << "Usage: server " << std::endl; + return InvalidArguments; + } + + using namespace carla::server; + + TCPServer server(toInt(argv[1u])); + + // Send message. + { + const std::string message = "What's up?"; + std::cout << "Sending " << message << "..." << std::endl; + TCPServer::error_code error; + server.writeString(message, error); + if (error) { + std::cerr << "Error sending: " << error.message() << std::endl; + return ErrorSending; + } + } + + for (;;) { + // Read message. + { + std::cout << "Reading..." << std::endl; + std::string message; + TCPServer::error_code error; + server.readString(message, error); + if (error && (error != boost::asio::error::eof)) { + std::cerr << "Error reading: " << error.message() << std::endl; + return ErrorReading; + } + + std::cout << "They said " << message << std::endl; + } + + // Send reply. + { + std::cout << "What do I say now?" << std::endl; + std::string message; + std::cin >> message; + + if ((message == "q") || (message == "quit")) + break; + + TCPServer::error_code error; + server.writeString(message, error); + if (error) { + std::cerr << "Error sending: " << error.message() << std::endl; + return ErrorSending; + } + } + } + + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + return STLException; + } catch (...) { + std::cerr << "Unknown exception thrown" << std::endl; + return UnknownException; + } +} diff --git a/Source/Common/NonCopyable.h b/Source/Common/NonCopyable.h new file mode 100644 index 000000000..839037aea --- /dev/null +++ b/Source/Common/NonCopyable.h @@ -0,0 +1,11 @@ +#pragma once + +class CARLA_API NonCopyable { +public: + + NonCopyable() = default; + + NonCopyable(const NonCopyable &) = delete; + + void operator=(const NonCopyable &x) = delete; +};