Add basic socket library

This commit is contained in:
nsubiron 2017-03-03 14:54:19 +00:00
parent 942f838774
commit bd7190308f
19 changed files with 757 additions and 0 deletions

11
Source/CarlaServer/.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
*.sln
*.vcxproj
*.vcxproj.filters
*.VC.opendb
*.VC.db
build
bin
lib
CMakeCache.txt
CMakeFiles

View File

@ -0,0 +1,16 @@
cmake_minimum_required (VERSION 2.6)
project (CarlaServer)
# Boost configuration
SET(Boost_USE_STATIC_LIBS ON)
find_package(Boost REQUIRED system date_time regex)
include_directories(${Boost_INCLUDE_DIRS})
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
include_directories("${PROJECT_SOURCE_DIR}/source")
include_directories("${PROJECT_SOURCE_DIR}/../Common")
add_subdirectory(source/carla/server)
add_subdirectory(source/test)

View File

@ -0,0 +1,10 @@
BUILD_FOLDER=build
vsproject:
cmake -H. -B$(BUILD_FOLDER) -G "Visual Studio 14 2015 Win64"
clean:
rm -Rf build CMakeFiles
clean-all: clean
rm -Rf bin lib

View File

@ -0,0 +1,15 @@
CarlaServer
===========
Library for socket communications.
Requires boost libraries installed.
Building
--------
To generate the Visual Studio solution
make vsproject
The solution gets generated at `./build/CarlaServer.sln`.

View File

@ -0,0 +1,5 @@
#pragma once
#define CARLA_API
#include "../../Common/NonCopyable.h"

View File

@ -0,0 +1,12 @@
# add_library(carla_server
# CarlaServer.h
# CarlaServer.cpp
# TCPServer.h
# TCPServer.cpp
# )
file(GLOB carla_server_SRC
"*.h"
"*.cpp"
)
add_library(carla_server ${carla_server_SRC})

View File

@ -0,0 +1,89 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#include "Carla.h"
#include "CarlaServer.h"
#include <carla/server/TCPServer.h>
#include <carla/thread/AsyncReaderJobQueue.h>
#include <carla/thread/AsyncWriterJobQueue.h>
#include <iostream>
#include <memory>
namespace carla {
namespace server {
template <typename ERROR_CODE>
static void logTCPError(const std::string &text, const ERROR_CODE &errorCode) {
std::cerr << "CarlaServer - TCP Server: " << text << ": " << errorCode.message() << std::endl;
}
// -- Static methods ---------------------------------------------------------
// This is the thread that sends a string over the TCP socket.
static void serverWorkerThread(TCPServer &server, const std::string &message) {
TCPServer::error_code error;
server.writeString(message, error);
if (error)
logTCPError("Failed to send", error);
}
// This is the thread that listens for string over the TCP socket.
static std::string clientWorkerThread(TCPServer &server) {
std::string message;
TCPServer::error_code error;
server.readString(message, error);
if (error && (error != boost::asio::error::eof)) { // eof is expected.
logTCPError("Failed to read", error);
return std::string();
}
return message;
}
// -- CarlaServer::Pimpl -----------------------------------------------------
class CarlaServer::Pimpl : private NonCopyable {
public:
explicit Pimpl(int writePort, int readPort) :
_server(writePort),
_client(readPort),
_serverThread([this](const std::string &str){ serverWorkerThread(this->_server, str); }),
_clientThread([this](){ return clientWorkerThread(this->_client); }) {}
void writeString(const std::string &message) {
_serverThread.push(message);
}
bool tryReadString(std::string &message) {
return _clientThread.tryPop(message);
}
private:
TCPServer _server;
TCPServer _client;
thread::AsyncReaderJobQueue<std::string> _serverThread;
thread::AsyncWriterJobQueue<std::string> _clientThread;
};
// -- CarlaServer ------------------------------------------------------------
CarlaServer::CarlaServer(int writePort, int readPort) :
_pimpl(std::make_unique<Pimpl>(writePort, readPort)) {}
CarlaServer::~CarlaServer() {}
void CarlaServer::writeString(const std::string &message) {
_pimpl->writeString(message);
}
bool CarlaServer::tryReadString(std::string &message) {
return _pimpl->tryReadString(message);
}
} // namespace server
} // namespace carla

