#!/usr/bin/env python3 # Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma de # Barcelona (UAB), and the INTEL Visual Computing Lab. # # This work is licensed under the terms of the MIT license. # For a copy, see . # Keyboard controlling for carla. Please refer to client_example for a simpler # and more documented example. """ Welcome to CARLA manual control. Use ARROWS or WASD keys for control. W : throttle S : brake AD : steer Q : toggle reverse Space : hand-brake R : restart level STARTING in a moment... """ from __future__ import print_function import argparse import logging import random import sys import time try: import pygame from pygame.locals import * except ImportError: raise RuntimeError('cannot import pygame, make sure pygame package is installed') try: import numpy as np except ImportError: raise RuntimeError('cannot import numpy, make sure numpy package is installed') from carla import image_converter from carla import sensor from carla.client import make_carla_client, VehicleControl from carla.planner.map import CarlaMap from carla.settings import CarlaSettings from carla.tcp import TCPConnectionError from carla.util import print_over_same_line WINDOW_WIDTH = 800 WINDOW_HEIGHT = 600 MINI_WINDOW_WIDTH = 320 MINI_WINDOW_HEIGHT = 180 def make_carla_settings(): """Make a CarlaSettings object with the settings we need.""" settings = CarlaSettings() settings.set( SynchronousMode=False, SendNonPlayerAgentsInfo=True, NumberOfVehicles=15, NumberOfPedestrians=30, WeatherId=random.choice([1, 3, 7, 8, 14])) settings.randomize_seeds() camera0 = sensor.Camera('CameraRGB') camera0.set_image_size(WINDOW_WIDTH, WINDOW_HEIGHT) camera0.set_position(200, 0, 140) camera0.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera0) camera1 = sensor.Camera('CameraDepth', PostProcessing='Depth') camera1.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT) camera1.set_position(200, 0, 140) camera1.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera1) camera2 = sensor.Camera('CameraSemSeg', PostProcessing='SemanticSegmentation') camera2.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT) camera2.set_position(200, 0, 140) camera2.set_rotation(0.0, 0.0, 0.0) settings.add_sensor(camera2) return settings class Timer(object): def __init__(self): self.step = 0 self._lap_step = 0 self._lap_time = time.time() def tick(self): self.step += 1 def lap(self): self._lap_step = self.step self._lap_time = time.time() def ticks_per_second(self): return float(self.step - self._lap_step) / self.elapsed_seconds_since_lap() def elapsed_seconds_since_lap(self): return time.time() - self._lap_time class CarlaGame(object): def __init__(self, carla_client, city_name=None): self.client = carla_client self._timer = None self._display = None self._main_image = None self._mini_view_image1 = None self._mini_view_image2 = None self._map_view = None self._is_on_reverse = False self._city_name = city_name self._map = CarlaMap(city_name) if city_name is not None else None self._map_shape = self._map.map_image.shape self._map_view = self._map.get_map([WINDOW_WIDTH, WINDOW_HEIGHT]) if city_name is not None else None def execute(self): """Launch the PyGame.""" pygame.init() self._initialize_game() try: while True: for event in pygame.event.get(): if event.type == pygame.QUIT: return self._on_loop() self._on_render() finally: pygame.quit() def _initialize_game(self): if self._city_name is not None: self._display = pygame.display.set_mode( (WINDOW_WIDTH * 2, WINDOW_HEIGHT), pygame.HWSURFACE | pygame.DOUBLEBUF) else: self._display = pygame.display.set_mode( (WINDOW_WIDTH, WINDOW_HEIGHT), pygame.HWSURFACE | pygame.DOUBLEBUF) logging.debug('pygame started') self._on_new_episode() def _on_new_episode(self): scene = self.client.load_settings(make_carla_settings()) number_of_player_starts = len(scene.player_start_spots) player_start = np.random.randint(number_of_player_starts) print('Starting new episode...') self.client.start_episode(player_start) self._timer = Timer() self._is_on_reverse = False def _on_loop(self): self._timer.tick() measurements, sensor_data = self.client.read_data() self._main_image = sensor_data['CameraRGB'] self._mini_view_image1 = sensor_data['CameraDepth'] self._mini_view_image2 = sensor_data['CameraSemSeg'] # Print measurements every second. if self._timer.elapsed_seconds_since_lap() > 1.0: if self._city_name is not None: # Function to get car position on map. map_position = self._map.get_position_on_map([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) # Function to get orientation of the road car is in. lane_orientation = self._map.get_lane_orientation([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) self._print_player_measurements_map( measurements.player_measurements, map_position, lane_orientation) else: self._print_player_measurements(measurements.player_measurements) # Plot position on the map as well. self._timer.lap() control = self._get_keyboard_control(pygame.key.get_pressed()) # Set the player position self._position = self._map.get_position_on_map([ measurements.player_measurements.transform.location.x, measurements.player_measurements.transform.location.y, measurements.player_measurements.transform.location.z]) self._agent_positions = measurements.non_player_agents if control is None: self._on_new_episode() else: self.client.send_control(control) def _get_keyboard_control(self, keys): """ Return a VehicleControl message based on the pressed keys. Return None if a new episode was requested. """ if keys[K_r]: return None control = VehicleControl() if keys[K_LEFT] or keys[K_a]: control.steer = -1.0 if keys[K_RIGHT] or keys[K_d]: control.steer = 1.0 if keys[K_UP] or keys[K_w]: control.throttle = 1.0 if keys[K_DOWN] or keys[K_s]: control.brake = 1.0 if keys[K_SPACE]: control.hand_brake = True if keys[K_q]: self._is_on_reverse = not self._is_on_reverse control.reverse = self._is_on_reverse return control def _print_player_measurements_map( self, player_measurements, map_position, lane_orientation): message = 'Step {step} ({fps:.1f} FPS): ' message += 'Map Position ({map_x:.1f},{map_y:.1f}) Lane Orientation ({ori_x:.1f},{ori_y:.1f}) ' message += '{speed:.2f} km/h, ' message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' message = message.format( map_x=map_position[0], map_y=map_position[1], ori_x=lane_orientation[0], ori_y=lane_orientation[1], step=self._timer.step, fps=self._timer.ticks_per_second(), speed=player_measurements.forward_speed, other_lane=100 * player_measurements.intersection_otherlane, offroad=100 * player_measurements.intersection_offroad) print_over_same_line(message) def _print_player_measurements(self, player_measurements): message = 'Step {step} ({fps:.1f} FPS): ' message += '{speed:.2f} km/h, ' message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' message = message.format( step=self._timer.step, fps=self._timer.ticks_per_second(), speed=player_measurements.forward_speed, other_lane=100 * player_measurements.intersection_otherlane, offroad=100 * player_measurements.intersection_offroad) print_over_same_line(message) def _on_render(self): gap_x = (WINDOW_WIDTH - 2 * MINI_WINDOW_WIDTH) / 3 mini_image_y = WINDOW_HEIGHT - MINI_WINDOW_HEIGHT - gap_x if self._main_image is not None: array = image_converter.to_rgb_array(self._main_image) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit(surface, (0, 0)) if self._mini_view_image1 is not None: array = image_converter.depth_to_logarithmic_grayscale(self._mini_view_image1) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit(surface, (gap_x, mini_image_y)) if self._mini_view_image2 is not None: array = image_converter.labels_to_cityscapes_palette( self._mini_view_image2) surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) self._display.blit( surface, (2 * gap_x + MINI_WINDOW_WIDTH, mini_image_y)) if self._map_view is not None: array = self._map_view array = array[:, :, :3] surface = pygame.surfarray.make_surface(array) #print ('Position ', (int(self._position[0]*(float(WINDOW_WIDTH)/float(self._map_shape[0]))), int(self._position[1]*(float(WINDOW_HEIGHT)/float(self._map_shape[1]))))) plot_positon = ( int(self._position[1]*(float(WINDOW_WIDTH)/float(self._map_shape[0]))),WINDOW_HEIGHT - int(self._position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[1])))) pygame.draw.circle(surface, [255, 0, 0, 255], plot_positon, 6, 0) for agent in self._agent_positions: if agent.HasField('vehicle'): agent_position = self._map.get_position_on_map([ agent.vehicle.transform.location.x, agent.vehicle.transform.location.y, agent.vehicle.transform.location.z]) plot_positon = ( int(agent_position[1]*(float(WINDOW_WIDTH)/float(self._map_shape[0]))),WINDOW_HEIGHT - int(agent_position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[1])))) pygame.draw.circle(surface, [255, 0, 255, 255], plot_positon, 4, 0) #surface.set_at(, [255, 0, 0, 255]) self._display.blit(surface, (WINDOW_WIDTH, 0)) pygame.display.flip() def main(): argparser = argparse.ArgumentParser( description='CARLA Manual Control Client') argparser.add_argument( '-v', '--verbose', action='store_true', dest='debug', help='print debug information') argparser.add_argument( '--host', metavar='H', default='localhost', help='IP of the host server (default: localhost)') argparser.add_argument( '-p', '--port', metavar='P', default=2000, type=int, help='TCP port to listen to (default: 2000)') argparser.add_argument( '-m', '--map-name', metavar='M', default=None, help='plot the map of the current city (needs to match active map in server, options: Town01 or Town02)') args = argparser.parse_args() log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) logging.info('listening to server %s:%s', args.host, args.port) print(__doc__) while True: try: with make_carla_client(args.host, args.port) as client: game = CarlaGame(client, args.map_name) game.execute() break except TCPConnectionError as error: logging.error(error) time.sleep(1) except Exception as exception: logging.exception(exception) sys.exit(1) if __name__ == '__main__': try: main() except KeyboardInterrupt: print('\nCancelled by user. Bye!')