send existing actors to all secondary servers when a new connection is done

This commit is contained in:
bernatx 2022-12-22 14:35:40 +01:00 committed by bernat
parent 70db124d0e
commit 2c28eafe28
6 changed files with 108 additions and 26 deletions

View File

@ -70,6 +70,11 @@ void Router::SetCallbacks() {
log_info("Listening at ", _endpoint);
}
void Router::SetNewConnectionCallback(std::function<void(void)> func)
{
_callback = func;
}
void Router::AsyncRun(size_t worker_threads) {
_pool.AsyncRun(worker_threads);
}
@ -83,6 +88,9 @@ void Router::ConnectSession(std::shared_ptr<Primary> session) {
std::lock_guard<std::mutex> lock(_mutex);
_sessions.emplace_back(std::move(session));
log_info("Connected secondary servers:", _sessions.size());
// run external callback for new connections
if (_callback)
_callback();
}
void Router::DisconnectSession(std::shared_ptr<Primary> session) {

View File

@ -44,6 +44,8 @@ namespace multigpu {
void Stop();
void SetCallbacks();
void SetNewConnectionCallback(std::function<void(void)>);
void AsyncRun(size_t worker_threads);
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
@ -70,6 +72,7 @@ namespace multigpu {
uint32_t _next;
std::unordered_map<Primary *, std::shared_ptr<std::promise<SessionInfo>>> _promises;
PrimaryCommands _commander;
std::function<void(void)> _callback;
};
} // namespace multigpu

View File

@ -170,6 +170,11 @@ void FCarlaEngine::NotifyInitGame(const UCarlaSettings &Settings)
// we are primary server, starting server
bIsPrimaryServer = true;
SecondaryServer = Server.GetSecondaryServer();
SecondaryServer->SetNewConnectionCallback([this]()
{
this->bNewConnection = true;
UE_LOG(LogCarla, Log, TEXT("New secondary connection detected"));
});
}
}
@ -257,18 +262,19 @@ void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
// update frame counter
UpdateFrameCounter();
if (CurrentEpisode != nullptr)
if (CurrentEpisode)
{
CurrentEpisode->TickTimers(DeltaSeconds);
}
if (!bIsPrimaryServer && GetCurrentEpisode())
{
if (FramesToProcess.size())
if (!bIsPrimaryServer)
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
FramesToProcess.front().PlayFrameData(GetCurrentEpisode(), MappedId);
FramesToProcess.erase(FramesToProcess.begin()); // remove first element
if (FramesToProcess.size())
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
FramesToProcess.front().PlayFrameData(CurrentEpisode, MappedId);
FramesToProcess.erase(FramesToProcess.begin()); // remove first element
}
}
}
}
@ -284,10 +290,11 @@ void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSec
if (bIsPrimaryServer)
{
if (SecondaryServer->HasClientsConnected()) {
GetCurrentEpisode()->GetFrameData().GetFrameData(GetCurrentEpisode());
GetCurrentEpisode()->GetFrameData().GetFrameData(GetCurrentEpisode(), true, bNewConnection);
bNewConnection = false;
std::ostringstream OutStream;
GetCurrentEpisode()->GetFrameData().Write(OutStream);
// send frame data to secondary
std::string Tmp(OutStream.str());
SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));

View File

@ -115,6 +115,7 @@ private:
FDelegateHandle OnEpisodeSettingsChangeHandle;
bool bIsPrimaryServer = true;
bool bNewConnection = false;
std::unordered_map<uint32_t, uint32_t> MappedId;

View File