View File

@ -0,0 +1,41 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <memory>
#include <string>
namespace carla {
namespace server {
/// Asynchronous TCP server. Uses two ports, one for sending messages (write)
/// and one for receiving messages (read).
///
/// Writing and reading are executed in two different threads. Each thread has
/// its own queue of messages.
///
/// Note that a new socket is created for every connection (every write and
/// read).
class CARLA_API CarlaServer : private NonCopyable {
public:
/// Starts two threads for writing and reading.
explicit CarlaServer(int writePort, int readPort);
~CarlaServer();
/// Push a string to the sending queue.
void writeString(const std::string &message);
/// Try to read a string from the receiving queue. Return false if the queue
/// is empty.
bool tryReadString(std::string &message);
private:
class Pimpl;
const std::unique_ptr<Pimpl> _pimpl;
};
} // namespace server
} // namespace carla

View File

@ -0,0 +1,47 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#include "Carla.h"
#include "TCPServer.h"
namespace carla {
namespace server {
using boost::asio::ip::tcp;
TCPServer::TCPServer(int port) :
_service(),
_acceptor(_service, tcp::endpoint(tcp::v4(), port)) {}
TCPServer::~TCPServer() {}
void TCPServer::writeString(const std::string &message, error_code &error) {
tcp::socket socket(_service);
_acceptor.accept(socket);
boost::asio::write(socket, boost::asio::buffer(message), error);
}
void TCPServer::readString(std::string &message, error_code &error) {
tcp::socket socket(_service);
_acceptor.accept(socket);
for (;; ) {
std::array<char, 128> buf;
size_t len = socket.read_some(boost::asio::buffer(buf), error);
if (error == boost::asio::error::eof) {
break; // Connection closed cleanly by peer.
} else if (error) {
return;
}
// @todo find a better way.
for (size_t i = 0u; i < len; ++i) {
message += buf[i];
}
}
}
} // namespace server
} // namespace carla

View File

@ -0,0 +1,36 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <string>
#include <boost/asio.hpp>
namespace carla {
namespace server {
/// Synchronous TCP server.
///
/// A new socket is created for every connection (every write and read).
class CARLA_API TCPServer : private NonCopyable {
public:
using error_code = boost::system::error_code;
explicit TCPServer(int port);
~TCPServer();
void writeString(const std::string &message, error_code &error);
void readString(std::string &message, error_code &error);
private:
boost::asio::io_service _service;
boost::asio::ip::tcp::acceptor _acceptor;
};
} // namespace server
} // namespace carla

View File

@ -0,0 +1,58 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <carla/thread/ThreadSafeQueue.h>
#include <carla/thread/ThreadUniquePointer.h>
#include <atomic>
#include <functional>
namespace carla {
namespace thread {
/// Executes the given job asynchronously for each item added to the queue.
///
/// The job gets called each time an item is added to the queue, the item is
/// passed as argument.
template <typename T>
class CARLA_API AsyncReaderJobQueue {
public:
using Job = std::function<void(T)>;
explicit AsyncReaderJobQueue(Job &&job) :
_done(false),
_job(std::move(job)),
_queue(),
_thread(new std::thread(&AsyncReaderJobQueue::workerThread, this)) {}
~AsyncReaderJobQueue() {
_done = true;
}
void push(T item) {
_queue.push(item);
}
private:
void workerThread() {
while (!_done) {
T value;
_queue.wait_and_pop(value);
_job(value);
}
}
std::atomic_bool _done;
Job _job;
ThreadSafeQueue<T> _queue;
const ThreadUniquePointer _thread;
};
} // namespace thread
} // namespace carla

View File

