739 lines
33 KiB
Python
739 lines
33 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright (c) 2020 Intel Corporation
|
|
#
|
|
|
|
import glob
|
|
import os
|
|
import sys
|
|
|
|
try:
|
|
sys.path.append(glob.glob(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + '/carla/dist/carla-*%d.%d-%s.egg' % (
|
|
sys.version_info.major,
|
|
sys.version_info.minor,
|
|
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
|
|
except IndexError:
|
|
pass
|
|
|
|
from enum import Enum
|
|
import math
|
|
import numpy as np
|
|
import pygame
|
|
import weakref
|
|
import carla
|
|
from carla import ad
|
|
|
|
|
|
class RssStateVisualizer(object):
|
|
|
|
def __init__(self, display_dimensions, font, world):
|
|
self._surface = None
|
|
self._display_dimensions = display_dimensions
|
|
self._font = font
|
|
self._world = world
|
|
|
|
def tick(self, individual_rss_states):
|
|
state_surface = pygame.Surface((220, self._display_dimensions[1]))
|
|
state_surface.set_colorkey(pygame.Color('black'))
|
|
v_offset = 0
|
|
|
|
if individual_rss_states:
|
|
surface = self._font.render('RSS States:', True, (255, 255, 255))
|
|
state_surface.blit(surface, (8, v_offset))
|
|
v_offset += 26
|
|
for state in individual_rss_states:
|
|
object_name = "Obj"
|
|
if state.rss_state.objectId == 18446744073709551614:
|
|
object_name = "Border Left"
|
|
elif state.rss_state.objectId == 18446744073709551615:
|
|
object_name = "Border Right"
|
|
else:
|
|
other_actor = state.get_actor(self._world)
|
|
if other_actor:
|
|
li = list(other_actor.type_id.split("."))
|
|
if li:
|
|
li.pop(0)
|
|
li = [element.capitalize() for element in li]
|
|
|
|
object_name = " ".join(li).strip()[:15]
|
|
|
|
mode = "?"
|
|
if state.actor_calculation_mode == ad.rss.map.RssMode.Structured:
|
|
mode = "S"
|
|
elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured:
|
|
mode = "U"
|
|
elif state.actor_calculation_mode == ad.rss.map.RssMode.NotRelevant:
|
|
mode = "-"
|
|
item = '%4s % 2dm %8s' % (mode, state.distance, object_name)
|
|
|
|
surface = self._font.render(item, True, (255, 255, 255))
|
|
state_surface.blit(surface, (5, v_offset))
|
|
color = (128, 128, 128)
|
|
if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant:
|
|
if state.is_dangerous:
|
|
color = (255, 0, 0)
|
|
else:
|
|
color = (0, 255, 0)
|
|
pygame.draw.circle(state_surface, color, (12, v_offset + 7), 5)
|
|
xpos = 184
|
|
if state.actor_calculation_mode == ad.rss.map.RssMode.Structured:
|
|
if not state.rss_state.longitudinalState.isSafe and ((state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceSameDirectionOtherInFront") or (state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceSameDirectionEgoFront")):
|
|
pygame.draw.polygon(
|
|
state_surface, (
|
|
255, 255, 255), ((xpos + 1, v_offset + 1 + 4), (xpos + 6, v_offset + 1 + 0), (xpos + 11, v_offset + 1 + 4),
|
|
(xpos + 7, v_offset + 1 + 4), (xpos + 7, v_offset + 1 + 12), (xpos + 5, v_offset + 1 + 12), (xpos + 5, v_offset + 1 + 4)))
|
|
xpos += 14
|
|
|
|
if not state.rss_state.longitudinalState.isSafe and ((state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceOppositeDirectionEgoCorrectLane") or (state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceOppositeDirection")):
|
|
pygame.draw.polygon(
|
|
state_surface, (
|
|
255, 255, 255), ((xpos + 2, v_offset + 1 + 8), (xpos + 6, v_offset + 1 + 12), (xpos + 10, v_offset + 1 + 8),
|
|
(xpos + 7, v_offset + 1 + 8), (xpos + 7, v_offset + 1 + 0), (xpos + 5, v_offset + 1 + 0), (xpos + 5, v_offset + 1 + 8)))
|
|
xpos += 14
|
|
|
|
if not state.rss_state.lateralStateRight.isSafe and not (state.rss_state.lateralStateRight.rssStateInformation.evaluator == "None"):
|
|
pygame.draw.polygon(
|
|
state_surface, (
|
|
255, 255, 255), ((xpos + 0, v_offset + 1 + 4), (xpos + 8, v_offset + 1 + 4), (xpos + 8, v_offset + 1 + 1),
|
|
(xpos + 12, v_offset + 1 + 6), (xpos + 8, v_offset + 1 + 10), (xpos + 8, v_offset + 1 + 8), (xpos + 0, v_offset + 1 + 8)))
|
|
xpos += 14
|
|
if not state.rss_state.lateralStateLeft.isSafe and not (state.rss_state.lateralStateLeft.rssStateInformation.evaluator == "None"):
|
|
pygame.draw.polygon(
|
|
state_surface, (
|
|
255, 255, 255), ((xpos + 0, v_offset + 1 + 6), (xpos + 4, v_offset + 1 + 1), (xpos + 4, v_offset + 1 + 4),
|
|
(xpos + 12, v_offset + 1 + 4), (xpos + 12, v_offset + 1 + 8), (xpos + 4, v_offset + 1 + 8), (xpos + 4, v_offset + 1 + 10)))
|
|
xpos += 14
|
|
elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured:
|
|
text = ""
|
|
if state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.DriveAway:
|
|
text = " D"
|
|
elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.ContinueForward:
|
|
text = " C"
|
|
elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.Brake:
|
|
text = " B"
|
|
surface = self._font.render(text, True, (255, 255, 255))
|
|
state_surface.blit(surface, (xpos, v_offset))
|
|
|
|
v_offset += 14
|
|
self._surface = state_surface
|
|
|
|
def render(self, display, v_offset):
|
|
if self._surface:
|
|
display.blit(self._surface, (0, v_offset))
|
|
|
|
|
|
def get_matrix(transform):
|
|
"""
|
|
Creates matrix from carla transform.
|
|
"""
|
|
|
|
rotation = transform.rotation
|
|
location = transform.location
|
|
c_y = np.cos(np.radians(rotation.yaw))
|
|
s_y = np.sin(np.radians(rotation.yaw))
|
|
c_r = np.cos(np.radians(rotation.roll))
|
|
s_r = np.sin(np.radians(rotation.roll))
|
|
c_p = np.cos(np.radians(rotation.pitch))
|
|
s_p = np.sin(np.radians(rotation.pitch))
|
|
matrix = np.matrix(np.identity(4))
|
|
matrix[0, 3] = location.x
|
|
matrix[1, 3] = location.y
|
|
matrix[2, 3] = location.z
|
|
matrix[0, 0] = c_p * c_y
|
|
matrix[0, 1] = c_y * s_p * s_r - s_y * c_r
|
|
matrix[0, 2] = -c_y * s_p * c_r - s_y * s_r
|
|
matrix[1, 0] = s_y * c_p
|
|
matrix[1, 1] = s_y * s_p * s_r + c_y * c_r
|
|
matrix[1, 2] = -s_y * s_p * c_r + c_y * s_r
|
|
matrix[2, 0] = s_p
|
|
matrix[2, 1] = -c_p * s_r
|
|
matrix[2, 2] = c_p * c_r
|
|
return matrix
|
|
|
|
# ==============================================================================
|
|
# -- RssUnstructuredSceneVisualizer ------------------------------------------------
|
|
# ==============================================================================
|
|
|
|
|
|
class RssUnstructuredSceneVisualizerMode(Enum):
|
|
disabled = 1
|
|
window = 2
|
|
fullscreen = 3
|
|
|
|
|
|
class RssUnstructuredSceneVisualizer(object):
|
|
|
|
def __init__(self, parent_actor, world, display_dimensions):
|
|
self._last_rendered_frame = -1
|
|
self._surface = None
|
|
self._current_rss_surface = None
|
|
self.current_camera_surface = (0, None)
|
|
self._world = world
|
|
self._parent_actor = parent_actor
|
|
self._display_dimensions = display_dimensions
|
|
self._camera = None
|
|
self._mode = RssUnstructuredSceneVisualizerMode.disabled
|
|
|
|
self.restart(RssUnstructuredSceneVisualizerMode.window)
|
|
|
|
def destroy(self):
|
|
if self._camera:
|
|
self._camera.stop()
|
|
self._camera.destroy()
|
|
self._camera = None
|
|
|
|
def restart(self, mode):
|
|
# setup up top down camera
|
|
self.destroy()
|
|
self._mode = mode
|
|
|
|
spawn_sensor = False
|
|
if mode == RssUnstructuredSceneVisualizerMode.window:
|
|
self._dim = (self._display_dimensions[0] / 3, self._display_dimensions[1] / 2)
|
|
spawn_sensor = True
|
|
elif mode == RssUnstructuredSceneVisualizerMode.fullscreen:
|
|
self._dim = (self._display_dimensions[0], self._display_dimensions[1])
|
|
spawn_sensor = True
|
|
else:
|
|
self._surface = None
|
|
|
|
if spawn_sensor:
|
|
self._calibration = np.identity(3)
|
|
self._calibration[0, 2] = self._dim[0] / 2.0
|
|
self._calibration[1, 2] = self._dim[1] / 2.0
|
|
self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / \
|
|
(2.0 * np.tan(90.0 * np.pi / 360.0)) # fov default: 90.0
|
|
|
|
bp_library = self._world.get_blueprint_library()
|
|
bp = bp_library.find('sensor.camera.rgb')
|
|
bp.set_attribute('image_size_x', str(self._dim[0]))
|
|
bp.set_attribute('image_size_y', str(self._dim[1]))
|
|
|
|
self._camera = self._world.spawn_actor(
|
|
bp,
|
|
carla.Transform(carla.Location(x=7.5, z=10), carla.Rotation(pitch=-90)),
|
|
attach_to=self._parent_actor)
|
|
# We need to pass the lambda a weak reference to self to avoid
|
|
# circular reference.
|
|
weak_self = weakref.ref(self)
|
|
self._camera.listen(lambda image: self._parse_image(weak_self, image))
|
|
|
|
def update_surface(self, cam_frame, rss_frame):
|
|
if self._mode == RssUnstructuredSceneVisualizerMode.disabled:
|
|
return
|
|
render = False
|
|
|
|
if cam_frame and self._current_rss_surface and self._current_rss_surface[0] == cam_frame:
|
|
render = True
|
|
|
|
if rss_frame and self.current_camera_surface and self.current_camera_surface[0] == rss_frame:
|
|
render = True
|
|
|
|
if render:
|
|
surface = self.current_camera_surface[1]
|
|
surface.blit(self._current_rss_surface[1], (0, 0))
|
|
rect = pygame.Rect((0, 0), (2, surface.get_height()))
|
|
pygame.draw.rect(surface, (0, 0, 0), rect, 0)
|
|
rect = pygame.Rect((0, 0), (surface.get_width(), 2))
|
|
pygame.draw.rect(surface, (0, 0, 0), rect, 0)
|
|
rect = pygame.Rect((0, surface.get_height() - 2), (surface.get_width(), surface.get_height()))
|
|
pygame.draw.rect(surface, (0, 0, 0), rect, 0)
|
|
rect = pygame.Rect((surface.get_width() - 2, 0), (surface.get_width(), surface.get_width()))
|
|
pygame.draw.rect(surface, (0, 0, 0), rect, 0)
|
|
self._surface = surface
|
|
|
|
def toggle_camera(self):
|
|
print("Toggle RssUnstructuredSceneVisualizer")
|
|
if self._mode == RssUnstructuredSceneVisualizerMode.window:
|
|
self.restart(RssUnstructuredSceneVisualizerMode.fullscreen)
|
|
elif self._mode == RssUnstructuredSceneVisualizerMode.fullscreen:
|
|
self.restart(RssUnstructuredSceneVisualizerMode.disabled)
|
|
elif self._mode == RssUnstructuredSceneVisualizerMode.disabled:
|
|
self.restart(RssUnstructuredSceneVisualizerMode.window)
|
|
|
|
@staticmethod
|
|
def _parse_image(weak_self, image):
|
|
self = weak_self()
|
|
if not self:
|
|
return
|
|
image.convert(carla.ColorConverter.Raw)
|
|
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
|
|
array = np.reshape(array, (image.height, image.width, 4))
|
|
array = array[:, :, :3]
|
|
array = array[:, :, ::-1]
|
|
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
|
|
self.current_camera_surface = (image.frame, surface)
|
|
self.update_surface(image.frame, None)
|
|
|
|
@staticmethod
|
|
def rotate_around_point(xy, radians, origin):
|
|
"""Rotate a point around a given point.
|
|
"""
|
|
x, y = xy
|
|
offset_x, offset_y = origin
|
|
adjusted_x = (x - offset_x)
|
|
adjusted_y = (y - offset_y)
|
|
cos_rad = math.cos(radians)
|
|
sin_rad = math.sin(radians)
|
|
qx = offset_x + cos_rad * adjusted_x - sin_rad * adjusted_y
|
|
qy = offset_y + sin_rad * adjusted_x + cos_rad * adjusted_y
|
|
|
|
return qx, qy
|
|
|
|
def tick(self, frame, rss_response, allowed_heading_ranges):
|
|
if not self._camera:
|
|
return
|
|
surface = pygame.Surface(self._dim)
|
|
surface.set_colorkey(pygame.Color('black'))
|
|
surface.set_alpha(180)
|
|
try:
|
|
lines = RssUnstructuredSceneVisualizer.get_trajectory_sets(
|
|
rss_response.rss_state_snapshot, self._camera.get_transform(), self._calibration)
|
|
|
|
polygons = []
|
|
for heading_range in allowed_heading_ranges:
|
|
polygons.append((RssUnstructuredSceneVisualizer.transform_points(
|
|
RssUnstructuredSceneVisualizer._get_points_from_pairs(
|
|
RssUnstructuredSceneVisualizer.draw_heading_range(
|
|
heading_range, rss_response.ego_dynamics_on_route)),
|
|
self._camera.get_transform(), self._calibration), (0, 0, 255)))
|
|
|
|
RssUnstructuredSceneVisualizer.draw_lines(surface, lines)
|
|
RssUnstructuredSceneVisualizer.draw_polygons(surface, polygons)
|
|
|
|
except RuntimeError as e:
|
|
print("ERROR {}".format(e))
|
|
self._current_rss_surface = (frame, surface)
|
|
self.update_surface(None, frame)
|
|
|
|
def render(self, display):
|
|
if self._surface:
|
|
display.blit(self._surface, (display.get_width() - self._dim[0], 0))
|
|
|
|
@staticmethod
|
|
def draw_heading_range(heading_range, ego_dynamics_on_route):
|
|
line = [(float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))]
|
|
length = 3.0
|
|
current_angle = float(heading_range.begin)
|
|
max_angle = float(heading_range.end)
|
|
if heading_range.end < heading_range.begin:
|
|
max_angle += 2.0 * np.pi
|
|
|
|
while current_angle < max_angle:
|
|
line.append((float(ego_dynamics_on_route.ego_center.x) + length * np.cos(current_angle),
|
|
float(ego_dynamics_on_route.ego_center.y) + length * np.sin(current_angle)))
|
|
current_angle += 0.2
|
|
|
|
if current_angle != max_angle:
|
|
line.append((float(ego_dynamics_on_route.ego_center.x) + length * np.cos(max_angle),
|
|
float(ego_dynamics_on_route.ego_center.y) + length * np.sin(max_angle)))
|
|
|
|
line.append((float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y)))
|
|
return line
|
|
|
|
@staticmethod
|
|
def get_trajectory_sets(rss_state_snapshot, camera_transform, calibration):
|
|
"""
|
|
Creates 3D bounding boxes based on carla vehicle list and camera.
|
|
"""
|
|
trajectory_sets = []
|
|
|
|
# ego
|
|
trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points(
|
|
rss_state_snapshot.unstructuredSceneEgoInformation.brakeTrajectorySet), camera_transform, calibration), (255, 0, 0)))
|
|
trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points(
|
|
rss_state_snapshot.unstructuredSceneEgoInformation.continueForwardTrajectorySet), camera_transform, calibration), (0, 255, 0)))
|
|
|
|
# others
|
|
for state in rss_state_snapshot.individualResponses:
|
|
if state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet:
|
|
trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points(
|
|
state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet), camera_transform, calibration), (255, 0, 0)))
|
|
if state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet:
|
|
trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points(
|
|
state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet), camera_transform, calibration), (0, 255, 0)))
|
|
|
|
return trajectory_sets
|
|
|
|
@staticmethod
|
|
def draw_lines(surface, lines):
|
|
"""
|
|
Draws lines on pygame display.
|
|
"""
|
|
for line, color in lines:
|
|
if len(line) > 1:
|
|
pygame.draw.lines(surface, color, True, line, 2)
|
|
|
|
@staticmethod
|
|
def draw_polygons(surface, polygons):
|
|
"""
|
|
Draws polygons on pygame display.
|
|
"""
|
|
for polygon, color in polygons:
|
|
if len(polygon) > 1:
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
|
|
@staticmethod
|
|
def transform_points(world_cords, camera_transform, calibration):
|
|
"""
|
|
Returns trajectory set projected to camera view
|
|
"""
|
|
world_cords = np.transpose(world_cords)
|
|
cords_x_y_z = RssUnstructuredSceneVisualizer._world_to_sensor(world_cords, camera_transform)[:3, :]
|
|
cords_y_minus_z_x = np.concatenate([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]])
|
|
ts = np.transpose(np.dot(calibration, cords_y_minus_z_x))
|
|
camera_ts = np.concatenate([ts[:, 0] / ts[:, 2], ts[:, 1] / ts[:, 2], ts[:, 2]], axis=1)
|
|
line_to_draw = []
|
|
for point in camera_ts:
|
|
line_to_draw.append((int(point[0, 0]), int(point[0, 1])))
|
|
return line_to_draw
|
|
|
|
@staticmethod
|
|
def _get_trajectory_set_points(trajectory_set):
|
|
"""
|
|
"""
|
|
cords = np.zeros((len(trajectory_set), 4))
|
|
i = 0
|
|
for pt in trajectory_set:
|
|
cords[i, :] = np.array([pt.x, -pt.y, 0, 1])
|
|
i += 1
|
|
return cords
|
|
|
|
@staticmethod
|
|
def _get_points_from_pairs(trajectory_set):
|
|
"""
|
|
"""
|
|
cords = np.zeros((len(trajectory_set), 4))
|
|
i = 0
|
|
for pt in trajectory_set:
|
|
cords[i, :] = np.array([pt[0], -pt[1], 0, 1])
|
|
i += 1
|
|
return cords
|
|
|
|
@staticmethod
|
|
def _world_to_sensor(cords, camera_transform):
|
|
"""
|
|
Transforms world coordinates to sensor.
|
|
"""
|
|
sensor_world_matrix = get_matrix(camera_transform)
|
|
world_sensor_matrix = np.linalg.inv(sensor_world_matrix)
|
|
sensor_cords = np.dot(world_sensor_matrix, cords)
|
|
return sensor_cords
|
|
|
|
# ==============================================================================
|
|
# -- RssBoundingBoxVisualizer ------------------------------------------------------
|
|
# ==============================================================================
|
|
|
|
|
|
class RssBoundingBoxVisualizer(object):
|
|
|
|
def __init__(self, display_dimensions, world, camera):
|
|
self._last_camera_frame = 0
|
|
self._surface_for_frame = []
|
|
self._world = world
|
|
self._dim = display_dimensions
|
|
self._calibration = np.identity(3)
|
|
self._calibration[0, 2] = self._dim[0] / 2.0
|
|
self._calibration[1, 2] = self._dim[1] / 2.0
|
|
self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / \
|
|
(2.0 * np.tan(90.0 * np.pi / 360.0)) # fov default: 90.0
|
|
self._camera = camera
|
|
|
|
def tick(self, frame, individual_rss_states):
|
|
if len(self._surface_for_frame) > 0:
|
|
try:
|
|
while self._surface_for_frame[0][0] < self._last_camera_frame:
|
|
self._surface_for_frame.pop(0)
|
|
except IndexError:
|
|
return
|
|
|
|
# only render on new frame
|
|
if len(self._surface_for_frame) > 0:
|
|
if self._surface_for_frame[0][0] == frame:
|
|
return
|
|
|
|
surface = pygame.Surface(self._dim)
|
|
surface.set_colorkey(pygame.Color('black'))
|
|
surface.set_alpha(80)
|
|
try:
|
|
bounding_boxes = RssBoundingBoxVisualizer.get_bounding_boxes(
|
|
individual_rss_states, self._camera.get_transform(), self._calibration, self._world)
|
|
RssBoundingBoxVisualizer.draw_bounding_boxes(surface, bounding_boxes)
|
|
self._surface_for_frame.append((frame, surface, len(bounding_boxes)))
|
|
except RuntimeError:
|
|
pass
|
|
|
|
def render(self, display, current_camera_frame):
|
|
rendered = False
|
|
boxes_to_render = 0
|
|
for frame, surface, box_count in self._surface_for_frame:
|
|
if frame == current_camera_frame:
|
|
display.blit(surface, (0, 0))
|
|
boxes_to_render = box_count
|
|
rendered = True
|
|
break
|
|
if not rendered and boxes_to_render > 0:
|
|
print("Warning: {} bounding boxes were not drawn.".format(boxes_to_render))
|
|
self._last_camera_frame = current_camera_frame
|
|
|
|
@staticmethod
|
|
def get_bounding_boxes(individual_rss_states, camera_transform, calibration, world):
|
|
"""
|
|
Creates 3D bounding boxes based on carla vehicle list and camera.
|
|
"""
|
|
bounding_boxes = []
|
|
for state in individual_rss_states:
|
|
if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant and state.is_dangerous:
|
|
other_actor = state.get_actor(world)
|
|
if other_actor:
|
|
bounding_boxes.append(RssBoundingBoxVisualizer.get_bounding_box(
|
|
other_actor, camera_transform, calibration))
|
|
# filter objects behind camera
|
|
bounding_boxes = [bb for bb in bounding_boxes if all(bb[:, 2] > 0)]
|
|
return bounding_boxes
|
|
|
|
@staticmethod
|
|
def draw_bounding_boxes(surface, bounding_boxes, color=pygame.Color('red')):
|
|
"""
|
|
Draws bounding boxes on pygame display.
|
|
"""
|
|
for bbox in bounding_boxes:
|
|
points = [(int(bbox[i, 0]), int(bbox[i, 1])) for i in range(8)]
|
|
# draw lines
|
|
# base
|
|
polygon = [points[0], points[1], points[2], points[3]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
# top
|
|
polygon = [points[4], points[5], points[6], points[7]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
# base-top
|
|
polygon = [points[0], points[1], points[5], points[4]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
polygon = [points[1], points[2], points[6], points[5]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
polygon = [points[2], points[6], points[7], points[3]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
polygon = [points[0], points[4], points[7], points[3]]
|
|
pygame.draw.polygon(surface, color, polygon)
|
|
|
|
@staticmethod
|
|
def get_bounding_box(vehicle, camera_transform, calibration):
|
|
"""
|
|
Returns 3D bounding box for a vehicle based on camera view.
|
|
"""
|
|
|
|
bb_cords = RssBoundingBoxVisualizer._create_bb_points(vehicle)
|
|
cords_x_y_z = RssBoundingBoxVisualizer._vehicle_to_sensor(bb_cords, vehicle, camera_transform)[:3, :]
|
|
cords_y_minus_z_x = np.concatenate([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]])
|
|
bbox = np.transpose(np.dot(calibration, cords_y_minus_z_x))
|
|
camera_bbox = np.concatenate([bbox[:, 0] / bbox[:, 2], bbox[:, 1] / bbox[:, 2], bbox[:, 2]], axis=1)
|
|
return camera_bbox
|
|
|
|
@staticmethod
|
|
def _create_bb_points(vehicle):
|
|
"""
|
|
Returns 3D bounding box for a vehicle.
|
|
"""
|
|
|
|
cords = np.zeros((8, 4))
|
|
extent = vehicle.bounding_box.extent
|
|
cords[0, :] = np.array([extent.x, extent.y, -extent.z, 1])
|
|
cords[1, :] = np.array([-extent.x, extent.y, -extent.z, 1])
|
|
cords[2, :] = np.array([-extent.x, -extent.y, -extent.z, 1])
|
|
cords[3, :] = np.array([extent.x, -extent.y, -extent.z, 1])
|
|
cords[4, :] = np.array([extent.x, extent.y, extent.z, 1])
|
|
cords[5, :] = np.array([-extent.x, extent.y, extent.z, 1])
|
|
cords[6, :] = np.array([-extent.x, -extent.y, extent.z, 1])
|
|
cords[7, :] = np.array([extent.x, -extent.y, extent.z, 1])
|
|
return cords
|
|
|
|
@staticmethod
|
|
def _vehicle_to_sensor(cords, vehicle, camera_transform):
|
|
"""
|
|
Transforms coordinates of a vehicle bounding box to sensor.
|
|
"""
|
|
|
|
world_cord = RssBoundingBoxVisualizer._vehicle_to_world(cords, vehicle)
|
|
sensor_cord = RssBoundingBoxVisualizer._world_to_sensor(world_cord, camera_transform)
|
|
return sensor_cord
|
|
|
|
@staticmethod
|
|
def _vehicle_to_world(cords, vehicle):
|
|
"""
|
|
Transforms coordinates of a vehicle bounding box to world.
|
|
"""
|
|
|
|
bb_transform = carla.Transform(vehicle.bounding_box.location)
|
|
bb_vehicle_matrix = get_matrix(bb_transform)
|
|
vehicle_world_matrix = get_matrix(vehicle.get_transform())
|
|
bb_world_matrix = np.dot(vehicle_world_matrix, bb_vehicle_matrix)
|
|
world_cords = np.dot(bb_world_matrix, np.transpose(cords))
|
|
return world_cords
|
|
|
|
@staticmethod
|
|
def _world_to_sensor(cords, camera_transform):
|
|
"""
|
|
Transforms world coordinates to sensor.
|
|
"""
|
|
|
|
sensor_world_matrix = get_matrix(camera_transform)
|
|
world_sensor_matrix = np.linalg.inv(sensor_world_matrix)
|
|
sensor_cords = np.dot(world_sensor_matrix, cords)
|
|
return sensor_cords
|
|
|
|
# ==============================================================================
|
|
# -- RssDebugVisualizer ------------------------------------------------------------
|
|
# ==============================================================================
|
|
|
|
|
|
class RssDebugVisualizationMode(Enum):
|
|
Off = 1
|
|
RouteOnly = 2
|
|
VehicleStateOnly = 3
|
|
VehicleStateAndRoute = 4
|
|
All = 5
|
|
|
|
|
|
class RssDebugVisualizer(object):
|
|
|
|
def __init__(self, player, world):
|
|
self._world = world
|
|
self._player = player
|
|
self._visualization_mode = RssDebugVisualizationMode.Off
|
|
|
|
def toggleMode(self):
|
|
if self._visualization_mode == RssDebugVisualizationMode.All:
|
|
self._visualization_mode = RssDebugVisualizationMode.Off
|
|
elif self._visualization_mode == RssDebugVisualizationMode.Off:
|
|
self._visualization_mode = RssDebugVisualizationMode.RouteOnly
|
|
elif self._visualization_mode == RssDebugVisualizationMode.RouteOnly:
|
|
self._visualization_mode = RssDebugVisualizationMode.VehicleStateOnly
|
|
elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly:
|
|
self._visualization_mode = RssDebugVisualizationMode.VehicleStateAndRoute
|
|
elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute:
|
|
self._visualization_mode = RssDebugVisualizationMode.All
|
|
print("New Debug Visualizer Mode {}".format(self._visualization_mode))
|
|
|
|
def tick(self, route, dangerous, individual_rss_states, ego_dynamics_on_route):
|
|
if self._visualization_mode == RssDebugVisualizationMode.RouteOnly or \
|
|
self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute or \
|
|
self._visualization_mode == RssDebugVisualizationMode.All:
|
|
self.visualize_route(dangerous, route)
|
|
|
|
if self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly or \
|
|
self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute or \
|
|
self._visualization_mode == RssDebugVisualizationMode.All:
|
|
self.visualize_rss_results(individual_rss_states)
|
|
|
|
if self._visualization_mode == RssDebugVisualizationMode.All:
|
|
self.visualize_ego_dynamics(ego_dynamics_on_route)
|
|
|
|
def visualize_route(self, dangerous, route):
|
|
if not route:
|
|
return
|
|
right_lane_edges = dict()
|
|
left_lane_edges = dict()
|
|
|
|
for road_segment in route.roadSegments:
|
|
right_most_lane = road_segment.drivableLaneSegments[0]
|
|
if right_most_lane.laneInterval.laneId not in right_lane_edges:
|
|
edge = ad.map.route.getRightProjectedENUEdge(right_most_lane.laneInterval)
|
|
right_lane_edges[right_most_lane.laneInterval.laneId] = edge
|
|
intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(right_most_lane.laneInterval.laneId)
|
|
|
|
color = carla.Color(r=(128 if dangerous else 255))
|
|
if intersection_lane:
|
|
color.b = 128 if dangerous else 255
|
|
color = carla.Color(r=255, g=0, b=255)
|
|
self.visualize_enu_edge(edge, color, self._player.get_location().z)
|
|
|
|
left_most_lane = road_segment.drivableLaneSegments[-1]
|
|
if left_most_lane.laneInterval.laneId not in left_lane_edges:
|
|
edge = ad.map.route.getLeftProjectedENUEdge(left_most_lane.laneInterval)
|
|
left_lane_edges[left_most_lane.laneInterval.laneId] = edge
|
|
intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(left_most_lane.laneInterval.laneId)
|
|
color = carla.Color(g=(128 if dangerous else 255))
|
|
if intersection_lane:
|
|
color.b = 128 if dangerous else 255
|
|
|
|
self.visualize_enu_edge(edge, color, self._player.get_location().z)
|
|
|
|
def visualize_enu_edge(self, edge, color, z_offset):
|
|
for point in edge:
|
|
carla_point = carla.Location(x=float(point.x), y=-1. * float(point.y), z=float(point.z) + z_offset)
|
|
self._world.debug.draw_point(carla_point, 0.1, color, 0.1, False)
|
|
|
|
def visualize_rss_results(self, state_snapshot):
|
|
for state in state_snapshot:
|
|
other_actor = state.get_actor(self._world)
|
|
if not other_actor:
|
|
# print("Actor not found. Skip visualizing state {}".format(state))
|
|
continue
|
|
ego_point = self._player.get_location()
|
|
ego_point.z += 0.05
|
|
yaw = self._player.get_transform().rotation.yaw
|
|
cosine = math.cos(math.radians(yaw))
|
|
sine = math.sin(math.radians(yaw))
|
|
line_offset = carla.Location(-sine * 0.1, cosine * 0.1, 0.0)
|
|
|
|
point = other_actor.get_location()
|
|
point.z += 0.05
|
|
indicator_color = carla.Color(0, 255, 0)
|
|
dangerous = ad.rss.state.isDangerous(state.rss_state)
|
|
if dangerous:
|
|
indicator_color = carla.Color(255, 0, 0)
|
|
elif state.rss_state.situationType == ad.rss.situation.SituationType.NotRelevant:
|
|
indicator_color = carla.Color(150, 150, 150)
|
|
|
|
if self._visualization_mode == RssDebugVisualizationMode.All:
|
|
# the connection lines are only visualized if All is requested
|
|
lon_color = indicator_color
|
|
lat_l_color = indicator_color
|
|
lat_r_color = indicator_color
|
|
if not state.rss_state.longitudinalState.isSafe:
|
|
lon_color.r = 255
|
|
lon_color.g = 0 if dangerous else 255
|
|
if not state.rss_state.lateralStateLeft.isSafe:
|
|
lat_l_color.r = 255
|
|
lat_l_color.g = 0 if dangerous else 255
|
|
if not state.rss_state.lateralStateRight.isSafe:
|
|
lat_r_color.r = 255
|
|
lat_r_color.g = 0 if dangerous else 255
|
|
self._world.debug.draw_line(ego_point, point, 0.1, lon_color, 0.02, False)
|
|
self._world.debug.draw_line(ego_point - line_offset, point -
|
|
line_offset, 0.1, lat_l_color, 0.02, False)
|
|
self._world.debug.draw_line(ego_point + line_offset, point +
|
|
line_offset, 0.1, lat_r_color, 0.02, False)
|
|
point.z += 3.
|
|
self._world.debug.draw_point(point, 0.2, indicator_color, 0.02, False)
|
|
|
|
def visualize_ego_dynamics(self, ego_dynamics_on_route):
|
|
color = carla.Color(0, 0, 255)
|
|
|
|
sin_heading = math.sin(float(ego_dynamics_on_route.route_heading))
|
|
cos_heading = math.cos(float(ego_dynamics_on_route.route_heading))
|
|
|
|
heading_location_start = self._player.get_location()
|
|
heading_location_start.x -= cos_heading * 10.
|
|
heading_location_start.y += sin_heading * 10.
|
|
heading_location_start.z += 0.5
|
|
heading_location_end = self._player.get_location()
|
|
heading_location_end.x += cos_heading * 10.
|
|
heading_location_end.y -= sin_heading * 10.
|
|
heading_location_end.z += 0.5
|
|
|
|
self._world.debug.draw_arrow(heading_location_start, heading_location_end, 0.1, 0.1, color, 0.02, False)
|
|
|
|
sin_center = math.sin(float(ego_dynamics_on_route.route_heading) + math.pi / 2.)
|
|
cos_center = math.cos(float(ego_dynamics_on_route.route_heading) + math.pi / 2.)
|
|
center_location_start = self._player.get_location()
|
|
center_location_start.x -= cos_center * 2.
|
|
center_location_start.y += sin_center * 2.
|
|
center_location_start.z += 0.5
|
|
center_location_end = self._player.get_location()
|
|
center_location_end.x += cos_center * 2.
|
|
center_location_end.y -= sin_center * 2.
|
|
center_location_end.z += 0.5
|
|
|
|
self._world.debug.draw_line(center_location_start, center_location_end, 0.1, color, 0.02, False)
|