From 2c53287dcfc35060abf1c98934f4800ad2688861 Mon Sep 17 00:00:00 2001 From: nsubiron Date: Mon, 8 Oct 2018 13:17:26 +0200 Subject: [PATCH] Make possible to unsubscribe from a sensor stream --- Docs/python_api.md | 2 + LibCarla/source/carla/client/Actor.h | 2 +- LibCarla/source/carla/client/Sensor.cpp | 22 +++++++++ LibCarla/source/carla/client/Sensor.h | 4 ++ .../source/carla/client/detail/Client.cpp | 4 ++ LibCarla/source/carla/client/detail/Client.h | 2 + LibCarla/source/carla/streaming/Client.h | 8 +++- .../carla/streaming/detail/HashableClient.h | 48 ------------------- .../carla/streaming/detail/tcp/Client.cpp | 9 +++- .../carla/streaming/detail/tcp/Client.h | 7 +-- .../source/carla/streaming/low_level/Client.h | 18 +++---- PythonAPI/source/libcarla/Sensor.cpp | 2 + 12 files changed, 64 insertions(+), 64 deletions(-) delete mode 100644 LibCarla/source/carla/streaming/detail/HashableClient.h diff --git a/Docs/python_api.md b/Docs/python_api.md index d601b0ef7..24765cd6c 100644 --- a/Docs/python_api.md +++ b/Docs/python_api.md @@ -80,7 +80,9 @@ ## `carla.Sensor(carla.Actor)` +- `is_listening` - `listen(callback_function)` +- `stop()` ## `carla.Image` diff --git a/LibCarla/source/carla/client/Actor.h b/LibCarla/source/carla/client/Actor.h index 09c2e2859..3f7a207ae 100644 --- a/LibCarla/source/carla/client/Actor.h +++ b/LibCarla/source/carla/client/Actor.h @@ -40,7 +40,7 @@ namespace client { return _is_alive; } - void Destroy(); + virtual void Destroy(); protected: diff --git a/LibCarla/source/carla/client/Sensor.cpp b/LibCarla/source/carla/client/Sensor.cpp index 4d61e0c0b..dba7869bc 100644 --- a/LibCarla/source/carla/client/Sensor.cpp +++ b/LibCarla/source/carla/client/Sensor.cpp @@ -19,6 +19,9 @@ namespace client { "in the simulation:", GetDisplayId()); } + if (_is_listening) { + Stop(); + } } void Sensor::Listen(CallbackFunctionType callback) { @@ -35,5 +38,24 @@ namespace client { _is_listening = true; } + void Sensor::Stop() { + if (!_is_listening) { + log_warning( + "attempting to unsubscribe from stream but sensor wasn't listening:", + GetDisplayId()); + return; + } + GetClientImplementation()->UnSubscribeFromStream( + GetActorDescription().GetStreamToken()); + _is_listening = false; + } + + void Sensor::Destroy() { + if (_is_listening) { + Stop(); + } + Actor::Destroy(); + } + } // namespace client } // namespace carla diff --git a/LibCarla/source/carla/client/Sensor.h b/LibCarla/source/carla/client/Sensor.h index 74e73c21d..a6c87d914 100644 --- a/LibCarla/source/carla/client/Sensor.h +++ b/LibCarla/source/carla/client/Sensor.h @@ -25,10 +25,14 @@ namespace client { void Listen(CallbackFunctionType callback); + void Stop(); + bool IsListening() const { return _is_listening; } + void Destroy() override; + private: bool _is_listening = false; diff --git a/LibCarla/source/carla/client/detail/Client.cpp b/LibCarla/source/carla/client/detail/Client.cpp index 55fd8d374..9fd520e8f 100644 --- a/LibCarla/source/carla/client/detail/Client.cpp +++ b/LibCarla/source/carla/client/detail/Client.cpp @@ -130,6 +130,10 @@ namespace detail { }); } + void Client::UnSubscribeFromStream(const streaming::Token &token) { + _pimpl->streaming_client.UnSubscribe(token); + } + geom::Location Client::GetActorLocation(const Actor &actor) { return _pimpl->CallAndWait("get_actor_location", actor.Serialize()); } diff --git a/LibCarla/source/carla/client/detail/Client.h b/LibCarla/source/carla/client/detail/Client.h index 1199d8d07..3a396dd76 100644 --- a/LibCarla/source/carla/client/detail/Client.h +++ b/LibCarla/source/carla/client/detail/Client.h @@ -76,6 +76,8 @@ namespace detail { const streaming::Token &token, std::function)> callback); + void UnSubscribeFromStream(const streaming::Token &token); + geom::Location GetActorLocation(const Actor &actor); geom::Transform GetActorTransform(const Actor &actor); diff --git a/LibCarla/source/carla/streaming/Client.h b/LibCarla/source/carla/streaming/Client.h index 4d3a99980..2219ce0f0 100644 --- a/LibCarla/source/carla/streaming/Client.h +++ b/LibCarla/source/carla/streaming/Client.h @@ -41,6 +41,10 @@ namespace streaming { _client.Subscribe(_io_service, token, std::forward(callback)); } + void UnSubscribe(const Token &token) { + _client.UnSubscribe(token); + } + void Run() { _io_service.run(); } @@ -49,13 +53,13 @@ namespace streaming { _workers.CreateThreads(worker_threads, [this]() { Run(); }); } + private: + void Stop() { _io_service.stop(); _workers.JoinAll(); } - private: - using underlying_client = low_level::Client; boost::asio::io_service _io_service; diff --git a/LibCarla/source/carla/streaming/detail/HashableClient.h b/LibCarla/source/carla/streaming/detail/HashableClient.h deleted file mode 100644 index beebbbf85..000000000 --- a/LibCarla/source/carla/streaming/detail/HashableClient.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma -// de Barcelona (UAB). -// -// This work is licensed under the terms of the MIT license. -// For a copy, see . - -#pragma once - -#include "carla/streaming/detail/Types.h" - -#include - -namespace carla { -namespace streaming { -namespace detail { - - /// A wrapper to put clients into std::unordered_set. - template - class HashableClient : public T { - public: - - template - HashableClient(Args && ... args) - : T(std::forward(args) ...) {} - - bool operator==(const HashableClient &rhs) const { - return T::GetStreamId() == rhs.GetStreamId(); - } - }; - -} // namespace detail -} // namespace streaming -} // namespace carla - -namespace std { - - // Injecting a hash function for our clients into std namespace so we can - // directly insert them into std::unordered_set. - template - struct hash> { - using argument_type = carla::streaming::detail::HashableClient; - using result_type = std::size_t; - result_type operator()(const argument_type &client) const noexcept { - return std::hash()(client.GetStreamId()); - } - }; - -} // namespace std diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp index c45ec2ef9..88ebbb91b 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.cpp +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.cpp @@ -79,13 +79,15 @@ namespace tcp { } Client::~Client() { - Stop(); + _done = true; + /// @todo Destroying this client is not safe, another thread might be still + /// @using it. } void Client::Stop() { + _done = true; _connection_timer.cancel(); _strand.post([this]() { - _done = true; if (_socket.is_open()) { _socket.close(); } @@ -179,6 +181,9 @@ namespace tcp { DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes")); if (!ec && (message->size() > 0u)) { DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type)); + if (_done) { + return; + } // Now that we know the size of the coming buffer, we can allocate our // buffer and start putting data into it. boost::asio::async_read( diff --git a/LibCarla/source/carla/streaming/detail/tcp/Client.h b/LibCarla/source/carla/streaming/detail/tcp/Client.h index 77374c6f8..e11b48053 100644 --- a/LibCarla/source/carla/streaming/detail/tcp/Client.h +++ b/LibCarla/source/carla/streaming/detail/tcp/Client.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -49,10 +50,10 @@ namespace tcp { return _token.get_stream_id(); } - void Stop(); - private: + void Stop(); + void Connect(); void Reconnect(); @@ -71,7 +72,7 @@ namespace tcp { std::shared_ptr _buffer_pool; - bool _done = false; + std::atomic_bool _done{false}; }; } // namespace tcp diff --git a/LibCarla/source/carla/streaming/low_level/Client.h b/LibCarla/source/carla/streaming/low_level/Client.h index 0a29d1a87..e25055b23 100644 --- a/LibCarla/source/carla/streaming/low_level/Client.h +++ b/LibCarla/source/carla/streaming/low_level/Client.h @@ -6,13 +6,12 @@ #pragma once -#include "carla/streaming/detail/HashableClient.h" #include "carla/streaming/detail/Token.h" #include "carla/streaming/detail/tcp/Client.h" #include -#include +#include namespace carla { namespace streaming { @@ -27,7 +26,7 @@ namespace low_level { class Client { public: - using underlying_client = detail::HashableClient; + using underlying_client = T; using protocol_type = typename underlying_client::protocol_type; using token_type = carla::streaming::detail::token_type; @@ -48,17 +47,20 @@ namespace low_level { if (!token.has_address()) { token.set_address(_fallback_address); } - _clients.emplace( - io_service, - token, - std::forward(callback)); + _clients.emplace(std::piecewise_construct, + std::forward_as_tuple(token.get_stream_id()), + std::forward_as_tuple(io_service, token, std::forward(callback))); + } + + void UnSubscribe(token_type token) { + _clients.erase(token.get_stream_id()); } private: boost::asio::ip::address _fallback_address; - std::unordered_set _clients; + std::unordered_map _clients; }; } // namespace low_level diff --git a/PythonAPI/source/libcarla/Sensor.cpp b/PythonAPI/source/libcarla/Sensor.cpp index ec7735766..38f95e9e4 100644 --- a/PythonAPI/source/libcarla/Sensor.cpp +++ b/PythonAPI/source/libcarla/Sensor.cpp @@ -31,7 +31,9 @@ void export_sensor() { namespace cc = carla::client; class_, boost::noncopyable, boost::shared_ptr>("Sensor", no_init) + .add_property("is_listening", &cc::Sensor::IsListening) .def("listen", &SubscribeToStream, (arg("callback"))) + .def("stop", &cc::Sensor::Stop) .def(self_ns::str(self_ns::self)) ; }