Allow removing on tick callbacks

This commit is contained in:
nsubiron 2019-07-08 17:05:33 +02:00 committed by Néstor Subirón
parent 408e01db3e
commit 39f5c4da49
14 changed files with 105 additions and 42 deletions

View File

@ -31,6 +31,7 @@
* API extension: added `WorldSnapshot` that contains a list of `ActorSnapshot`, allows capturings a "still image" of the world at a single frame
* API extension: `world.tick()` now synchronizes with the simulator and returns the id of the newly started frame
* API extension: `world.apply_settings(settings)` now synchronizes with the simulator and returns the id of the frame when the settings took effect
* API extension: added `world.remove_on_tick(id)` to allow removing on tick callbacks
* API change: Rename `frame_count` and `frame_number` as `frame`, old members are kept as deprecated
* API change: `world.wait_for_tick()` now returns a `carla.WorldSnapshot`
* API change: the callback of `world.on_tick(callback)` now receives a `carla.WorldSnapshot`

View File

@ -41,7 +41,8 @@
- `spawn_actor(blueprint, transform, attach_to=None)`
- `try_spawn_actor(blueprint, transform, attach_to=None, attachment_type=carla.AttachmentType.Rigid)`
- `wait_for_tick(seconds=1.0) -> carla.WorldSnapshot`
- `on_tick(callback)`
- `on_tick(callback) -> id of the callback`
- `remove_on_tick(id)`
- `tick() -> int (id of the newly started frame)`
## `carla.WorldSettings`

View File

