#!/usr/bin/env python # Copyright (c) 2020 Computer Vision Center (CVC) at the Universitat Autonoma de # Barcelona (UAB). # # This work is licensed under the terms of the MIT license. # For a copy, see . """ Script that render multiple sensors in the same pygame window By default, it renders four cameras, one LiDAR and one Semantic LiDAR. It can easily be configure for any different number of sensors. To do that, check lines 290-308. """ import glob import os import sys try: sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % ( sys.version_info.major, sys.version_info.minor, 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError: pass import carla import argparse import random import time import numpy as np try: import pygame from pygame.locals import K_ESCAPE from pygame.locals import K_q except ImportError: raise RuntimeError('cannot import pygame, make sure pygame package is installed') class CustomTimer: def __init__(self): try: self.timer = time.perf_counter except AttributeError: self.timer = time.time def time(self): return self.timer() class DisplayManager: def __init__(self, grid_size, window_size): pygame.init() pygame.font.init() self.display = pygame.display.set_mode(window_size, pygame.HWSURFACE | pygame.DOUBLEBUF) self.grid_size = grid_size self.window_size = window_size self.sensor_list = [] def get_window_size(self): return [int(self.window_size[0]), int(self.window_size[1])] def get_display_size(self): return [int(self.window_size[0]/self.grid_size[1]), int(self.window_size[1]/self.grid_size[0])] def get_display_offset(self, gridPos): dis_size = self.get_display_size() return [int(gridPos[1] * dis_size[0]), int(gridPos[0] * dis_size[1])] def add_sensor(self, sensor): self.sensor_list.append(sensor) def get_sensor_list(self): return self.sensor_list def render(self): if not self.render_enabled(): return for s in self.sensor_list: s.render() pygame.display.flip() def destroy(self): for s in self.sensor_list: s.destroy() def render_enabled(self): return self.display != None class SensorManager: def __init__(self, world, display_man, sensor_type, transform, attached, sensor_options, display_pos): self.surface = None self.world = world self.display_man = display_man self.display_pos = display_pos self.sensor = self.init_sensor(sensor_type, transform, attached, sensor_options) self.sensor_options = sensor_options self.timer = CustomTimer() self.time_processing = 0.0 self.tics_processing = 0 self.display_man.add_sensor(self) def init_sensor(self, sensor_type, transform, attached, sensor_options): if sensor_type == 'RGBCamera': camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb') disp_size = self.display_man.get_display_size() camera_bp.set_attribute('image_size_x', str(disp_size[0])) camera_bp.set_attribute('image_size_y', str(disp_size[1])) for key in sensor_options: camera_bp.set_attribute(key, sensor_options[key]) camera = self.world.spawn_actor(camera_bp, transform, attach_to=attached) camera.listen(self.save_rgb_image) return camera elif sensor_type == 'LiDAR': lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast') lidar_bp.set_attribute('range', '100') lidar_bp.set_attribute('dropoff_general_rate', lidar_bp.get_attribute('dropoff_general_rate').recommended_values[0]) lidar_bp.set_attribute('dropoff_intensity_limit', lidar_bp.get_attribute('dropoff_intensity_limit').recommended_values[0]) lidar_bp.set_attribute('dropoff_zero_intensity', lidar_bp.get_attribute('dropoff_zero_intensity').recommended_values[0]) for key in sensor_options: lidar_bp.set_attribute(key, sensor_options[key]) lidar = self.world.spawn_actor(lidar_bp, transform, attach_to=attached) lidar.listen(self.save_lidar_image) return lidar elif sensor_type == 'SemanticLiDAR': lidar_bp = self.world.get_blueprint_library().find('sensor.lidar.ray_cast_semantic') lidar_bp.set_attribute('range', '100') for key in sensor_options: lidar_bp.set_attribute(key, sensor_options[key]) lidar = self.world.spawn_actor(lidar_bp, transform, attach_to=attached) lidar.listen(self.save_semanticlidar_image) return lidar elif sensor_type == "Radar": radar_bp = self.world.get_blueprint_library().find('sensor.other.radar') for key in sensor_options: radar_bp.set_attribute(key, sensor_options[key]) radar = self.world.spawn_actor(radar_bp, transform, attach_to=attached) radar.listen(self.save_radar_image) return radar else: return None def get_sensor(self): return self.sensor def save_rgb_image(self, image): t_start = self.timer.time() image.convert(carla.ColorConverter.Raw) array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (image.height, image.width, 4)) array = array[:, :, :3] array = array[:, :, ::-1] if self.display_man.render_enabled(): self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) t_end = self.timer.time() self.time_processing += (t_end-t_start) self.tics_processing += 1 def save_lidar_image(self, image): t_start = self.timer.time() disp_size = self.display_man.get_display_size() lidar_range = 2.0*float(self.sensor_options['range']) points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) points = np.reshape(points, (int(points.shape[0] / 4), 4)) lidar_data = np.array(points[:, :2]) lidar_data *= min(disp_size) / lidar_range lidar_data += (0.5 * disp_size[0], 0.5 * disp_size[1]) lidar_data = np.fabs(lidar_data) # pylint: disable=E1111 lidar_data = lidar_data.astype(np.int32) lidar_data = np.reshape(lidar_data, (-1, 2)) lidar_img_size = (disp_size[0], disp_size[1], 3) lidar_img = np.zeros((lidar_img_size), dtype=np.uint8) lidar_img[tuple(lidar_data.T)] = (255, 255, 255) if self.display_man.render_enabled(): self.surface = pygame.surfarray.make_surface(lidar_img) t_end = self.timer.time() self.time_processing += (t_end-t_start) self.tics_processing += 1 def save_semanticlidar_image(self, image): t_start = self.timer.time() disp_size = self.display_man.get_display_size() lidar_range = 2.0*float(self.sensor_options['range']) points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) points = np.reshape(points, (int(points.shape[0] / 6), 6)) lidar_data = np.array(points[:, :2]) lidar_data *= min(disp_size) / lidar_range lidar_data += (0.5 * disp_size[0], 0.5 * disp_size[1]) lidar_data = np.fabs(lidar_data) # pylint: disable=E1111 lidar_data = lidar_data.astype(np.int32) lidar_data = np.reshape(lidar_data, (-1, 2)) lidar_img_size = (disp_size[0], disp_size[1], 3) lidar_img = np.zeros((lidar_img_size), dtype=np.uint8) lidar_img[tuple(lidar_data.T)] = (255, 255, 255) if self.display_man.render_enabled(): self.surface = pygame.surfarray.make_surface(lidar_img) t_end = self.timer.time() self.time_processing += (t_end-t_start) self.tics_processing += 1 def save_radar_image(self, radar_data): t_start = self.timer.time() points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4')) points = np.reshape(points, (len(radar_data), 4)) t_end = self.timer.time() self.time_processing += (t_end-t_start) self.tics_processing += 1 def render(self): if self.surface is not None: offset = self.display_man.get_display_offset(self.display_pos) self.display_man.display.blit(self.surface, offset) def destroy(self): self.sensor.destroy() def run_simulation(args, client): """This function performed one test run using the args parameters and connecting to the carla client passed. """ display_manager = None vehicle = None vehicle_list = [] timer = CustomTimer() try: # Getting the world and world = client.get_world() original_settings = world.get_settings() if args.sync: traffic_manager = client.get_trafficmanager(8000) settings = world.get_settings() traffic_manager.set_synchronous_mode(True) settings.synchronous_mode = True settings.fixed_delta_seconds = 0.05 world.apply_settings(settings) # Instanciating the vehicle to which we attached the sensors bp = world.get_blueprint_library().filter('charger_2020')[0] vehicle = world.spawn_actor(bp, random.choice(world.get_map().get_spawn_points())) vehicle_list.append(vehicle) vehicle.set_autopilot(True) # Display Manager organize all the sensors an its display in a window # If can easily configure the grid and the total window size display_manager = DisplayManager(grid_size=[2, 3], window_size=[args.width, args.height]) # Then, SensorManager can be used to spawn RGBCamera, LiDARs and SemanticLiDARs as needed # and assign each of them to a grid position, SensorManager(world, display_manager, 'RGBCamera', carla.Transform(carla.Location(x=0, z=2.4), carla.Rotation(yaw=-90)), vehicle, {}, display_pos=[0, 0]) SensorManager(world, display_manager, 'RGBCamera', carla.Transform(carla.Location(x=0, z=2.4), carla.Rotation(yaw=+00)), vehicle, {}, display_pos=[0, 1]) SensorManager(world, display_manager, 'RGBCamera', carla.Transform(carla.Location(x=0, z=2.4), carla.Rotation(yaw=+90)), vehicle, {}, display_pos=[0, 2]) SensorManager(world, display_manager, 'RGBCamera', carla.Transform(carla.Location(x=0, z=2.4), carla.Rotation(yaw=180)), vehicle, {}, display_pos=[1, 1]) SensorManager(world, display_manager, 'LiDAR', carla.Transform(carla.Location(x=0, z=2.4)), vehicle, {'channels' : '64', 'range' : '100', 'points_per_second': '250000', 'rotation_frequency': '20'}, display_pos=[1, 0]) SensorManager(world, display_manager, 'SemanticLiDAR', carla.Transform(carla.Location(x=0, z=2.4)), vehicle, {'channels' : '64', 'range' : '100', 'points_per_second': '100000', 'rotation_frequency': '20'}, display_pos=[1, 2]) #Simulation loop call_exit = False time_init_sim = timer.time() while True: # Carla Tick if args.sync: world.tick() else: world.wait_for_tick() # Render received data display_manager.render() for event in pygame.event.get(): if event.type == pygame.QUIT: call_exit = True elif event.type == pygame.KEYDOWN: if event.key == K_ESCAPE or event.key == K_q: call_exit = True break if call_exit: break finally: if display_manager: display_manager.destroy() client.apply_batch([carla.command.DestroyActor(x) for x in vehicle_list]) world.apply_settings(original_settings) def main(): argparser = argparse.ArgumentParser( description='CARLA Sensor tutorial') argparser.add_argument( '--host', metavar='H', default='127.0.0.1', help='IP of the host server (default: 127.0.0.1)') argparser.add_argument( '-p', '--port', metavar='P', default=2000, type=int, help='TCP port to listen to (default: 2000)') argparser.add_argument( '--sync', action='store_true', help='Synchronous mode execution') argparser.add_argument( '--async', dest='sync', action='store_false', help='Asynchronous mode execution') argparser.set_defaults(sync=True) argparser.add_argument( '--res', metavar='WIDTHxHEIGHT', default='1280x720', help='window resolution (default: 1280x720)') args = argparser.parse_args() args.width, args.height = [int(x) for x in args.res.split('x')] try: client = carla.Client(args.host, args.port) client.set_timeout(5.0) run_simulation(args, client) except KeyboardInterrupt: print('\nCancelled by user. Bye!') if __name__ == '__main__': main()