@ -0,0 +1,54 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <carla/thread/ThreadSafeQueue.h>
#include <carla/thread/ThreadUniquePointer.h>
#include <atomic>
#include <functional>
namespace carla {
namespace thread {
/// Executes the given job asynchronously. Every item that the job returns is
/// added to the queue.
template <typename T>
class CARLA_API AsyncWriterJobQueue {
public:
using Job = std::function<T()>;
explicit AsyncWriterJobQueue(Job &&job) :
_done(false),
_job(std::move(job)),
_queue(),
_thread(new std::thread(&AsyncWriterJobQueue::workerThread, this)) {}
~AsyncWriterJobQueue() {
_done = true;
}
bool tryPop(T &value) {
return _queue.try_pop(value);
}
private:
void workerThread() {
while (!_done) {
_queue.push(_job());
}
}
std::atomic_bool _done;
Job _job;
ThreadSafeQueue<T> _queue;
const ThreadUniquePointer _thread;
};
} // namespace thread
} // namespace carla

View File

@ -0,0 +1,83 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <condition_variable>
// #include <memory>
#include <mutex>
#include <queue>
namespace carla {
namespace thread {
/// A thread safe queue.
///
/// From "C++ Concurrency In Action", Anthony Williams, listing 4.5.
template<typename T>
class CARLA_API ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
ThreadSafeQueue(const ThreadSafeQueue &other) {
std::lock_guard<std::mutex> lock(other._mutex);
_queue = other._queue;
}
void push(T new_value) {
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(new_value);
_condition.notify_one();
}
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [this] {return !_queue.empty(); });
value = _queue.front();
_queue.pop();
}
// std::shared_ptr<T> wait_and_pop() {
// std::unique_lock<std::mutex> lock(_mutex);
// _condition.wait(lock, [this] {return !_queue.empty(); });
// std::shared_ptr<T> res(std::make_shared<T>(_queue.front()));
// _queue.pop();
// return res;
// }
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty()) {
return false;
}
value = _queue.front();
_queue.pop();
return true;
}
// std::shared_ptr<T> try_pop() {
// std::lock_guard<std::mutex> lock(_mutex);
// if (_queue.empty()) {
// return std::shared_ptr<T>();
// }
// std::shared_ptr<T> res(std::make_shared<T>(_queue.front()));
// _queue.pop();
// return res;
// }
bool empty() const {
std::lock_guard<std::mutex> lock(_mutex);
return _queue.empty();
}
private:
mutable std::mutex _mutex;
std::queue<T> _queue;
std::condition_variable _condition;
};
} // namespace thread
} // namespace carla

View File

@ -0,0 +1,30 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#pragma once
#include <memory>
#include <thread>
namespace carla {
namespace thread {
template<typename T>
class JoinAndDeletePointer {
public:
void operator()(T *ptr) {
if (ptr) {
if (ptr->joinable()) {
ptr->join();
}
delete ptr;
}
}
};
using ThreadUniquePointer =
std::unique_ptr<std::thread, JoinAndDeletePointer<std::thread>>;
} // namespace thread
} // namespace carla

View File

@ -0,0 +1,8 @@
add_executable(test_async_server async_server.cpp)
target_link_libraries(test_async_server carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY})
add_executable(test_sync_server sync_server.cpp)
target_link_libraries(test_sync_server carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY})
add_executable(test_client client.cpp)
target_link_libraries(test_client carla_server ${Boost_SYSTEM_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_REGEX_LIBRARY})

View File

@ -0,0 +1,66 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#include "Carla.h"
#include <carla/server/CarlaServer.h>
#include <ctime>
#include <iostream>
#include <string>
#include <thread>
enum ErrorCodes {
InvalidArguments,
STLException,
UnknownException,
ErrorSending,
ErrorReading
};
static int toInt(const std::string &str) {
return std::stoi(str);
}
static std::string daytimeString() {
using namespace std;
time_t now = time(0);
std::string str = ctime(&now);
return str;
}
int main(int argc, char* argv[]) {
try {
if (argc != 3) {
std::cerr << "Usage: server <send-port> <read-port>" << std::endl;
return InvalidArguments;
}
// This already starts the two threads.
carla::server::CarlaServer server(toInt(argv[1u]), toInt(argv[2u]));
// Let's simulate the game loop.
for (;;) {
std::cout << "Sending..." << std::endl;
auto time = daytimeString();
server.writeString(time);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
std::cout << "Listening..." << std::endl;
std::string message;
if (server.tryReadString(message)) {
std::cout << "Received: " << message << std::endl;
if ((message == "q") || (message == "quit"))
break;
}
}
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return STLException;
} catch (...) {
std::cerr << "Unknown exception thrown" << std::endl;
return UnknownException;
}
}