@ -20,8 +20,9 @@ namespace detail {
///
/// @warning Only Load method is atomic, modifications to the list are locked
/// with a mutex.
template <typename T, typename ListT = std::vector<T>>
template <typename T>
class AtomicList : private NonCopyable {
using ListT = std::vector<T>;
public:
AtomicList() : _list(std::make_shared<ListT>()) {}
@ -30,17 +31,27 @@ namespace detail {
void Push(ValueT &&value) {
std::lock_guard<std::mutex> lock(_mutex);
auto new_list = std::make_shared<ListT>(*Load());
new_list->push_back(std::forward<ValueT>(value));
new_list->emplace_back(std::forward<ValueT>(value));
_list = new_list;
}
void Delete(unsigned int index) {
void DeleteByIndex(size_t index) {
std::lock_guard<std::mutex> lock(_mutex);
auto new_list = std::make_shared<ListT>(*Load());
new_list->erase(new_list->begin() + index);
auto begin = new_list->begin();
std::advance(begin, index);
new_list->erase(begin);
_list = new_list;
}
template <typename ValueT>
void DeleteByValue(const ValueT &value) {
std::lock_guard<std::mutex> lock(_mutex);
auto list = std::make_shared<ListT>(*Load());
list->erase(std::remove(list->begin(), list->end(), value), list->end());
_list = list;
}
void Clear() {
std::lock_guard<std::mutex> lock(_mutex);
_list = std::make_shared<ListT>();

View File

@ -18,7 +18,7 @@ namespace client {
GnssSensor::~GnssSensor() = default;
void GnssSensor::Listen(CallbackFunctionType callback) {
if (_is_listening) {
if (IsListening()) {
log_error(GetDisplayId(), ": already listening");
return;
}
@ -35,7 +35,7 @@ namespace client {
auto self = boost::static_pointer_cast<GnssSensor>(shared_from_this());
log_debug(GetDisplayId(), ": subscribing to tick event");
GetEpisode().Lock()->RegisterOnTickEvent([
_callback_id = GetEpisode().Lock()->RegisterOnTickEvent([
cb=std::move(callback),
weak_self=WeakPtr<GnssSensor>(self)](const auto &snapshot) {
auto self = weak_self.lock();
@ -46,8 +46,6 @@ namespace client {
}
}
});
_is_listening = true;
}
SharedPtr<sensor::SensorData> GnssSensor::TickGnssSensor(
@ -59,15 +57,17 @@ namespace client {
GetTransform(),
_geo_reference.Transform(GetLocation()));
} catch (const std::exception &e) {
/// @todo We need to unsubscribe the sensor.
// log_error("GnssSensor:", e.what());
log_error("GnssSensor:", e.what());
Stop();
return nullptr;
}
}
void GnssSensor::Stop() {
/// @todo We need unsubscribe from the world on tick.
_is_listening = false;
if (_callback_id.has_value()) {
GetEpisode().Lock()->RemoveOnTickEvent(*_callback_id);
_callback_id = boost::none;
}
}
} // namespace client

View File

@ -5,16 +5,18 @@
#pragma once
#include "carla/client/Sensor.h"
#include "carla/client/ClientSideSensor.h"
#include "carla/geom/GeoLocation.h"
#include <boost/optional.hpp>
namespace carla {
namespace client {
class GnssSensor final : public Sensor {
class GnssSensor final : public ClientSideSensor {
public:
using Sensor::Sensor;
using ClientSideSensor::ClientSideSensor;
~GnssSensor();
@ -33,7 +35,7 @@ namespace client {
/// Return whether this Sensor instance is currently listening to the
/// associated sensor in the simulator.
bool IsListening() const override {
return _is_listening;
return _callback_id.has_value();
}
private:
@ -42,7 +44,7 @@ namespace client {
geom::GeoLocation _geo_reference;
bool _is_listening = false;
boost::optional<size_t> _callback_id;
};
} // namespace client

View File

@ -43,7 +43,7 @@ namespace client {
LaneInvasionSensor::~LaneInvasionSensor() = default;
void LaneInvasionSensor::Listen(CallbackFunctionType callback) {
if (_is_listening) {
if (IsListening()) {
log_error(GetDisplayId(), ": already listening");
return;
}
@ -60,7 +60,7 @@ namespace client {
auto self = boost::static_pointer_cast<LaneInvasionSensor>(shared_from_this());
log_debug(GetDisplayId(), ": subscribing to tick event");
GetEpisode().Lock()->RegisterOnTickEvent([
_callback_id = GetEpisode().Lock()->RegisterOnTickEvent([
cb=std::move(callback),
weak_self=WeakPtr<LaneInvasionSensor>(self)](const auto &snapshot) {
auto self = weak_self.lock();
@ -71,12 +71,13 @@ namespace client {
}
}
});
_is_listening = true;
}
void LaneInvasionSensor::Stop() {
/// @todo We need unsubscribe from the world on tick.
_is_listening = false;
if (_callback_id.has_value()) {
GetEpisode().Lock()->RemoveOnTickEvent(*_callback_id);
_callback_id = boost::none;
}
}
SharedPtr<sensor::SensorData> LaneInvasionSensor::TickLaneInvasionSensor(
@ -98,8 +99,8 @@ namespace client {
_vehicle,
crossed_lanes);
} catch (const std::exception &e) {
/// @todo We need to unsubscribe the sensor.
// log_error("LaneInvasionSensor:", e.what());
log_error("LaneInvasionSensor:", e.what());
Stop();
return nullptr;
}
}

View File

@ -9,6 +9,8 @@
#include "carla/client/ClientSideSensor.h"
#include "carla/geom/Location.h"
#include <boost/optional.hpp>
#include <array>
namespace carla {
@ -38,20 +40,20 @@ namespace client {
/// Return whether this Sensor instance is currently listening to the
/// associated sensor in the simulator.
bool IsListening() const override {
return _is_listening;
return _callback_id.has_value();
}
private:
SharedPtr<sensor::SensorData> TickLaneInvasionSensor(const Timestamp &timestamp);
bool _is_listening = false;
SharedPtr<Map> _map;
SharedPtr<Vehicle> _vehicle;
std::array<geom::Location, 4u> _bounds;
boost::optional<size_t> _callback_id;
};
} // namespace client

View File

@ -97,10 +97,14 @@ namespace client {
return _episode.Lock()->WaitForTick(timeout);
}
void World::OnTick(std::function<void(WorldSnapshot)> callback) {
size_t World::OnTick(std::function<void(WorldSnapshot)> callback) {
return _episode.Lock()->RegisterOnTickEvent(std::move(callback));
}
void World::RemoveOnTick(size_t callback_id) {
_episode.Lock()->RemoveOnTickEvent(callback_id);
}
uint64_t World::Tick() {
return _episode.Lock()->Tick();
}

View File

@ -104,7 +104,12 @@ namespace client {
WorldSnapshot WaitForTick(time_duration timeout) const;
/// Register a @a callback to be called every time a world tick is received.
void OnTick(std::function<void(WorldSnapshot)> callback);
///
/// @return ID of the callback, use it to remove the callback.
size_t OnTick(std::function<void(WorldSnapshot)> callback);
/// Remove a callback registered with OnTick.
void RemoveOnTick(size_t callback_id);
/// Signal the simulator to continue to next tick (only has effect on
/// synchronous mode).

View File

@ -9,6 +9,7 @@
#include "carla/AtomicList.h"
#include "carla/NonCopyable.h"
#include <atomic>
#include <functional>
namespace carla {
@ -23,13 +24,19 @@ namespace detail {
void Call(InputsT... args) const {
auto list = _list.Load();
for (auto &callback : *list) {
callback(args...);
for (auto &item : *list) {
item.callback(args...);
}
}
void RegisterCallback(CallbackType &&callback) {
_list.Push(std::move(callback));
size_t Push(CallbackType &&callback) {
auto id = ++_counter;
_list.Push(Item{id, std::move(callback)});
return id;
}
void Remove(size_t id) {
_list.DeleteByValue(id);
}
void Clear() {
@ -38,7 +45,26 @@ namespace detail {
private:
AtomicList<CallbackType> _list;
struct Item {
size_t id;
CallbackType callback;
friend bool operator==(const Item &lhs, const Item &rhs) {
return lhs.id == rhs.id;
}
friend bool operator==(const Item &lhs, size_t rhs) {
return lhs.id == rhs;
}
friend bool operator==(size_t lhs, const Item &rhs) {
return lhs == rhs.id;
}
};
std::atomic_size_t _counter{0u};
AtomicList<Item> _list;
};
} // namespace detail

View File

@ -71,8 +71,12 @@ namespace detail {
return _snapshot.WaitFor(timeout);
}
void RegisterOnTickEvent(std::function<void(WorldSnapshot)> callback) {
_on_tick_callbacks.RegisterCallback(std::move(callback));
size_t RegisterOnTickEvent(std::function<void(WorldSnapshot)> callback) {
return _on_tick_callbacks.Push(std::move(callback));
}
void RemoveOnTickEvent(size_t id) {
_on_tick_callbacks.Remove(id);
}
private:

View File

@ -140,9 +140,14 @@ namespace detail {
WorldSnapshot WaitForTick(time_duration timeout);
void RegisterOnTickEvent(std::function<void(WorldSnapshot)> callback) {
size_t RegisterOnTickEvent(std::function<void(WorldSnapshot)> callback) {
DEBUG_ASSERT(_episode != nullptr);
_episode->RegisterOnTickEvent(std::move(callback));
return _episode->RegisterOnTickEvent(std::move(callback));
}
void RemoveOnTickEvent(size_t id) {
DEBUG_ASSERT(_episode != nullptr);
_episode->RemoveOnTickEvent(id);
}
uint64_t Tick();

View File

@ -41,7 +41,7 @@ namespace detail {
while (i < list->size()) {
if ((*list)[i].walker == walker_id &&
(*list)[i].controller == controller_id) {
_walkers.Delete(i);
_walkers.DeleteByIndex(i);
break;
}
++i;

View File

@ -52,8 +52,8 @@ static auto WaitForTick(const carla::client::World &world, double seconds) {
return world.WaitForTick(TimeDurationFromSeconds(seconds));
}
static void OnTick(carla::client::World &self, boost::python::object callback) {
self.OnTick(MakeCallback(std::move(callback)));
static size_t OnTick(carla::client::World &self, boost::python::object callback) {
return self.OnTick(MakeCallback(std::move(callback)));
}
static auto GetActorsById(carla::client::World &self, const boost::python::list &actor_ids) {
@ -145,6 +145,7 @@ void export_world() {
.def("try_spawn_actor", SPAWN_ACTOR_WITHOUT_GIL(TrySpawnActor))
.def("wait_for_tick", &WaitForTick, (arg("seconds")=10.0))
.def("on_tick", &OnTick, (arg("callback")))
.def("remove_on_tick", &cc::World::RemoveOnTick, (arg("callback_id")))
.def("tick", CALL_WITHOUT_GIL(cc::World, Tick))
.def(self_ns::str(self_ns::self))
;