Make possible to unsubscribe from a sensor stream

This commit is contained in:
nsubiron 2018-10-08 13:17:26 +02:00
parent 91aa984350
commit 2c53287dcf
12 changed files with 64 additions and 64 deletions

View File

@ -80,7 +80,9 @@
## `carla.Sensor(carla.Actor)`
- `is_listening`
- `listen(callback_function)`
- `stop()`
## `carla.Image`

View File

@ -40,7 +40,7 @@ namespace client {
return _is_alive;
}
void Destroy();
virtual void Destroy();
protected:

View File

@ -19,6 +19,9 @@ namespace client {
"in the simulation:",
GetDisplayId());
}
if (_is_listening) {
Stop();
}
}
void Sensor::Listen(CallbackFunctionType callback) {
@ -35,5 +38,24 @@ namespace client {
_is_listening = true;
}
void Sensor::Stop() {
if (!_is_listening) {
log_warning(
"attempting to unsubscribe from stream but sensor wasn't listening:",
GetDisplayId());
return;
}
GetClientImplementation()->UnSubscribeFromStream(
GetActorDescription().GetStreamToken());
_is_listening = false;
}
void Sensor::Destroy() {
if (_is_listening) {
Stop();
}
Actor::Destroy();
}
} // namespace client
} // namespace carla

View File

@ -25,10 +25,14 @@ namespace client {
void Listen(CallbackFunctionType callback);
void Stop();
bool IsListening() const {
return _is_listening;
}
void Destroy() override;
private:
bool _is_listening = false;

View File

@ -130,6 +130,10 @@ namespace detail {
});
}
void Client::UnSubscribeFromStream(const streaming::Token &token) {
_pimpl->streaming_client.UnSubscribe(token);
}
geom::Location Client::GetActorLocation(const Actor &actor) {
return _pimpl->CallAndWait<geom::Location>("get_actor_location", actor.Serialize());
}

View File

@ -76,6 +76,8 @@ namespace detail {
const streaming::Token &token,
std::function<void(SharedPtr<sensor::SensorData>)> callback);
void UnSubscribeFromStream(const streaming::Token &token);
geom::Location GetActorLocation(const Actor &actor);
geom::Transform GetActorTransform(const Actor &actor);

View File

@ -41,6 +41,10 @@ namespace streaming {
_client.Subscribe(_io_service, token, std::forward<Functor>(callback));
}
void UnSubscribe(const Token &token) {
_client.UnSubscribe(token);
}
void Run() {
_io_service.run();
}
@ -49,13 +53,13 @@ namespace streaming {
_workers.CreateThreads(worker_threads, [this]() { Run(); });
}
private:
void Stop() {
_io_service.stop();
_workers.JoinAll();
}
private:
using underlying_client = low_level::Client<detail::tcp::Client>;
boost::asio::io_service _io_service;

View File

@ -1,48 +0,0 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.
#pragma once
#include "carla/streaming/detail/Types.h"
#include <functional>
namespace carla {
namespace streaming {
namespace detail {
/// A wrapper to put clients into std::unordered_set.
template <typename T>
class HashableClient : public T {
public:
template <typename ... Args>
HashableClient(Args && ... args)
: T(std::forward<Args>(args) ...) {}
bool operator==(const HashableClient &rhs) const {
return T::GetStreamId() == rhs.GetStreamId();
}
};
} // namespace detail
} // namespace streaming
} // namespace carla
namespace std {
// Injecting a hash function for our clients into std namespace so we can
// directly insert them into std::unordered_set.
template <typename T>
struct hash<carla::streaming::detail::HashableClient<T>> {
using argument_type = carla::streaming::detail::HashableClient<T>;
using result_type = std::size_t;
result_type operator()(const argument_type &client) const noexcept {
return std::hash<carla::streaming::detail::stream_id_type>()(client.GetStreamId());
}
};
} // namespace std

View File

@ -79,13 +79,15 @@ namespace tcp {
}
Client::~Client() {
Stop();
_done = true;
/// @todo Destroying this client is not safe, another thread might be still
/// @using it.
}
void Client::Stop() {
_done = true;
_connection_timer.cancel();
_strand.post([this]() {
_done = true;
if (_socket.is_open()) {
_socket.close();
}
@ -179,6 +181,9 @@ namespace tcp {
DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
if (!ec && (message->size() > 0u)) {
DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
if (_done) {
return;
}
// Now that we know the size of the coming buffer, we can allocate our
// buffer and start putting data into it.
boost::asio::async_read(

View File

@ -16,6 +16,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <atomic>
#include <functional>
#include <memory>
@ -49,10 +50,10 @@ namespace tcp {
return _token.get_stream_id();
}
void Stop();
private:
void Stop();
void Connect();
void Reconnect();
@ -71,7 +72,7 @@ namespace tcp {
std::shared_ptr<BufferPool> _buffer_pool;
bool _done = false;
std::atomic_bool _done{false};
};
} // namespace tcp

View File

@ -6,13 +6,12 @@
#pragma once
#include "carla/streaming/detail/HashableClient.h"
#include "carla/streaming/detail/Token.h"
#include "carla/streaming/detail/tcp/Client.h"
#include <boost/asio/io_service.hpp>
#include <unordered_set>
#include <unordered_map>
namespace carla {
namespace streaming {
@ -27,7 +26,7 @@ namespace low_level {
class Client {
public:
using underlying_client = detail::HashableClient<T>;
using underlying_client = T;
using protocol_type = typename underlying_client::protocol_type;
using token_type = carla::streaming::detail::token_type;
@ -48,17 +47,20 @@ namespace low_level {
if (!token.has_address()) {
token.set_address(_fallback_address);
}
_clients.emplace(
io_service,
token,
std::forward<Functor>(callback));
_clients.emplace(std::piecewise_construct,
std::forward_as_tuple(token.get_stream_id()),
std::forward_as_tuple(io_service, token, std::forward<Functor>(callback)));
}
void UnSubscribe(token_type token) {
_clients.erase(token.get_stream_id());
}
private:
boost::asio::ip::address _fallback_address;
std::unordered_set<underlying_client> _clients;
std::unordered_map<detail::stream_id_type, underlying_client> _clients;
};
} // namespace low_level

View File

@ -31,7 +31,9 @@ void export_sensor() {
namespace cc = carla::client;
class_<cc::Sensor, bases<cc::Actor>, boost::noncopyable, boost::shared_ptr<cc::Sensor>>("Sensor", no_init)
.add_property("is_listening", &cc::Sensor::IsListening)
.def("listen", &SubscribeToStream, (arg("callback")))
.def("stop", &cc::Sensor::Stop)
.def(self_ns::str(self_ns::self))
;
}