View File

@ -0,0 +1,80 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#include "Carla.h"
#include <array>
#include <iostream>
#include <boost/asio.hpp>
enum ErrorCodes {
InvalidArguments,
STLException,
UnknownException
};
int main(int argc, char* argv[]) {
try {
if (argc != 4) {
std::cerr << "Usage: client <host-ip> <read-port> <send-port>" << std::endl;
return InvalidArguments;
}
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query read_query(argv[1], argv[2]);
tcp::resolver::iterator read_endpoint_iterator = resolver.resolve(read_query);
tcp::resolver::query send_query(argv[1], argv[3]);
tcp::resolver::iterator send_endpoint_iterator = resolver.resolve(send_query);
for (;;) {
// Read message.
{
tcp::socket socket(io_service);
boost::asio::connect(socket, read_endpoint_iterator);
for (;;)
{
std::array<char, 128> buf;
boost::system::error_code error;
size_t len = socket.read_some(boost::asio::buffer(buf), error);
if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
std::cout.write(buf.data(), len);
}
}
// Send message.
{
std::cout << std::endl;
std::string reply;
std::cin >> reply;
tcp::socket socket(io_service);
boost::asio::connect(socket, send_endpoint_iterator);
boost::system::error_code error;
boost::asio::write(socket, boost::asio::buffer(reply), error);
if (error)
throw boost::system::system_error(error);
if ((reply == "q") || (reply == "quit"))
break;
}
}
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return STLException;
} catch (...) {
std::cerr << "Unknown exception thrown" << std::endl;
return UnknownException;
}
}

View File

@ -0,0 +1,85 @@
// CARLA, Copyright (C) 2017 Computer Vision Center (CVC)
#include "Carla.h"
#include <carla/server/TCPServer.h>
#include <iostream>
#include <string>
enum ErrorCodes {
InvalidArguments,
STLException,
UnknownException,
ErrorSending,
ErrorReading
};
static int toInt(const std::string &str) {
return std::stoi(str);
}
int main(int argc, char* argv[]) {
try {
if (argc != 2) {
std::cerr << "Usage: server <port>" << std::endl;
return InvalidArguments;
}
using namespace carla::server;
TCPServer server(toInt(argv[1u]));
// Send message.
{
const std::string message = "What's up?";
std::cout << "Sending " << message << "..." << std::endl;
TCPServer::error_code error;
server.writeString(message, error);
if (error) {
std::cerr << "Error sending: " << error.message() << std::endl;
return ErrorSending;
}
}
for (;;) {
// Read message.
{
std::cout << "Reading..." << std::endl;
std::string message;
TCPServer::error_code error;
server.readString(message, error);
if (error && (error != boost::asio::error::eof)) {
std::cerr << "Error reading: " << error.message() << std::endl;
return ErrorReading;
}
std::cout << "They said " << message << std::endl;
}
// Send reply.
{
std::cout << "What do I say now?" << std::endl;
std::string message;
std::cin >> message;
if ((message == "q") || (message == "quit"))
break;
TCPServer::error_code error;
server.writeString(message, error);
if (error) {
std::cerr << "Error sending: " << error.message() << std::endl;
return ErrorSending;
}
}
}
} catch (const std::exception &e) {
std::cerr << e.what() << std::endl;
return STLException;
} catch (...) {
std::cerr << "Unknown exception thrown" << std::endl;
return UnknownException;
}
}

View File

@ -0,0 +1,11 @@
#pragma once
class CARLA_API NonCopyable {
public:
NonCopyable() = default;
NonCopyable(const NonCopyable &) = delete;
void operator=(const NonCopyable &x) = delete;
};