From d7ed3e1f8cdb3bbb72cb6cf31093ad964295efe1 Mon Sep 17 00:00:00 2001 From: nsubiron Date: Sun, 21 Oct 2018 15:22:00 +0200 Subject: [PATCH] Add RecurrentSharedFuture class --- LibCarla/source/carla/RecurrentSharedFuture.h | 126 ++++++++++++++++++ .../test/test_recurrent_shared_future.cpp | 64 +++++++++ 2 files changed, 190 insertions(+) create mode 100644 LibCarla/source/carla/RecurrentSharedFuture.h create mode 100644 LibCarla/source/test/test_recurrent_shared_future.cpp diff --git a/LibCarla/source/carla/RecurrentSharedFuture.h b/LibCarla/source/carla/RecurrentSharedFuture.h new file mode 100644 index 000000000..58bf75dce --- /dev/null +++ b/LibCarla/source/carla/RecurrentSharedFuture.h @@ -0,0 +1,126 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#pragma once + +#include "carla/Time.h" + +#include + +#include +#include +#include +#include + +namespace carla { + +namespace detail { + class SharedException; +} // namespace detail + + // =========================================================================== + // -- RecurrentSharedFuture -------------------------------------------------- + // =========================================================================== + + /// This class is meant to be used similar to a shared future, but the value + /// can be set any number of times. + template + class RecurrentSharedFuture { + public: + + using SharedException = detail::SharedException; + + /// Wait until the next value is set. Any number of threads can be waiting + /// simultaneously. + T WaitFor(time_duration timeout); + + /// Set the value and notify all waiting threads. + template + void SetValue(const T2 &value); + + /// Set a exception, this exception will be thrown on all the threads + /// waiting. + /// + /// @note The @a exception will be stored on a SharedException and thrown + /// as such. + template + void SetException(ExceptionT &&exception); + + private: + + std::mutex _mutex; + + std::condition_variable _cv; + + struct mapped_type { + bool should_wait; + boost::variant value; + }; + + std::map _map; + }; + + // =========================================================================== + // -- RecurrentSharedFuture implementation ----------------------------------- + // =========================================================================== + +namespace detail { + + static thread_local const char thread_tag{}; + + class SharedException : public std::exception { + public: + + SharedException(const SharedException &) = default; + + SharedException(std::shared_ptr e) + : _exception(std::move(e)) {} + + const char *what() const noexcept override { + return _exception->what(); + } + + std::shared_ptr GetException() const { + return _exception; + } + + private: + + std::shared_ptr _exception; + }; + +} // namespace detail + + template + T RecurrentSharedFuture::WaitFor(time_duration timeout) { + std::unique_lock lock(_mutex); + auto &r = _map[&detail::thread_tag]; + r.should_wait = true; + if (!_cv.wait_for(lock, timeout.to_chrono(), [&]() { return !r.should_wait; })) + throw std::runtime_error("RecurrentSharedFuture.WaitFor: time-out"); + if (r.value.which() == 1) + throw boost::get(r.value); + return boost::get(std::move(r.value)); + } + + template + template + void RecurrentSharedFuture::SetValue(const T2 &value) { + std::lock_guard lock(_mutex); + for (auto &pair : _map) { + pair.second.should_wait = false; + pair.second.value = value; + } + _cv.notify_all(); + } + + template + template + void RecurrentSharedFuture::SetException(ExceptionT &&e) { + SetValue(SharedException(std::make_shared(std::forward(e)))); + } + +} // namespace carla diff --git a/LibCarla/source/test/test_recurrent_shared_future.cpp b/LibCarla/source/test/test_recurrent_shared_future.cpp new file mode 100644 index 000000000..41aa6b2ac --- /dev/null +++ b/LibCarla/source/test/test_recurrent_shared_future.cpp @@ -0,0 +1,64 @@ +// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma +// de Barcelona (UAB). +// +// This work is licensed under the terms of the MIT license. +// For a copy, see . + +#include "test.h" + +#include +#include + +TEST(recurrent_shared_future, use_case) { + using namespace carla; + ThreadGroup threads; + RecurrentSharedFuture future; + + constexpr size_t number_of_threads = 12u; + constexpr size_t number_of_openings = 40u; + + std::atomic_size_t count{0u}; + std::atomic_bool done{false}; + + threads.CreateThreads(number_of_threads, [&]() { + while (!done) { + int result = future.WaitFor(1s); + ASSERT_EQ(result, 42); + ++count; + } + }); + + std::this_thread::sleep_for(12ms); + for (auto i = 0u; i < number_of_openings - 1u; ++i) { + future.SetValue(42); + std::this_thread::sleep_for(4ms); + } + done = true; + future.SetValue(42); + threads.JoinAll(); + ASSERT_EQ(count, number_of_openings * number_of_threads); +} + +TEST(recurrent_shared_future, timeout) { + using namespace carla; + RecurrentSharedFuture future; + ASSERT_THROW(future.WaitFor(1ns), std::runtime_error); +} + +TEST(recurrent_shared_future, exception) { + using namespace carla; + ThreadGroup threads; + RecurrentSharedFuture future; + const std::string message = "Uh oh an exception!"; + + threads.CreateThread([&]() { + std::this_thread::sleep_for(10ms); + future.SetException(std::runtime_error(message)); + }); + + try { + future.WaitFor(1s); + } catch (const std::exception &e) { + ASSERT_STREQ(e.what(), message.c_str()); + } +}