#!/usr/bin/env python3 # Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma de # Barcelona (UAB). # # This work is licensed under the terms of the MIT license. # For a copy, see . # Keyboard controlling for CARLA. Please refer to client_example.py for a simpler # and more documented example. """ Welcome to CARLA manual control. Use ARROWS or WASD keys for control. W : throttle S : brake AD : steer Q : toggle reverse Space : hand-brake R : restart level STARTING in a moment... """ from __future__ import print_function import argparse import logging import random import time try: import pygame from pygame.locals import K_DOWN from pygame.locals import K_LEFT from pygame.locals import K_RIGHT from pygame.locals import K_SPACE from pygame.locals import K_UP from pygame.locals import K_a from pygame.locals import K_d from pygame.locals import K_q from pygame.locals import K_r from pygame.locals import K_s from pygame.locals import K_w except ImportError: raise RuntimeError('cannot import pygame, make sure pygame package is installed') try: import numpy as np except ImportError: raise RuntimeError('cannot import numpy, make sure numpy package is installed') from carla import image_converter from carla import sensor from carla.client import make_carla_client, VehicleControl from carla.planner.map import CarlaMap from carla.settings import CarlaSettings from carla.tcp import TCPConnectionError from carla.util import print_over_same_line WINDOW_WIDTH = 800 WINDOW_HEIGHT = 600 MINI_WINDOW_WIDTH = 320 MINI_WINDOW_HEIGHT = 180 def make_carla_settings(enable_lidar): """Make a CarlaSettings object with the settings we need.""" settings = CarlaSettings() settings.set( SynchronousMode=False, SendNonPlayerAgentsInfo=True, NumberOfVehicles=15, NumberOfPedestrians=30, WeatherId=random.choice([1, 3, 7, 8, 14])) settings.randomize_seeds() camera0 = sensor.Camera('CameraRGB') camera0.set_image_size(WINDOW_WIDTH, WINDOW_HEIGHT) camera0.set_position(200, 0, 140) camera0.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera0) camera1 = sensor.Camera('CameraDepth', PostProcessing='Depth') camera1.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT) camera1.set_position(200, 0, 140) camera1.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera1) camera2 = sensor.Camera('CameraSemSeg', PostProcessing='SemanticSegmentation') camera2.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT) camera2.set_position(200, 0, 140) camera2.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera2) if enable_lidar: lidar = sensor.Lidar('Lidar32') lidar.set_position(0, 0, 250) lidar.set_rotation(0, 0, 0) lidar.set( Channels=32, Range=5000, PointsPerSecond=100000, RotationFrequency=10, UpperFovLimit=10, LowerFovLimit=-30) settings.add_sensor(lidar) return settings class Timer(object): def __init__(self): self.step = 0 self._lap_step = 0 self._lap_time = time.time() def tick(self): self.step += 1 def lap(self): self._lap_step = self.step self._lap_time = time.time() def ticks_per_second(self): return float(self.step - self._lap_step) / self.elapsed_seconds_since_lap() def elapsed_seconds_since_lap(self): return time.time() - self._lap_time class CarlaGame(object): def __init__(self, carla_client, enable_lidar=False, city_name=None): self.client = carla_client self._timer = None self._display = None self._main_image = None self._mini_view_image1 = None self._mini_view_image2 = None self._enable_lidar = enable_lidar self._lidar_measurement = None self._map_view = None self._is_on_reverse = False self._city_name = city_name self._map = CarlaMap(city_name, 16.43, 50.0) if city_name is not None else None self._map_shape = self._map.map_image.shape if city_name is not None else None self._map_view = self._map.get_map(WINDOW_HEIGHT) if city_name is not None else None self._position = None self._agent_positions = None def execute(self): """Launch the PyGame.""" pygame.init() self._initialize_game() try: while True: for event in pygame.event.get(): if event.type == pygame.QUIT: return self._on_loop() self._on_render() finally: pygame.quit() def _initialize_game(self): if self._city_name is not None: self._display = pygame.display.set_mode( (WINDOW_WIDTH + int((WINDOW_HEIGHT/float(self._map.map_image.shape[0]))*self._map.map_image.shape[1]), WINDOW_HEIGHT), pygame.HWSURFACE | pygame.DOUBLEBUF) else: self._display = pygame.display.set_mode( (WINDOW_WIDTH, WINDOW_HEIGHT), pygame.HWSURFACE | pygame.DOUBLEBUF) logging.debug('pygame started') self._on_new_episode() def _on_new_episode(self): scene = self.client.load_settings(make_carla_settings(self._enable_lidar)) number_of_player_starts = len(scene.player_start_spots) player_start = np.random.randint(number_of_player_starts) print('Starting new episode...') self.client.start_episode(player_start) self._timer = Timer() self._is_on_reverse = False def _on_loop(self): self._timer.tick() measurements, sensor_data = self.client.read_data() self._main_image = sensor_data.get('CameraRGB',None) self._mini_view_image1 = sensor_data.get('CameraDepth' ,None) self._mini_view_image2 = sensor_data.get('CameraSemSeg',None) if self._enable_lidar: self._lidar_measurement = sensor_data.get('Lidar32',None) # Print measurements every second. if self._timer.elapsed_seconds_since_lap() > 1.0: if self._city_name is not None: # Function to get car position on map. map_position = self._map.convert_to_pixel([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) # Function to get orientation of the road car is in. lane_orientation = self._map.get_lane_orientation([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) self._print_player_measurements_map( measurements.player_measurements, map_position, lane_orientation) else: self._print_player_measurements(measurements.player_measurements) # Plot position on the map as well. self._timer.lap() control = self._get_keyboard_control(pygame.key.get_pressed()) # Set the player position if self._city_name is not None: self._position = self._map.convert_to_pixel([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) self._agent_positions = measurements.non_player_agents if control is None: self._on_new_episode() else: self.client.send_control(control) def _get_keyboard_control(self, keys): """ Return a VehicleControl message based on the pressed keys. Return None if a new episode was requested. """ if keys[K_r]: return None control = VehicleControl() if keys[K_LEFT] or keys[K_a]: control.steer = -1.0 if keys[K_RIGHT] or keys[K_d]: control.steer = 1.0 if keys[K_UP] or keys[K_w]: control.throttle = 1.0 if keys[K_DOWN] or keys[K_s]: control.brake = 1.0 if keys[K_SPACE]: control.hand_brake = True if keys[K_q]: self._is_on_reverse = not self._is_on_reverse control.reverse = self._is_on_reverse return control def _print_player_measurements_map( self, player_measurements, map_position, lane_orientation): message = 'Step {step} ({fps:.1f} FPS): ' message += 'Map Position ({map_x:.1f},{map_y:.1f}) ' message += 'Lane Orientation ({ori_x:.1f},{ori_y:.1f}) ' message += '{speed:.2f} km/h, ' message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' message = message.format( map_x=map_position[0], map_y=map_position[1], ori_x=lane_orientation[0], ori_y=lane_orientation[1], step=self._timer.step, fps=self._timer.ticks_per_second(), speed=player_measurements.forward_speed, other_lane=100 * player_measurements.intersection_otherlane, offroad=100 * player_measurements.intersection_offroad) print_over_same_line(message) def _print_player_measurements(self, player_measurements): message = 'Step {step} ({fps:.1f} FPS): ' message += '{speed:.2f} km/h, ' message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' message = message.format( step=self._timer.step, fps=self._timer.ticks_per_second(), speed=player_measurements.forward_speed, other_lane=100 * player_measurements.intersection_otherlane, offroad=100 * player_measurements.intersection_offroad) print_over_same_line(message) def _on_render(self): gap_x = (WINDOW_WIDTH - 2 * MINI_WINDOW_WIDTH) / 3 mini_image_y = WINDOW_HEIGHT - MINI_WINDOW_HEIGHT - gap_x if self._main_image is not None: array = image_converter.to_rgb_array(self._main_image) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit(surface, (0, 0)) if self._mini_view_image1 is not None: array = image_converter.depth_to_logarithmic_grayscale(self._mini_view_image1) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit(surface, (gap_x, mini_image_y)) if self._mini_view_image2 is not None: array = image_converter.labels_to_cityscapes_palette( self._mini_view_image2) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit( surface, (2 * gap_x + MINI_WINDOW_WIDTH, mini_image_y)) if self._lidar_measurement is not None: lidar_data = np.array(self._lidar_measurement.data[:, :2]) lidar_data /= 50.0 lidar_data += 100.0 lidar_data = np.fabs(lidar_data) lidar_data = lidar_data.astype(np.int32) lidar_data = np.reshape(lidar_data, (-1, 2)) #draw lidar lidar_img_size = (200, 200, 3) lidar_img = np.zeros(lidar_img_size) lidar_img[tuple(lidar_data.T)] = (255, 255, 255) surface = pygame.surfarray.make_surface(lidar_img) self._display.blit(surface, (10, 10)) if self._map_view is not None: array = self._map_view array = array[:, :, :3] new_window_width = \ (float(WINDOW_HEIGHT) / float(self._map_shape[0])) * \ float(self._map_shape[1]) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) w_pos = int(self._position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0]))) h_pos = int(self._position[1] *(new_window_width/float(self._map_shape[1]))) pygame.draw.circle(surface, [255, 0, 0, 255], (w_pos, h_pos), 6, 0) for agent in self._agent_positions: if agent.HasField('vehicle'): agent_position = self._map.convert_to_pixel([ agent.vehicle.transform.location.x, agent.vehicle.transform.location.y, agent.vehicle.transform.location.z]) w_pos = int(agent_position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0]))) h_pos = int(agent_position[1] *(new_window_width/float(self._map_shape[1]))) pygame.draw.circle(surface, [255, 0, 255, 255], (w_pos, h_pos), 4, 0) self._display.blit(surface, (WINDOW_WIDTH, 0)) pygame.display.flip() def main(): argparser = argparse.ArgumentParser( description='CARLA Manual Control Client') argparser.add_argument( '-v', '--verbose', action='store_true', dest='debug', help='print debug information') argparser.add_argument( '--host', metavar='H', default='localhost', help='IP of the host server (default: localhost)') argparser.add_argument( '-p', '--port', metavar='P', default=2000, type=int, help='TCP port to listen to (default: 2000)') argparser.add_argument( '-l', '--lidar', action='store_true', help='enable Lidar') argparser.add_argument( '-m', '--map-name', metavar='M', default=None, help='plot the map of the current city (needs to match active map in ' 'server, options: Town01 or Town02)') args = argparser.parse_args() log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) logging.info('listening to server %s:%s', args.host, args.port) print(__doc__) while True: try: with make_carla_client(args.host, args.port) as client: game = CarlaGame(client, args.lidar, args.map_name) game.execute() break except TCPConnectionError as error: logging.error(error) time.sleep(1) if __name__ == '__main__': try: main() except KeyboardInterrupt: print('\nCancelled by user. Bye!')