Prepare to upgrade to Boost 1.70 (not upgrading yet due to a bug in Boost.Variant)

nsubiron 2019-05-03 11:54:25 +02:00
parent 32d9715ef8
commit 0466081920
18 changed files with 95 additions and 99 deletions

View File

@ -9,7 +9,7 @@ install(FILES ${libcarla_carla_rpclib} DESTINATION lib)
# Install boost headers (install in system folder to avoid extra warnings).
# @todo Remove boost from public interface of LibCarla.client.
install(DIRECTORY "${BOOST_INCLUDE_PATH}/boost" DESTINATION include/system)
file(GLOB boost_libraries "${BOOST_LIB_PATH}/*")
file(GLOB boost_libraries "${BOOST_LIB_PATH}/*.*")
install(FILES ${boost_libraries} DESTINATION lib)
# Windows needs libpng to be installed alongside zlib

View File

@ -11,7 +11,7 @@
#include "carla/ThreadGroup.h"
#include "carla/Time.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <future>
#include <thread>
@ -19,20 +19,20 @@
namespace carla {
/// A thread pool based on Boost.Asio's io service.
/// A thread pool based on Boost.Asio's io context.
class ThreadPool : private NonCopyable {
public:
ThreadPool() : _work_to_do(_io_service) {}
ThreadPool() : _work_to_do(_io_context) {}
/// Stops the ThreadPool and joins all its threads.
~ThreadPool() {
Stop();
}
/// Return the underlying io service.
auto &service() {
return _io_service;
/// Return the underlying io_context.
auto &io_context() {
return _io_context;
}
/// Post a task to the pool.
@ -40,7 +40,7 @@ namespace carla {
std::future<ResultT> Post(FunctorT &&functor) {
auto task = std::packaged_task<ResultT()>(std::forward<FunctorT>(functor));
auto future = task.get_future();
_io_service.post(carla::MoveHandler(task));
_io_context.post(carla::MoveHandler(task));
return future;
}
@ -60,7 +60,7 @@ namespace carla {
///
/// @warning This function blocks until the ThreadPool has been stopped.
void Run() {
_io_service.run();
_io_context.run();
}
/// Run tasks in this thread for a specific @a duration.
@ -68,20 +68,20 @@ namespace carla {
/// @warning This function blocks until the ThreadPool has been stopped, or
/// until the specified time duration has elapsed.
void RunFor(time_duration duration) {
_io_service.run_for(duration.to_chrono());
_io_context.run_for(duration.to_chrono());
}
/// Stop the ThreadPool and join all its threads.
void Stop() {
_io_service.stop();
_io_context.stop();
_workers.JoinAll();
}
private:
boost::asio::io_service _io_service;
boost::asio::io_context _io_context;
boost::asio::io_service::work _work_to_do;
boost::asio::io_context::work _work_to_do;
ThreadGroup _workers;
};
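The pool above still keeps its io_context busy through boost::asio::io_context::work, which Boost 1.70 retains only as a deprecated API. A minimal sketch of the non-deprecated alternative based on an executor_work_guard; names below are illustrative and not part of this commit:

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <thread>

int main() {
  boost::asio::io_context io;
  // Keeps io.run() from returning while no handlers are queued yet.
  auto work = boost::asio::make_work_guard(io);
  std::thread worker([&io]() { io.run(); });
  boost::asio::post(io, []() { /* task executed on the worker thread */ });
  work.reset();   // let run() return once the queued handlers are done
  worker.join();
  return 0;
}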

View File

@ -11,7 +11,7 @@
#include "carla/rpc/Metadata.h"
#include "carla/rpc/Response.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <rpc/server.h>
@ -50,8 +50,8 @@ namespace rpc {
}
void SyncRunFor(time_duration duration) {
_sync_io_service.reset();
_sync_io_service.run_for(duration.to_chrono());
_sync_io_context.reset();
_sync_io_context.run_for(duration.to_chrono());
}
/// @warning does not stop the game thread.
@ -61,7 +61,7 @@ namespace rpc {
private:
boost::asio::io_service _sync_io_service;
boost::asio::io_context _sync_io_context;
::rpc::server _server;
};
@ -92,15 +92,15 @@ namespace detail {
/// Wraps @a functor into a function type with an equivalent signature. The
/// returned wrap function, when called, posts @a functor into the
/// io_service; if the client called this method synchronously, waits for
/// io_context; if the client called this method synchronously, waits for
/// the posted task to finish, otherwise returns immediately.
///
/// This way, no matter from which thread the wrap function is called, the
/// @a functor provided is always called from the context of the io_service.
/// I.e., we can use the io_service to run tasks on a specific thread (e.g.
/// @a functor provided is always called from the context of the io_context.
/// I.e., we can use the io_context to run tasks on a specific thread (e.g.
/// game thread).
template <typename FuncT>
static auto WrapSyncCall(boost::asio::io_service &io, FuncT &&functor) {
static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor) {
return [&io, functor=std::forward<FuncT>(functor)](Metadata metadata, Args... args) -> R {
auto task = std::packaged_task<R()>([functor=std::move(functor), args...]() {
return functor(args...);
@ -147,7 +147,7 @@ namespace detail {
using Wrapper = detail::FunctionWrapper<FunctorT>;
_server.bind(
name,
Wrapper::WrapSyncCall(_sync_io_service, std::forward<FunctorT>(functor)));
Wrapper::WrapSyncCall(_sync_io_context, std::forward<FunctorT>(functor)));
}
template <typename FunctorT>
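The WrapSyncCall documentation above describes posting the functor into the io_context and blocking until it has run there. A stripped-down sketch of that post-and-wait pattern; the helper name is assumed, this is not the actual LibCarla implementation, and it omits the asynchronous fast path the real wrapper provides:

#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <future>
#include <memory>
#include <utility>

template <typename FuncT>
auto RunOnContextAndWait(boost::asio::io_context &io, FuncT &&func) {
  using result_t = decltype(func());
  // packaged_task is move-only, so share it with the posted handler.
  auto task = std::make_shared<std::packaged_task<result_t()>>(std::forward<FuncT>(func));
  auto future = task->get_future();
  boost::asio::post(io, [task]() { (*task)(); });
  // Blocks until the io_context (e.g. the game thread) has executed the task.
  return future.get();
}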

View File

@ -12,7 +12,7 @@
#include "carla/streaming/detail/tcp/Client.h"
#include "carla/streaming/low_level/Client.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
namespace carla {
namespace streaming {
@ -37,7 +37,7 @@ namespace streaming {
/// MultiStream).
template <typename Functor>
void Subscribe(const Token &token, Functor &&callback) {
_client.Subscribe(_service.service(), token, std::forward<Functor>(callback));
_client.Subscribe(_service.io_context(), token, std::forward<Functor>(callback));
}
void UnSubscribe(const Token &token) {

View File

@ -6,7 +6,7 @@
#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
@ -73,8 +73,8 @@ namespace detail {
}
static inline auto make_address(const std::string &address) {
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::io_context io_context;
boost::asio::ip::tcp::resolver resolver(io_context);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), address, "", boost::asio::ip::tcp::resolver::query::canonical_name);
boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
boost::asio::ip::tcp::resolver::iterator end;
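make_address above still goes through boost::asio::ip::tcp::resolver::query, which newer Boost releases deprecate. A simplified sketch of the resolver overload available since Boost 1.66; the function name is illustrative, there is no error handling, and it does not reproduce the canonical_name flag used above:

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <string>

static boost::asio::ip::address resolve_first(const std::string &host) {
  boost::asio::io_context io_context;
  boost::asio::ip::tcp::resolver resolver(io_context);
  // resolve() returns a range of entries directly, no resolver::query needed.
  auto results = resolver.resolve(boost::asio::ip::tcp::v4(), host, "");
  return results.begin()->endpoint().address();
}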

View File

@ -10,7 +10,7 @@
#include "carla/streaming/detail/tcp/Server.h"
#include "carla/streaming/low_level/Server.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
namespace carla {
namespace streaming {
@ -23,21 +23,21 @@ namespace streaming {
public:
explicit Server(uint16_t port)
: _server(_service.service(), make_endpoint<protocol_type>(port)) {}
: _server(_pool.io_context(), make_endpoint<protocol_type>(port)) {}
explicit Server(const std::string &address, uint16_t port)
: _server(_service.service(), make_endpoint<protocol_type>(address, port)) {}
: _server(_pool.io_context(), make_endpoint<protocol_type>(address, port)) {}
explicit Server(
const std::string &address, uint16_t port,
const std::string &external_address, uint16_t external_port)
: _server(
_service.service(),
_pool.io_context(),
make_endpoint<protocol_type>(address, port),
make_endpoint<protocol_type>(external_address, external_port)) {}
~Server() {
_service.Stop();
_pool.Stop();
}
auto GetLocalEndpoint() const {
@ -57,18 +57,18 @@ namespace streaming {
}
void Run() {
_service.Run();
_pool.Run();
}
void AsyncRun(size_t worker_threads) {
_service.AsyncRun(worker_threads);
_pool.AsyncRun(worker_threads);
}
private:
// The order of these two members is very important.
ThreadPool _service;
ThreadPool _pool;
underlying_server _server;
};
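The note above about member order matters because C++ constructs data members top to bottom and destroys them bottom to top: the ThreadPool owns the io_context, so it must already exist when _server is initialized from it and must still exist while _server is torn down. A small sketch of the same constraint with made-up names:

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>

class Owner {
  boost::asio::io_context _io;          // constructed first, destroyed last
  boost::asio::ip::tcp::acceptor _acc;  // keeps a reference into _io
public:
  Owner() : _acc(_io) {}                // only valid because _io already exists
};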

View File

@ -29,7 +29,7 @@ namespace detail {
Dispatcher::~Dispatcher() {
// Disconnect all the sessions from their streams; this should kill any
// session remaining since at this point the io_service should be already
// session remaining since at this point the io_context should be already
// stopped.
for (auto &pair : _stream_map) {
#ifndef LIBCARLA_NO_EXCEPTIONS

View File

@ -64,16 +64,16 @@ namespace tcp {
// ===========================================================================
Client::Client(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
const token_type &token,
callback_function_type callback)
: LIBCARLA_INITIALIZE_LIFETIME_PROFILER(
std::string("tcp client ") + std::to_string(token.get_stream_id())),
_token(token),
_callback(std::move(callback)),
_socket(io_service),
_strand(io_service),
_connection_timer(io_service),
_socket(io_context),
_strand(io_context),
_connection_timer(io_context),
_buffer_pool(std::make_shared<BufferPool>()) {
if (!_token.protocol_is_tcp()) {
throw_exception(std::invalid_argument("invalid token, only TCP tokens supported"));
@ -173,7 +173,7 @@ namespace tcp {
// Move the buffer to the callback function and start reading the next
// piece of data.
log_debug("streaming client: success reading data, calling the callback");
_socket.get_io_service().post([self, message]() { self->_callback(message->pop()); });
_strand.context().post([self, message]() { self->_callback(message->pop()); });
ReadData();
} else {
// As usual, if anything fails start over from the very top.
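The change above replaces _socket.get_io_service(), which disappears together with io_service, by going through the strand's context. A short sketch of the equivalent options in Boost 1.66+; the free function is illustrative, not the actual Client code:

#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>

void schedule_handler(boost::asio::ip::tcp::socket &socket,
                      boost::asio::io_context::strand &strand) {
  // Replacement for socket.get_io_service().post(...): post through the
  // socket's own executor...
  boost::asio::post(socket.get_executor(), []() { /* handler */ });
  // ...or through the strand's io_context, as this commit does...
  strand.context().post([]() { /* handler */ });
  // ...or through the strand itself when ordering with other handlers matters.
  boost::asio::post(strand, []() { /* handler */ });
}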

View File

@ -13,7 +13,7 @@
#include "carla/streaming/detail/Types.h"
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
@ -44,7 +44,7 @@ namespace tcp {
using callback_function_type = std::function<void (Buffer)>;
Client(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
const token_type &token,
callback_function_type callback);
@ -70,7 +70,7 @@ namespace tcp {
boost::asio::ip::tcp::socket _socket;
boost::asio::io_service::strand _strand;
boost::asio::io_context::strand _strand;
boost::asio::deadline_timer _connection_timer;

View File

@ -15,8 +15,9 @@ namespace streaming {
namespace detail {
namespace tcp {
Server::Server(boost::asio::io_service &io_service, endpoint ep)
: _acceptor(io_service, std::move(ep)),
Server::Server(boost::asio::io_context &io_context, endpoint ep)
: _io_context(io_context),
_acceptor(_io_context, std::move(ep)),
_timeout(time_duration::seconds(10u)) {}
void Server::OpenSession(
@ -25,7 +26,7 @@ namespace tcp {
ServerSession::callback_function_type on_closed) {
using boost::system::error_code;
auto session = std::make_shared<ServerSession>(_acceptor.get_io_service(), timeout);
auto session = std::make_shared<ServerSession>(_io_context, timeout);
auto handle_query = [on_opened, on_closed, session](const error_code &ec) {
if (!ec) {
@ -37,7 +38,7 @@ namespace tcp {
_acceptor.async_accept(session->_socket, [=](error_code ec) {
// Handle query and open a new session immediately.
_acceptor.get_io_service().post([=]() { handle_query(ec); });
_io_context.post([=]() { handle_query(ec); });
OpenSession(timeout, on_opened, on_closed);
});
}

View File

@ -10,7 +10,7 @@
#include "carla/Time.h"
#include "carla/streaming/detail/tcp/ServerSession.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <atomic>
@ -20,7 +20,7 @@ namespace streaming {
namespace detail {
namespace tcp {
/// @warning This server cannot be destructed before its @a io_service is
/// @warning This server cannot be destructed before its @a io_context is
/// stopped.
class Server : private NonCopyable {
public:
@ -28,7 +28,7 @@ namespace tcp {
using endpoint = boost::asio::ip::tcp::endpoint;
using protocol_type = endpoint::protocol_type;
explicit Server(boost::asio::io_service &io_service, endpoint ep);
explicit Server(boost::asio::io_context &io_context, endpoint ep);
endpoint GetLocalEndpoint() const {
return _acceptor.local_endpoint();
@ -45,7 +45,7 @@ namespace tcp {
/// is closed.
template <typename FunctorT1, typename FunctorT2>
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed) {
_acceptor.get_io_service().post([=]() {
_io_context.post([=]() {
OpenSession(
_timeout,
std::move(on_session_opened),
@ -60,6 +60,8 @@ namespace tcp {
ServerSession::callback_function_type on_session_opened,
ServerSession::callback_function_type on_session_closed);
boost::asio::io_context &_io_context;
boost::asio::ip::tcp::acceptor _acceptor;
std::atomic<time_duration> _timeout;

View File

@ -22,15 +22,15 @@ namespace tcp {
static std::atomic_size_t SESSION_COUNTER{0u};
ServerSession::ServerSession(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
const time_duration timeout)
: LIBCARLA_INITIALIZE_LIFETIME_PROFILER(
std::string("tcp server session ") + std::to_string(SESSION_COUNTER)),
_session_id(SESSION_COUNTER++),
_socket(io_service),
_socket(io_context),
_timeout(timeout),
_deadline(io_service),
_strand(io_service) {}
_deadline(io_context),
_strand(io_context) {}
void ServerSession::Open(
callback_function_type on_opened,
@ -47,7 +47,7 @@ namespace tcp {
if (!ec) {
DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
log_debug("session", _session_id, "for stream", _stream_id, " started");
_socket.get_io_service().post([=]() { callback(self); });
_strand.context().post([=]() { callback(self); });
} else {
log_error("session", _session_id, ": error retrieving stream id :", ec.message());
CloseNow();
@ -123,7 +123,7 @@ namespace tcp {
if (_socket.is_open()) {
_socket.close();
}
_socket.get_io_service().post([self=shared_from_this()]() {
_strand.context().post([self=shared_from_this()]() {
DEBUG_ASSERT(self->_on_closed);
self->_on_closed(self);
});

View File

@ -14,7 +14,7 @@
#include "carla/streaming/detail/tcp/Message.h"
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
@ -38,7 +38,9 @@ namespace tcp {
using socket_type = boost::asio::ip::tcp::socket;
using callback_function_type = std::function<void(std::shared_ptr<ServerSession>)>;
explicit ServerSession(boost::asio::io_service &io_service, time_duration timeout);
explicit ServerSession(
boost::asio::io_context &io_context,
time_duration timeout);
/// Starts the session and calls @a on_opened after successfully reading the
/// stream id, and @a on_closed once the session is closed.
@ -90,7 +92,7 @@ namespace tcp {
boost::asio::deadline_timer _deadline;
boost::asio::io_service::strand _strand;
boost::asio::io_context::strand _strand;
callback_function_type _on_closed;

View File

@ -9,7 +9,7 @@
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/tcp/Client.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <memory>
#include <unordered_map>
@ -19,9 +19,9 @@ namespace streaming {
namespace low_level {
/// A client able to subscribe to multiple streams. Accepts an external
/// io_service.
/// io_context.
///
/// @warning The client should not be destroyed before the @a io_service is
/// @warning The client should not be destroyed before the @a io_context is
/// stopped.
template <typename T>
class Client {
@ -50,7 +50,7 @@ namespace low_level {
/// MultiStream).
template <typename Functor>
void Subscribe(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
token_type token,
Functor &&callback) {
DEBUG_ASSERT_EQ(_clients.find(token.get_stream_id()), _clients.end());
@ -58,7 +58,7 @@ namespace low_level {
token.set_address(_fallback_address);
}
auto client = std::make_shared<underlying_client>(
io_service,
io_context,
token,
std::forward<Functor>(callback));
client->Connect();

View File

@ -9,7 +9,7 @@
#include "carla/streaming/detail/Dispatcher.h"
#include "carla/streaming/Stream.h"
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
namespace carla {
namespace streaming {
@ -17,9 +17,9 @@ namespace low_level {
/// A low-level streaming server. Each new stream has a token associated; this
/// token can be used by a client to subscribe to the stream. This server
/// requires an external io_service running.
/// requires an external io_context running.
///
/// @warning This server cannot be destructed before its @a io_service is
/// @warning This server cannot be destructed before its @a io_context is
/// stopped.
template <typename T>
class Server {
@ -31,26 +31,26 @@ namespace low_level {
template <typename InternalEPType, typename ExternalEPType>
explicit Server(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
detail::EndPoint<protocol_type, InternalEPType> internal_ep,
detail::EndPoint<protocol_type, ExternalEPType> external_ep)
: _server(io_service, std::move(internal_ep)),
: _server(io_context, std::move(internal_ep)),
_dispatcher(std::move(external_ep)) {
StartServer();
}
template <typename InternalEPType>
explicit Server(
boost::asio::io_service &io_service,
boost::asio::io_context &io_context,
detail::EndPoint<protocol_type, InternalEPType> internal_ep)
: _server(io_service, std::move(internal_ep)),
: _server(io_context, std::move(internal_ep)),
_dispatcher(make_endpoint<protocol_type>(_server.GetLocalEndpoint().port())) {
StartServer();
}
template <typename... EPArgs>
explicit Server(boost::asio::io_service &io_service, EPArgs &&... args)
: Server(io_service, make_endpoint<protocol_type>(std::forward<EPArgs>(args)...)) {}
explicit Server(boost::asio::io_context &io_context, EPArgs &&... args)
: Server(io_context, make_endpoint<protocol_type>(std::forward<EPArgs>(args)...)) {}
typename underlying_server::endpoint GetLocalEndpoint() const {
return _server.GetLocalEndpoint();
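The warnings in this header and in the tcp server/client headers all reduce to the same lifetime rule: stop the io_context and join its worker threads before destroying anything constructed on top of it. A small usage sketch of that shutdown order; NeedsContext is a stand-in type, not LibCarla code:

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <thread>

struct NeedsContext {  // stand-in for a streaming server or client
  explicit NeedsContext(boost::asio::io_context &io) : _io(io) {}
  boost::asio::io_context &_io;
};

int main() {
  boost::asio::io_context io;
  auto work = boost::asio::make_work_guard(io);  // keep io.run() alive
  NeedsContext server(io);                       // holds a reference into io
  std::thread worker([&io]() { io.run(); });
  // ... use the server ...
  io.stop();                                     // 1. stop the context
  worker.join();                                 // 2. join the worker threads
  return 0;
}                                                // 3. only now are server and io destroyed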

View File

@ -6,16 +6,7 @@
#include "Buffer.h"
/// @todo This header uses deprecated functionality, please re-enable
/// pragma-messages after upgrading Boost 1.69 if possible.
#if defined(__clang__)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-W#pragma-messages"
#endif
# include <boost/random/independent_bits.hpp>
#if defined(__clang__)
# pragma clang diagnostic pop
#endif
#include <boost/random/independent_bits.hpp>
#include <climits>
#include <random>

View File

@ -21,23 +21,23 @@ using namespace std::chrono_literals;
// This helper is required so the low-level tests properly stop their threads in
// case of an exception or a failed assert.
class io_service_running {
class io_context_running {
public:
boost::asio::io_service service;
boost::asio::io_context service;
explicit io_service_running(size_t threads = 2u)
explicit io_context_running(size_t threads = 2u)
: _work_to_do(service) {
_threads.CreateThreads(threads, [this]() { service.run(); });
}
~io_service_running() {
~io_context_running() {
service.stop();
}
private:
boost::asio::io_service::work _work_to_do;
boost::asio::io_context::work _work_to_do;
carla::ThreadGroup _threads;
};
@ -53,7 +53,7 @@ TEST(streaming, low_level_sending_strings) {
std::atomic_size_t message_count{0u};
io_service_running io;
io_context_running io;
Server<tcp::Server> srv(io.service, TESTING_PORT);
srv.SetTimeout(1s);
@ -86,7 +86,7 @@ TEST(streaming, low_level_unsubscribing) {
constexpr auto number_of_messages = 50u;
const std::string message_text = "Hello client!";
io_service_running io;
io_context_running io;
Server<tcp::Server> srv(io.service, TESTING_PORT);
srv.SetTimeout(1s);
@ -124,10 +124,10 @@ TEST(streaming, low_level_tcp_small_message) {
using namespace carla::streaming;
using namespace carla::streaming::detail;
boost::asio::io_service io_service;
boost::asio::io_context io_context;
tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
tcp::Server srv(io_service, ep);
tcp::Server srv(io_context, ep);
srv.SetTimeout(1s);
std::atomic_bool done{false};
std::atomic_size_t message_count{0u};
@ -145,7 +145,7 @@ TEST(streaming, low_level_tcp_small_message) {
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(srv.GetLocalEndpoint())};
auto stream = dispatcher.MakeStream();
auto c = std::make_shared<tcp::Client>(io_service, stream.token(), [&](carla::Buffer message) {
auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](carla::Buffer message) {
++message_count;
ASSERT_FALSE(message.empty());
ASSERT_EQ(message.size(), 5u);
@ -158,10 +158,10 @@ TEST(streaming, low_level_tcp_small_message) {
carla::ThreadGroup threads;
threads.CreateThreads(
std::max(2u, std::thread::hardware_concurrency()),
[&]() { io_service.run(); });
[&]() { io_context.run(); });
std::this_thread::sleep_for(2s);
io_service.stop();
io_context.stop();
done = true;
std::cout << "client received " << message_count << " messages\n";
ASSERT_GT(message_count, 10u);

View File

@ -111,9 +111,9 @@ private:
const carla::Buffer _message;
boost::asio::io_service _client_callback;
boost::asio::io_context _client_callback;
boost::asio::io_service::work _work_to_do;
boost::asio::io_context::work _work_to_do;
const double _success_ratio;