@ -11,12 +11,17 @@
#include "Carla/Game/CarlaEpisode.h"
void FFrameData::GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData)
void FFrameData::GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData, bool bIncludeActorsAgain)
{
Episode = ThisEpisode;
// PlatformTime.UpdateTime();
const FActorRegistry &Registry = Episode->GetActorRegistry();
if (bIncludeActorsAgain)
{
AddExistingActors();
}
// through all actors in registry
for (auto It = Registry.begin(); It != Registry.end(); ++It)
{
@ -75,7 +80,8 @@ void FFrameData::PlayFrameData(
EventAdd.Description,
EventAdd.DatabaseId,
false,
true);
true,
MappedId);
switch (Result.first)
{
// actor not created
@ -87,12 +93,14 @@ void FFrameData::PlayFrameData(
case 1:
// mapping id (recorded Id is a new Id in replayer)
MappedId[OldId] = Result.second;
UE_LOG(LogCarla, Log, TEXT("actor created"));
break;
// actor reused from existing
case 2:
// mapping id (say desired Id is mapped to what)
MappedId[OldId] = Result.second;
UE_LOG(LogCarla, Log, TEXT("actor reused"));
break;
}
}
@ -257,7 +265,8 @@ void FFrameData::CreateRecorderEventAdd(
uint32_t DatabaseId,
uint8_t Type,
const FTransform &Transform,
FActorDescription ActorDescription)
FActorDescription ActorDescription,
bool bAddOtherRelatedInfo)
{
CarlaRecorderActorDescription Description;
Description.UId = ActorDescription.UId;
@ -289,8 +298,18 @@ void FFrameData::CreateRecorderEventAdd(
};
AddEvent(std::move(RecEvent));
FCarlaActor* CarlaActor = Episode->FindCarlaActor(DatabaseId);
if (!bAddOtherRelatedInfo)
{
return;
}
// Other events related to spawning actors
FCarlaActor* CarlaActor = Episode->FindCarlaActor(DatabaseId);
if (!CarlaActor)
{
return;
}
// check if it is a vehicle to get initial physics control
ACarlaWheeledVehicle* Vehicle = Cast<ACarlaWheeledVehicle>(CarlaActor->GetActor());
if (Vehicle)
@ -587,12 +606,13 @@ void FFrameData::GetFrameCounter()
}
// create or reuse an actor for replaying
std::pair<int, FCarlaActor*> FFrameData::TryToCreateReplayerActor(
std::pair<int, FCarlaActor*> FFrameData::CreateOrReuseActor(
FVector &Location,
FVector &Rotation,
FActorDescription &ActorDesc,
uint32_t DesiredId,
bool SpawnSensors)
bool SpawnSensors,
std::unordered_map<uint32_t, uint32_t>& MappedId)
{
check(Episode != nullptr);
@ -603,6 +623,7 @@ std::pair<int, FCarlaActor*> FFrameData::TryToCreateReplayerActor(
if (CarlaActor != nullptr)
{
// reuse that actor
UE_LOG(LogCarla, Log, TEXT("TrafficLight found"));
return std::pair<int, FCarlaActor*>(2, CarlaActor);
}
else
@ -629,6 +650,21 @@ std::pair<int, FCarlaActor*> FFrameData::TryToCreateReplayerActor(
return std::pair<int, FCarlaActor*>(2, CarlaActor);
}
}
else if (MappedId.find(DesiredId) != MappedId.end() && Episode->GetActorRegistry().Contains(MappedId[DesiredId]))
{
auto* CarlaActor = Episode->FindCarlaActor(MappedId[DesiredId]);
const FActorDescription *desc = &CarlaActor->GetActorInfo()->Description;
if (desc->Id == ActorDesc.Id)
{
// we don't need to create, actor of same type already exist
// relocate
FRotator Rot = FRotator::MakeFromEuler(Rotation);
FTransform Trans2(Rot, Location, FVector(1, 1, 1));
CarlaActor->SetActorGlobalTransform(Trans2);
return std::pair<int, FCarlaActor*>(2, CarlaActor);
}
}
// create new actor
// create the transform
FRotator Rot = FRotator::MakeFromEuler(Rotation);
FTransform Trans(Rot, FVector(0, 0, 100000), FVector(1, 1, 1));
@ -648,7 +684,7 @@ std::pair<int, FCarlaActor*> FFrameData::TryToCreateReplayerActor(
}
else
{
UE_LOG(LogCarla, Log, TEXT("Actor could't be created by replayer"));
UE_LOG(LogCarla, Log, TEXT("Actor could't be created"));
return std::pair<int, FCarlaActor*>(0, Result.Value);
}
}
@ -666,7 +702,8 @@ std::pair<int, uint32_t> FFrameData::ProcessReplayerEventAdd(
CarlaRecorderActorDescription Description,
uint32_t DesiredId,
bool bIgnoreHero,
bool ReplaySensors)
bool ReplaySensors,
std::unordered_map<uint32_t, uint32_t>& MappedId)
{
check(Episode != nullptr);
FActorDescription ActorDesc;
@ -687,12 +724,13 @@ std::pair<int, uint32_t> FFrameData::ProcessReplayerEventAdd(
IsHero = true;
}
auto result = TryToCreateReplayerActor(
auto result = CreateOrReuseActor(
Location,
Rotation,
ActorDesc,
DesiredId,
ReplaySensors);
ReplaySensors,
MappedId);
if (result.first != 0)
{
@ -1025,3 +1063,23 @@ FCarlaActor *FFrameData::FindTrafficLightAt(FVector Location)
// actor not found
return nullptr;
}
void FFrameData::AddExistingActors(void)
{
// registring all existing actors in first frame
FActorRegistry Registry = Episode->GetActorRegistry();
for (auto& It : Registry)
{
const FCarlaActor* CarlaActor = It.Value.Get();
if (CarlaActor != nullptr)
{
// create event
CreateRecorderEventAdd(
CarlaActor->GetActorId(),
static_cast<uint8_t>(CarlaActor->GetActorType()),
CarlaActor->GetActorGlobalTransform(),
CarlaActor->GetActorInfo()->Description,
false);
}
}
}

View File

@ -64,7 +64,7 @@ public:
void SetEpisode(UCarlaEpisode* ThisEpisode) {Episode = ThisEpisode;}
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData = false);
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData = false, bool bIncludeActorsAgain = false);
void PlayFrameData(UCarlaEpisode *ThisEpisode, std::unordered_map<uint32_t, uint32_t>& MappedId);
@ -78,7 +78,8 @@ public:
uint32_t DatabaseId,
uint8_t Type,
const FTransform &Transform,
FActorDescription ActorDescription);
FActorDescription ActorDescription,
bool bAddOtherRelatedInfo = true);
void AddEvent(const CarlaRecorderEventAdd &Event);
void AddEvent(const CarlaRecorderEventDel &Event);
void AddEvent(const CarlaRecorderEventParent &Event);
@ -107,12 +108,13 @@ private:
void GetFrameCounter();
std::pair<int, FCarlaActor*> TryToCreateReplayerActor(
std::pair<int, FCarlaActor*> CreateOrReuseActor(
FVector &Location,
FVector &Rotation,
FActorDescription &ActorDesc,
uint32_t DesiredId,
bool SpawnSensors);
bool SpawnSensors,
std::unordered_map<uint32_t, uint32_t>& MappedId);
// replay event for creating actor
std::pair<int, uint32_t> ProcessReplayerEventAdd(
@ -121,7 +123,8 @@ private:
CarlaRecorderActorDescription Description,
uint32_t DesiredId,
bool bIgnoreHero,
bool ReplaySensors);
bool ReplaySensors,
std::unordered_map<uint32_t, uint32_t>& MappedId);
// replay event for removing actor
bool ProcessReplayerEventDel(uint32_t DatabaseId);
@ -156,5 +159,7 @@ private:
FCarlaActor* FindTrafficLightAt(FVector Location);
void AddExistingActors(void);
UCarlaEpisode *Episode;
};