Fix synchronization issues when changing episode
This commit is contained in:
parent
579d93e61f
commit
1378d7c3cf
|
@ -6,9 +6,13 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "carla/AtomicSharedPtr.h"
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/RecurrentSharedFuture.h"
|
||||
#include "carla/client/Timestamp.h"
|
||||
#include "carla/client/detail/CachedActorList.h"
|
||||
#include "carla/client/detail/Client.h"
|
||||
#include "carla/client/detail/CallbackList.h"
|
||||
#include "carla/client/detail/EpisodeState.h"
|
||||
#include "carla/rpc/EpisodeInfo.h"
|
||||
|
||||
namespace carla {
|
||||
|
@ -27,7 +31,11 @@ namespace detail {
|
|||
private NonCopyable {
|
||||
public:
|
||||
|
||||
explicit Episode(uint64_t id) : _id(id) {}
|
||||
explicit Episode(Client &client);
|
||||
|
||||
~Episode();
|
||||
|
||||
void Listen();
|
||||
|
||||
auto GetId() const {
|
||||
return GetState()->GetEpisodeId();
|
||||
|
@ -41,13 +49,14 @@ namespace detail {
|
|||
_actors.Insert(std::move(actor));
|
||||
}
|
||||
|
||||
template <typename RangeT>
|
||||
std::vector<rpc::Actor> GetActors(Client &client, const RangeT &actor_ids) {
|
||||
auto missing_ids = _actors.GetMissingIds(actor_ids);
|
||||
if (!missing_ids.empty()) {
|
||||
_actors.InsertRange(client.GetActorsById(missing_ids));
|
||||
}
|
||||
return _actors.GetActorsById(actor_ids);
|
||||
std::vector<rpc::Actor> GetActors();
|
||||
|
||||
boost::optional<Timestamp> WaitForState(time_duration timeout) {
|
||||
return _timestamp.WaitFor(timeout);
|
||||
}
|
||||
|
||||
void RegisterOnTickEvent(std::function<void(Timestamp)> callback) {
|
||||
_on_tick_callbacks.RegisterCallback(std::move(callback));
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "carla/client/detail/EpisodeHolder.h"
|
||||
|
||||
#include "carla/Logging.h"
|
||||
#include "carla/client/detail/Client.h"
|
||||
#include "carla/sensor/Deserializer.h"
|
||||
|
||||
#include <exception>
|
||||
|
||||
namespace carla {
|
||||
namespace client {
|
||||
namespace detail {
|
||||
|
||||
static auto &CastData(const sensor::SensorData &data) {
|
||||
using target_t = const sensor::data::RawEpisodeState;
|
||||
DEBUG_ASSERT(dynamic_cast<target_t *>(&data) != nullptr);
|
||||
return static_cast<target_t &>(data);
|
||||
}
|
||||
|
||||
EpisodeHolder::EpisodeHolder(Client &client)
|
||||
: EpisodeHolder(client, client.GetEpisodeInfo()) {}
|
||||
|
||||
EpisodeHolder::EpisodeHolder(Client &client, const rpc::EpisodeInfo &info)
|
||||
: _client(client),
|
||||
_episode(std::make_shared<Episode>(info.id)),
|
||||
_state(std::make_shared<EpisodeState>(info.id)),
|
||||
_token(info.token) {}
|
||||
|
||||
EpisodeHolder::~EpisodeHolder() {
|
||||
try {
|
||||
_client.UnSubscribeFromStream(_token);
|
||||
} catch (const std::exception &e) {
|
||||
log_error("exception trying to disconnect from episode:", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void EpisodeHolder::Listen() {
|
||||
std::weak_ptr<EpisodeHolder> weak = shared_from_this();
|
||||
_client.SubscribeToStream(_token, [weak](auto buffer) {
|
||||
auto self = weak.lock();
|
||||
if (self != nullptr) {
|
||||
auto data = sensor::Deserializer::Deserialize(std::move(buffer));
|
||||
const auto &raw_data = CastData(*data);
|
||||
const auto episode_id = raw_data.GetEpisodeId();
|
||||
|
||||
std::shared_ptr<const EpisodeState> prev;
|
||||
|
||||
if (episode_id != self->GetEpisode()->GetId()) {
|
||||
/// @todo this operation is not atomic, we could potentially end up
|
||||
/// resetting the episode several times.
|
||||
self->_episode.reset(std::make_shared<Episode>(episode_id));
|
||||
prev = std::make_shared<EpisodeState>(episode_id);
|
||||
} else {
|
||||
prev = self->_state.load();
|
||||
}
|
||||
|
||||
auto next = prev->DeriveNextStep(raw_data);
|
||||
self->_state = next;
|
||||
self->_timestamp.SetValue(next->GetTimestamp());
|
||||
self->_on_tick_callbacks.Call(next->GetTimestamp());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<rpc::Actor> EpisodeHolder::GetActors() {
|
||||
auto episode = GetEpisode();
|
||||
auto state = GetState();
|
||||
if (episode->GetId() != state->GetEpisodeId()) {
|
||||
// This should never happen...
|
||||
log_error("Error retrieving actors: invalid episode.");
|
||||
return {};
|
||||
}
|
||||
return episode->GetActors(_client, state->GetActorIds());
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
} // namespace client
|
||||
} // namespace carla
|
|
@ -1,80 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "carla/AtomicSharedPtr.h"
|
||||
#include "carla/NonCopyable.h"
|
||||
#include "carla/RecurrentSharedFuture.h"
|
||||
#include "carla/client/Timestamp.h"
|
||||
#include "carla/client/detail/CallbackList.h"
|
||||
#include "carla/client/detail/Episode.h"
|
||||
#include "carla/client/detail/EpisodeState.h"
|
||||
|
||||
namespace carla {
|
||||
namespace client {
|
||||
namespace detail {
|
||||
|
||||
class Client;
|
||||
|
||||
/// Holds the current episode, and the current episode state.
|
||||
///
|
||||
/// The episode state changes in the background each time a world tick is
|
||||
/// received. The episode may change with any background update if the
|
||||
/// simulator has loaded a new episode.
|
||||
class EpisodeHolder
|
||||
: public std::enable_shared_from_this<EpisodeHolder>,
|
||||
private NonCopyable {
|
||||
public:
|
||||
|
||||
explicit EpisodeHolder(Client &client);
|
||||
|
||||
~EpisodeHolder();
|
||||
|
||||
void Listen();
|
||||
|
||||
std::shared_ptr<Episode> GetEpisode() {
|
||||
return _episode.load();
|
||||
}
|
||||
|
||||
std::shared_ptr<const EpisodeState> GetState() const {
|
||||
return _state.load();
|
||||
}
|
||||
|
||||
void RegisterActor(rpc::Actor actor) {
|
||||
GetEpisode()->RegisterActor(std::move(actor));
|
||||
}
|
||||
|
||||
std::vector<rpc::Actor> GetActors();
|
||||
|
||||
Timestamp WaitForState(time_duration timeout) {
|
||||
return _timestamp.WaitFor(timeout);
|
||||
}
|
||||
|
||||
void RegisterOnTickEvent(std::function<void(Timestamp)> callback) {
|
||||
_on_tick_callbacks.RegisterCallback(std::move(callback));
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
EpisodeHolder(Client &client, const rpc::EpisodeInfo &info);
|
||||
|
||||
Client &_client;
|
||||
|
||||
AtomicSharedPtr<Episode> _episode;
|
||||
|
||||
AtomicSharedPtr<const EpisodeState> _state;
|
||||
|
||||
RecurrentSharedFuture<Timestamp> _timestamp;
|
||||
|
||||
CallbackList<Timestamp> _on_tick_callbacks;
|
||||
|
||||
const streaming::Token _token;
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
} // namespace client
|
||||
} // namespace carla
|
|
@ -79,7 +79,7 @@ namespace detail {
|
|||
EpisodeProxy Simulator::GetCurrentEpisode() {
|
||||
if (_episode == nullptr) {
|
||||
ValidateVersions(_client);
|
||||
_episode = std::make_shared<EpisodeHolder>(_client);
|
||||
_episode = std::make_shared<Episode>(_client);
|
||||
_episode->Listen();
|
||||
if (!GetEpisodeSettings().synchronous_mode) {
|
||||
WaitForTick(_client.GetTimeout());
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "carla/client/Vehicle.h"
|
||||
#include "carla/client/Walker.h"
|
||||
#include "carla/client/detail/Client.h"
|
||||
#include "carla/client/detail/EpisodeHolder.h"
|
||||
#include "carla/client/detail/Episode.h"
|
||||
#include "carla/client/detail/EpisodeProxy.h"
|
||||
#include "carla/profiler/LifetimeProfiled.h"
|
||||
#include "carla/rpc/TrafficLightState.h"
|
||||
|
@ -346,7 +346,7 @@ namespace detail {
|
|||
|
||||
Client _client;
|
||||
|
||||
std::shared_ptr<EpisodeHolder> _episode;
|
||||
std::shared_ptr<Episode> _episode;
|
||||
|
||||
GarbageCollectionPolicy _gc_policy;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue