#!/usr/bin/env python # # Copyright (c) 2020 Intel Corporation # import glob import os import sys try: sys.path.append(glob.glob(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + '/carla/dist/carla-*%d.%d-%s.egg' % ( sys.version_info.major, sys.version_info.minor, 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError: pass from enum import Enum import math import numpy as np import pygame import weakref import carla from carla import ad class RssStateVisualizer(object): def __init__(self, display_dimensions, font, world): self._surface = None self._display_dimensions = display_dimensions self._font = font self._world = world def tick(self, individual_rss_states): state_surface = pygame.Surface((220, self._display_dimensions[1])) state_surface.set_colorkey(pygame.Color('black')) v_offset = 0 if individual_rss_states: surface = self._font.render('RSS States:', True, (255, 255, 255)) state_surface.blit(surface, (8, v_offset)) v_offset += 26 for state in individual_rss_states: object_name = "Obj" if state.rss_state.objectId == 18446744073709551614: object_name = "Border Left" elif state.rss_state.objectId == 18446744073709551615: object_name = "Border Right" else: other_actor = state.get_actor(self._world) if other_actor: li = list(other_actor.type_id.split(".")) if li: li.pop(0) li = [element.capitalize() for element in li] object_name = " ".join(li).strip()[:15] mode = "?" if state.actor_calculation_mode == ad.rss.map.RssMode.Structured: mode = "S" elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured: mode = "U" elif state.actor_calculation_mode == ad.rss.map.RssMode.NotRelevant: mode = "-" item = '%4s % 2dm %8s' % (mode, state.distance, object_name) surface = self._font.render(item, True, (255, 255, 255)) state_surface.blit(surface, (5, v_offset)) color = (128, 128, 128) if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant: if state.is_dangerous: color = (255, 0, 0) else: color = (0, 255, 0) pygame.draw.circle(state_surface, color, (12, v_offset + 7), 5) xpos = 184 if state.actor_calculation_mode == ad.rss.map.RssMode.Structured: if not state.rss_state.longitudinalState.isSafe and ((state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceSameDirectionOtherInFront") or (state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceSameDirectionEgoFront")): pygame.draw.polygon( state_surface, ( 255, 255, 255), ((xpos + 1, v_offset + 1 + 4), (xpos + 6, v_offset + 1 + 0), (xpos + 11, v_offset + 1 + 4), (xpos + 7, v_offset + 1 + 4), (xpos + 7, v_offset + 1 + 12), (xpos + 5, v_offset + 1 + 12), (xpos + 5, v_offset + 1 + 4))) xpos += 14 if not state.rss_state.longitudinalState.isSafe and ((state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceOppositeDirectionEgoCorrectLane") or (state.rss_state.longitudinalState.rssStateInformation.evaluator == "LongitudinalDistanceOppositeDirection")): pygame.draw.polygon( state_surface, ( 255, 255, 255), ((xpos + 2, v_offset + 1 + 8), (xpos + 6, v_offset + 1 + 12), (xpos + 10, v_offset + 1 + 8), (xpos + 7, v_offset + 1 + 8), (xpos + 7, v_offset + 1 + 0), (xpos + 5, v_offset + 1 + 0), (xpos + 5, v_offset + 1 + 8))) xpos += 14 if not state.rss_state.lateralStateRight.isSafe and not (state.rss_state.lateralStateRight.rssStateInformation.evaluator == "None"): pygame.draw.polygon( state_surface, ( 255, 255, 255), ((xpos + 0, v_offset + 1 + 4), (xpos + 8, v_offset + 1 + 4), (xpos + 8, v_offset + 1 + 1), (xpos + 12, v_offset + 1 + 6), (xpos + 8, v_offset + 1 + 10), (xpos + 8, v_offset + 1 + 8), (xpos + 0, v_offset + 1 + 8))) xpos += 14 if not state.rss_state.lateralStateLeft.isSafe and not (state.rss_state.lateralStateLeft.rssStateInformation.evaluator == "None"): pygame.draw.polygon( state_surface, ( 255, 255, 255), ((xpos + 0, v_offset + 1 + 6), (xpos + 4, v_offset + 1 + 1), (xpos + 4, v_offset + 1 + 4), (xpos + 12, v_offset + 1 + 4), (xpos + 12, v_offset + 1 + 8), (xpos + 4, v_offset + 1 + 8), (xpos + 4, v_offset + 1 + 10))) xpos += 14 elif state.actor_calculation_mode == ad.rss.map.RssMode.Unstructured: text = "" if state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.DriveAway: text = " D" elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.ContinueForward: text = " C" elif state.rss_state.unstructuredSceneState.response == ad.rss.state.UnstructuredSceneResponse.Brake: text = " B" surface = self._font.render(text, True, (255, 255, 255)) state_surface.blit(surface, (xpos, v_offset)) v_offset += 14 self._surface = state_surface def render(self, display, v_offset): if self._surface: display.blit(self._surface, (0, v_offset)) def get_matrix(transform): """ Creates matrix from carla transform. """ rotation = transform.rotation location = transform.location c_y = np.cos(np.radians(rotation.yaw)) s_y = np.sin(np.radians(rotation.yaw)) c_r = np.cos(np.radians(rotation.roll)) s_r = np.sin(np.radians(rotation.roll)) c_p = np.cos(np.radians(rotation.pitch)) s_p = np.sin(np.radians(rotation.pitch)) matrix = np.matrix(np.identity(4)) matrix[0, 3] = location.x matrix[1, 3] = location.y matrix[2, 3] = location.z matrix[0, 0] = c_p * c_y matrix[0, 1] = c_y * s_p * s_r - s_y * c_r matrix[0, 2] = -c_y * s_p * c_r - s_y * s_r matrix[1, 0] = s_y * c_p matrix[1, 1] = s_y * s_p * s_r + c_y * c_r matrix[1, 2] = -s_y * s_p * c_r + c_y * s_r matrix[2, 0] = s_p matrix[2, 1] = -c_p * s_r matrix[2, 2] = c_p * c_r return matrix # ============================================================================== # -- RssUnstructuredSceneVisualizer ------------------------------------------------ # ============================================================================== class RssUnstructuredSceneVisualizerMode(Enum): disabled = 1 window = 2 fullscreen = 3 class RssUnstructuredSceneVisualizer(object): def __init__(self, parent_actor, world, display_dimensions): self._last_rendered_frame = -1 self._surface = None self._current_rss_surface = None self.current_camera_surface = (0, None) self._world = world self._parent_actor = parent_actor self._display_dimensions = display_dimensions self._camera = None self._mode = RssUnstructuredSceneVisualizerMode.disabled self.restart(RssUnstructuredSceneVisualizerMode.window) def destroy(self): if self._camera: self._camera.stop() self._camera.destroy() self._camera = None def restart(self, mode): # setup up top down camera self.destroy() self._mode = mode spawn_sensor = False if mode == RssUnstructuredSceneVisualizerMode.window: self._dim = (self._display_dimensions[0] / 3, self._display_dimensions[1] / 2) spawn_sensor = True elif mode == RssUnstructuredSceneVisualizerMode.fullscreen: self._dim = (self._display_dimensions[0], self._display_dimensions[1]) spawn_sensor = True else: self._surface = None if spawn_sensor: self._calibration = np.identity(3) self._calibration[0, 2] = self._dim[0] / 2.0 self._calibration[1, 2] = self._dim[1] / 2.0 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / \ (2.0 * np.tan(90.0 * np.pi / 360.0)) # fov default: 90.0 bp_library = self._world.get_blueprint_library() bp = bp_library.find('sensor.camera.rgb') bp.set_attribute('image_size_x', str(self._dim[0])) bp.set_attribute('image_size_y', str(self._dim[1])) self._camera = self._world.spawn_actor( bp, carla.Transform(carla.Location(x=7.5, z=10), carla.Rotation(pitch=-90)), attach_to=self._parent_actor) # We need to pass the lambda a weak reference to self to avoid # circular reference. weak_self = weakref.ref(self) self._camera.listen(lambda image: self._parse_image(weak_self, image)) def update_surface(self, cam_frame, rss_frame): if self._mode == RssUnstructuredSceneVisualizerMode.disabled: return render = False if cam_frame and self._current_rss_surface and self._current_rss_surface[0] == cam_frame: render = True if rss_frame and self.current_camera_surface and self.current_camera_surface[0] == rss_frame: render = True if render: surface = self.current_camera_surface[1] surface.blit(self._current_rss_surface[1], (0, 0)) rect = pygame.Rect((0, 0), (2, surface.get_height())) pygame.draw.rect(surface, (0, 0, 0), rect, 0) rect = pygame.Rect((0, 0), (surface.get_width(), 2)) pygame.draw.rect(surface, (0, 0, 0), rect, 0) rect = pygame.Rect((0, surface.get_height() - 2), (surface.get_width(), surface.get_height())) pygame.draw.rect(surface, (0, 0, 0), rect, 0) rect = pygame.Rect((surface.get_width() - 2, 0), (surface.get_width(), surface.get_width())) pygame.draw.rect(surface, (0, 0, 0), rect, 0) self._surface = surface def toggle_camera(self): print("Toggle RssUnstructuredSceneVisualizer") if self._mode == RssUnstructuredSceneVisualizerMode.window: self.restart(RssUnstructuredSceneVisualizerMode.fullscreen) elif self._mode == RssUnstructuredSceneVisualizerMode.fullscreen: self.restart(RssUnstructuredSceneVisualizerMode.disabled) elif self._mode == RssUnstructuredSceneVisualizerMode.disabled: self.restart(RssUnstructuredSceneVisualizerMode.window) @staticmethod def _parse_image(weak_self, image): self = weak_self() if not self: return image.convert(carla.ColorConverter.Raw) array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (image.height, image.width, 4)) array = array[:, :, :3] array = array[:, :, ::-1] surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self.current_camera_surface = (image.frame, surface) self.update_surface(image.frame, None) @staticmethod def rotate_around_point(xy, radians, origin): """Rotate a point around a given point. """ x, y = xy offset_x, offset_y = origin adjusted_x = (x - offset_x) adjusted_y = (y - offset_y) cos_rad = math.cos(radians) sin_rad = math.sin(radians) qx = offset_x + cos_rad * adjusted_x - sin_rad * adjusted_y qy = offset_y + sin_rad * adjusted_x + cos_rad * adjusted_y return qx, qy def tick(self, frame, rss_response, allowed_heading_ranges): if not self._camera: return surface = pygame.Surface(self._dim) surface.set_colorkey(pygame.Color('black')) surface.set_alpha(180) try: lines = RssUnstructuredSceneVisualizer.get_trajectory_sets( rss_response.rss_state_snapshot, self._camera.get_transform(), self._calibration) polygons = [] for heading_range in allowed_heading_ranges: polygons.append((RssUnstructuredSceneVisualizer.transform_points( RssUnstructuredSceneVisualizer._get_points_from_pairs( RssUnstructuredSceneVisualizer.draw_heading_range( heading_range, rss_response.ego_dynamics_on_route)), self._camera.get_transform(), self._calibration), (0, 0, 255))) RssUnstructuredSceneVisualizer.draw_lines(surface, lines) RssUnstructuredSceneVisualizer.draw_polygons(surface, polygons) except RuntimeError as e: print("ERROR {}".format(e)) self._current_rss_surface = (frame, surface) self.update_surface(None, frame) def render(self, display): if self._surface: display.blit(self._surface, (display.get_width() - self._dim[0], 0)) @staticmethod def draw_heading_range(heading_range, ego_dynamics_on_route): line = [(float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))] length = 3.0 current_angle = float(heading_range.begin) max_angle = float(heading_range.end) if heading_range.end < heading_range.begin: max_angle += 2.0 * np.pi while current_angle < max_angle: line.append((float(ego_dynamics_on_route.ego_center.x) + length * np.cos(current_angle), float(ego_dynamics_on_route.ego_center.y) + length * np.sin(current_angle))) current_angle += 0.2 if current_angle != max_angle: line.append((float(ego_dynamics_on_route.ego_center.x) + length * np.cos(max_angle), float(ego_dynamics_on_route.ego_center.y) + length * np.sin(max_angle))) line.append((float(ego_dynamics_on_route.ego_center.x), float(ego_dynamics_on_route.ego_center.y))) return line @staticmethod def get_trajectory_sets(rss_state_snapshot, camera_transform, calibration): """ Creates 3D bounding boxes based on carla vehicle list and camera. """ trajectory_sets = [] # ego trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points( rss_state_snapshot.unstructuredSceneEgoInformation.brakeTrajectorySet), camera_transform, calibration), (255, 0, 0))) trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points( rss_state_snapshot.unstructuredSceneEgoInformation.continueForwardTrajectorySet), camera_transform, calibration), (0, 255, 0))) # others for state in rss_state_snapshot.individualResponses: if state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet: trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points( state.unstructuredSceneState.rssStateInformation.brakeTrajectorySet), camera_transform, calibration), (255, 0, 0))) if state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet: trajectory_sets.append((RssUnstructuredSceneVisualizer.transform_points(RssUnstructuredSceneVisualizer._get_trajectory_set_points( state.unstructuredSceneState.rssStateInformation.continueForwardTrajectorySet), camera_transform, calibration), (0, 255, 0))) return trajectory_sets @staticmethod def draw_lines(surface, lines): """ Draws lines on pygame display. """ for line, color in lines: if len(line) > 1: pygame.draw.lines(surface, color, True, line, 2) @staticmethod def draw_polygons(surface, polygons): """ Draws polygons on pygame display. """ for polygon, color in polygons: if len(polygon) > 1: pygame.draw.polygon(surface, color, polygon) @staticmethod def transform_points(world_cords, camera_transform, calibration): """ Returns trajectory set projected to camera view """ world_cords = np.transpose(world_cords) cords_x_y_z = RssUnstructuredSceneVisualizer._world_to_sensor(world_cords, camera_transform)[:3, :] cords_y_minus_z_x = np.concatenate([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]]) ts = np.transpose(np.dot(calibration, cords_y_minus_z_x)) camera_ts = np.concatenate([ts[:, 0] / ts[:, 2], ts[:, 1] / ts[:, 2], ts[:, 2]], axis=1) line_to_draw = [] for point in camera_ts: line_to_draw.append((int(point[0, 0]), int(point[0, 1]))) return line_to_draw @staticmethod def _get_trajectory_set_points(trajectory_set): """ """ cords = np.zeros((len(trajectory_set), 4)) i = 0 for pt in trajectory_set: cords[i, :] = np.array([pt.x, -pt.y, 0, 1]) i += 1 return cords @staticmethod def _get_points_from_pairs(trajectory_set): """ """ cords = np.zeros((len(trajectory_set), 4)) i = 0 for pt in trajectory_set: cords[i, :] = np.array([pt[0], -pt[1], 0, 1]) i += 1 return cords @staticmethod def _world_to_sensor(cords, camera_transform): """ Transforms world coordinates to sensor. """ sensor_world_matrix = get_matrix(camera_transform) world_sensor_matrix = np.linalg.inv(sensor_world_matrix) sensor_cords = np.dot(world_sensor_matrix, cords) return sensor_cords # ============================================================================== # -- RssBoundingBoxVisualizer ------------------------------------------------------ # ============================================================================== class RssBoundingBoxVisualizer(object): def __init__(self, display_dimensions, world, camera): self._last_camera_frame = 0 self._surface_for_frame = [] self._world = world self._dim = display_dimensions self._calibration = np.identity(3) self._calibration[0, 2] = self._dim[0] / 2.0 self._calibration[1, 2] = self._dim[1] / 2.0 self._calibration[0, 0] = self._calibration[1, 1] = self._dim[0] / \ (2.0 * np.tan(90.0 * np.pi / 360.0)) # fov default: 90.0 self._camera = camera def tick(self, frame, individual_rss_states): if len(self._surface_for_frame) > 0: try: while self._surface_for_frame[0][0] < self._last_camera_frame: self._surface_for_frame.pop(0) except IndexError: return # only render on new frame if len(self._surface_for_frame) > 0: if self._surface_for_frame[0][0] == frame: return surface = pygame.Surface(self._dim) surface.set_colorkey(pygame.Color('black')) surface.set_alpha(80) try: bounding_boxes = RssBoundingBoxVisualizer.get_bounding_boxes( individual_rss_states, self._camera.get_transform(), self._calibration, self._world) RssBoundingBoxVisualizer.draw_bounding_boxes(surface, bounding_boxes) self._surface_for_frame.append((frame, surface, len(bounding_boxes))) except RuntimeError: pass def render(self, display, current_camera_frame): rendered = False boxes_to_render = 0 for frame, surface, box_count in self._surface_for_frame: if frame == current_camera_frame: display.blit(surface, (0, 0)) boxes_to_render = box_count rendered = True break if not rendered and boxes_to_render > 0: print("Warning: {} bounding boxes were not drawn.".format(boxes_to_render)) self._last_camera_frame = current_camera_frame @staticmethod def get_bounding_boxes(individual_rss_states, camera_transform, calibration, world): """ Creates 3D bounding boxes based on carla vehicle list and camera. """ bounding_boxes = [] for state in individual_rss_states: if state.actor_calculation_mode != ad.rss.map.RssMode.NotRelevant and state.is_dangerous: other_actor = state.get_actor(world) if other_actor: bounding_boxes.append(RssBoundingBoxVisualizer.get_bounding_box( other_actor, camera_transform, calibration)) # filter objects behind camera bounding_boxes = [bb for bb in bounding_boxes if all(bb[:, 2] > 0)] return bounding_boxes @staticmethod def draw_bounding_boxes(surface, bounding_boxes, color=pygame.Color('red')): """ Draws bounding boxes on pygame display. """ for bbox in bounding_boxes: points = [(int(bbox[i, 0]), int(bbox[i, 1])) for i in range(8)] # draw lines # base polygon = [points[0], points[1], points[2], points[3]] pygame.draw.polygon(surface, color, polygon) # top polygon = [points[4], points[5], points[6], points[7]] pygame.draw.polygon(surface, color, polygon) # base-top polygon = [points[0], points[1], points[5], points[4]] pygame.draw.polygon(surface, color, polygon) polygon = [points[1], points[2], points[6], points[5]] pygame.draw.polygon(surface, color, polygon) polygon = [points[2], points[6], points[7], points[3]] pygame.draw.polygon(surface, color, polygon) polygon = [points[0], points[4], points[7], points[3]] pygame.draw.polygon(surface, color, polygon) @staticmethod def get_bounding_box(vehicle, camera_transform, calibration): """ Returns 3D bounding box for a vehicle based on camera view. """ bb_cords = RssBoundingBoxVisualizer._create_bb_points(vehicle) cords_x_y_z = RssBoundingBoxVisualizer._vehicle_to_sensor(bb_cords, vehicle, camera_transform)[:3, :] cords_y_minus_z_x = np.concatenate([cords_x_y_z[1, :], -cords_x_y_z[2, :], cords_x_y_z[0, :]]) bbox = np.transpose(np.dot(calibration, cords_y_minus_z_x)) camera_bbox = np.concatenate([bbox[:, 0] / bbox[:, 2], bbox[:, 1] / bbox[:, 2], bbox[:, 2]], axis=1) return camera_bbox @staticmethod def _create_bb_points(vehicle): """ Returns 3D bounding box for a vehicle. """ cords = np.zeros((8, 4)) extent = vehicle.bounding_box.extent cords[0, :] = np.array([extent.x, extent.y, -extent.z, 1]) cords[1, :] = np.array([-extent.x, extent.y, -extent.z, 1]) cords[2, :] = np.array([-extent.x, -extent.y, -extent.z, 1]) cords[3, :] = np.array([extent.x, -extent.y, -extent.z, 1]) cords[4, :] = np.array([extent.x, extent.y, extent.z, 1]) cords[5, :] = np.array([-extent.x, extent.y, extent.z, 1]) cords[6, :] = np.array([-extent.x, -extent.y, extent.z, 1]) cords[7, :] = np.array([extent.x, -extent.y, extent.z, 1]) return cords @staticmethod def _vehicle_to_sensor(cords, vehicle, camera_transform): """ Transforms coordinates of a vehicle bounding box to sensor. """ world_cord = RssBoundingBoxVisualizer._vehicle_to_world(cords, vehicle) sensor_cord = RssBoundingBoxVisualizer._world_to_sensor(world_cord, camera_transform) return sensor_cord @staticmethod def _vehicle_to_world(cords, vehicle): """ Transforms coordinates of a vehicle bounding box to world. """ bb_transform = carla.Transform(vehicle.bounding_box.location) bb_vehicle_matrix = get_matrix(bb_transform) vehicle_world_matrix = get_matrix(vehicle.get_transform()) bb_world_matrix = np.dot(vehicle_world_matrix, bb_vehicle_matrix) world_cords = np.dot(bb_world_matrix, np.transpose(cords)) return world_cords @staticmethod def _world_to_sensor(cords, camera_transform): """ Transforms world coordinates to sensor. """ sensor_world_matrix = get_matrix(camera_transform) world_sensor_matrix = np.linalg.inv(sensor_world_matrix) sensor_cords = np.dot(world_sensor_matrix, cords) return sensor_cords # ============================================================================== # -- RssDebugVisualizer ------------------------------------------------------------ # ============================================================================== class RssDebugVisualizationMode(Enum): Off = 1 RouteOnly = 2 VehicleStateOnly = 3 VehicleStateAndRoute = 4 All = 5 class RssDebugVisualizer(object): def __init__(self, player, world): self._world = world self._player = player self._visualization_mode = RssDebugVisualizationMode.Off def toggleMode(self): if self._visualization_mode == RssDebugVisualizationMode.All: self._visualization_mode = RssDebugVisualizationMode.Off elif self._visualization_mode == RssDebugVisualizationMode.Off: self._visualization_mode = RssDebugVisualizationMode.RouteOnly elif self._visualization_mode == RssDebugVisualizationMode.RouteOnly: self._visualization_mode = RssDebugVisualizationMode.VehicleStateOnly elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly: self._visualization_mode = RssDebugVisualizationMode.VehicleStateAndRoute elif self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute: self._visualization_mode = RssDebugVisualizationMode.All print("New Debug Visualizer Mode {}".format(self._visualization_mode)) def tick(self, route, dangerous, individual_rss_states, ego_dynamics_on_route): if self._visualization_mode == RssDebugVisualizationMode.RouteOnly or \ self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute or \ self._visualization_mode == RssDebugVisualizationMode.All: self.visualize_route(dangerous, route) if self._visualization_mode == RssDebugVisualizationMode.VehicleStateOnly or \ self._visualization_mode == RssDebugVisualizationMode.VehicleStateAndRoute or \ self._visualization_mode == RssDebugVisualizationMode.All: self.visualize_rss_results(individual_rss_states) if self._visualization_mode == RssDebugVisualizationMode.All: self.visualize_ego_dynamics(ego_dynamics_on_route) def visualize_route(self, dangerous, route): if not route: return right_lane_edges = dict() left_lane_edges = dict() for road_segment in route.roadSegments: right_most_lane = road_segment.drivableLaneSegments[0] if right_most_lane.laneInterval.laneId not in right_lane_edges: edge = ad.map.route.getRightProjectedENUEdge(right_most_lane.laneInterval) right_lane_edges[right_most_lane.laneInterval.laneId] = edge intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(right_most_lane.laneInterval.laneId) color = carla.Color(r=(128 if dangerous else 255)) if intersection_lane: color.b = 128 if dangerous else 255 color = carla.Color(r=255, g=0, b=255) self.visualize_enu_edge(edge, color, self._player.get_location().z) left_most_lane = road_segment.drivableLaneSegments[-1] if left_most_lane.laneInterval.laneId not in left_lane_edges: edge = ad.map.route.getLeftProjectedENUEdge(left_most_lane.laneInterval) left_lane_edges[left_most_lane.laneInterval.laneId] = edge intersection_lane = ad.map.intersection.Intersection.isLanePartOfAnIntersection(left_most_lane.laneInterval.laneId) color = carla.Color(g=(128 if dangerous else 255)) if intersection_lane: color.b = 128 if dangerous else 255 self.visualize_enu_edge(edge, color, self._player.get_location().z) def visualize_enu_edge(self, edge, color, z_offset): for point in edge: carla_point = carla.Location(x=float(point.x), y=-1. * float(point.y), z=float(point.z) + z_offset) self._world.debug.draw_point(carla_point, 0.1, color, 0.1, False) def visualize_rss_results(self, state_snapshot): for state in state_snapshot: other_actor = state.get_actor(self._world) if not other_actor: # print("Actor not found. Skip visualizing state {}".format(state)) continue ego_point = self._player.get_location() ego_point.z += 0.05 yaw = self._player.get_transform().rotation.yaw cosine = math.cos(math.radians(yaw)) sine = math.sin(math.radians(yaw)) line_offset = carla.Location(-sine * 0.1, cosine * 0.1, 0.0) point = other_actor.get_location() point.z += 0.05 indicator_color = carla.Color(0, 255, 0) dangerous = ad.rss.state.isDangerous(state.rss_state) if dangerous: indicator_color = carla.Color(255, 0, 0) elif state.rss_state.situationType == ad.rss.situation.SituationType.NotRelevant: indicator_color = carla.Color(150, 150, 150) if self._visualization_mode == RssDebugVisualizationMode.All: # the connection lines are only visualized if All is requested lon_color = indicator_color lat_l_color = indicator_color lat_r_color = indicator_color if not state.rss_state.longitudinalState.isSafe: lon_color.r = 255 lon_color.g = 0 if dangerous else 255 if not state.rss_state.lateralStateLeft.isSafe: lat_l_color.r = 255 lat_l_color.g = 0 if dangerous else 255 if not state.rss_state.lateralStateRight.isSafe: lat_r_color.r = 255 lat_r_color.g = 0 if dangerous else 255 self._world.debug.draw_line(ego_point, point, 0.1, lon_color, 0.02, False) self._world.debug.draw_line(ego_point - line_offset, point - line_offset, 0.1, lat_l_color, 0.02, False) self._world.debug.draw_line(ego_point + line_offset, point + line_offset, 0.1, lat_r_color, 0.02, False) point.z += 3. self._world.debug.draw_point(point, 0.2, indicator_color, 0.02, False) def visualize_ego_dynamics(self, ego_dynamics_on_route): color = carla.Color(0, 0, 255) sin_heading = math.sin(float(ego_dynamics_on_route.route_heading)) cos_heading = math.cos(float(ego_dynamics_on_route.route_heading)) heading_location_start = self._player.get_location() heading_location_start.x -= cos_heading * 10. heading_location_start.y += sin_heading * 10. heading_location_start.z += 0.5 heading_location_end = self._player.get_location() heading_location_end.x += cos_heading * 10. heading_location_end.y -= sin_heading * 10. heading_location_end.z += 0.5 self._world.debug.draw_arrow(heading_location_start, heading_location_end, 0.1, 0.1, color, 0.02, False) sin_center = math.sin(float(ego_dynamics_on_route.route_heading) + math.pi / 2.) cos_center = math.cos(float(ego_dynamics_on_route.route_heading) + math.pi / 2.) center_location_start = self._player.get_location() center_location_start.x -= cos_center * 2. center_location_start.y += sin_center * 2. center_location_start.z += 0.5 center_location_end = self._player.get_location() center_location_end.x += cos_center * 2. center_location_end.y -= sin_center * 2. center_location_end.z += 0.5 self._world.debug.draw_line(center_location_start, center_location_end, 0.1, color, 0.02, False)