442 lines
15 KiB
Python
442 lines
15 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
# Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma de
|
||
|
# Barcelona (UAB).
|
||
|
#
|
||
|
# This work is licensed under the terms of the MIT license.
|
||
|
# For a copy, see <https://opensource.org/licenses/MIT>.
|
||
|
|
||
|
# Keyboard controlling for CARLA. Please refer to client_example.py for a simpler
|
||
|
# and more documented example.
|
||
|
|
||
|
"""
|
||
|
Welcome to CARLA manual control.
|
||
|
|
||
|
Use ARROWS or WASD keys for control.
|
||
|
|
||
|
W : throttle
|
||
|
S : brake
|
||
|
AD : steer
|
||
|
Q : toggle reverse
|
||
|
Space : hand-brake
|
||
|
P : toggle autopilot
|
||
|
|
||
|
R : restart level
|
||
|
|
||
|
STARTING in a moment...
|
||
|
"""
|
||
|
|
||
|
from __future__ import print_function
|
||
|
|
||
|
import argparse
|
||
|
import logging
|
||
|
import random
|
||
|
import time
|
||
|
|
||
|
try:
|
||
|
import pygame
|
||
|
from pygame.locals import K_DOWN
|
||
|
from pygame.locals import K_LEFT
|
||
|
from pygame.locals import K_RIGHT
|
||
|
from pygame.locals import K_SPACE
|
||
|
from pygame.locals import K_UP
|
||
|
from pygame.locals import K_a
|
||
|
from pygame.locals import K_d
|
||
|
from pygame.locals import K_p
|
||
|
from pygame.locals import K_q
|
||
|
from pygame.locals import K_r
|
||
|
from pygame.locals import K_s
|
||
|
from pygame.locals import K_w
|
||
|
except ImportError:
|
||
|
raise RuntimeError('cannot import pygame, make sure pygame package is installed')
|
||
|
|
||
|
try:
|
||
|
import numpy as np
|
||
|
except ImportError:
|
||
|
raise RuntimeError('cannot import numpy, make sure numpy package is installed')
|
||
|
|
||
|
from carla import image_converter
|
||
|
from carla import sensor
|
||
|
from carla.client import make_carla_client, VehicleControl
|
||
|
from carla.planner.map import CarlaMap
|
||
|
from carla.settings import CarlaSettings
|
||
|
from carla.tcp import TCPConnectionError
|
||
|
from carla.util import print_over_same_line
|
||
|
|
||
|
|
||
|
WINDOW_WIDTH = 800
|
||
|
WINDOW_HEIGHT = 600
|
||
|
MINI_WINDOW_WIDTH = 320
|
||
|
MINI_WINDOW_HEIGHT = 180
|
||
|
|
||
|
|
||
|
def make_carla_settings(args):
|
||
|
"""Make a CarlaSettings object with the settings we need."""
|
||
|
settings = CarlaSettings()
|
||
|
settings.set(
|
||
|
SynchronousMode=False,
|
||
|
SendNonPlayerAgentsInfo=True,
|
||
|
NumberOfVehicles=15,
|
||
|
NumberOfPedestrians=30,
|
||
|
WeatherId=random.choice([1, 3, 7, 8, 14]),
|
||
|
QualityLevel=args.quality_level)
|
||
|
settings.randomize_seeds()
|
||
|
camera0 = sensor.Camera('CameraRGB')
|
||
|
camera0.set_image_size(WINDOW_WIDTH, WINDOW_HEIGHT)
|
||
|
camera0.set_position(2.0, 0.0, 1.4)
|
||
|
camera0.set_rotation(0.0, 0.0, 0.0)
|
||
|
settings.add_sensor(camera0)
|
||
|
camera1 = sensor.Camera('CameraDepth', PostProcessing='Depth')
|
||
|
camera1.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT)
|
||
|
camera1.set_position(2.0, 0.0, 1.4)
|
||
|
camera1.set_rotation(0.0, 0.0, 0.0)
|
||
|
settings.add_sensor(camera1)
|
||
|
camera2 = sensor.Camera('CameraSemSeg', PostProcessing='SemanticSegmentation')
|
||
|
camera2.set_image_size(MINI_WINDOW_WIDTH, MINI_WINDOW_HEIGHT)
|
||
|
camera2.set_position(2.0, 0.0, 1.4)
|
||
|
camera2.set_rotation(0.0, 0.0, 0.0)
|
||
|
settings.add_sensor(camera2)
|
||
|
if args.lidar:
|
||
|
lidar = sensor.Lidar('Lidar32')
|
||
|
lidar.set_position(0, 0, 2.5)
|
||
|
lidar.set_rotation(0, 0, 0)
|
||
|
lidar.set(
|
||
|
Channels=32,
|
||
|
Range=50,
|
||
|
PointsPerSecond=100000,
|
||
|
RotationFrequency=10,
|
||
|
UpperFovLimit=10,
|
||
|
LowerFovLimit=-30)
|
||
|
settings.add_sensor(lidar)
|
||
|
return settings
|
||
|
|
||
|
|
||
|
class Timer(object):
|
||
|
def __init__(self):
|
||
|
self.step = 0
|
||
|
self._lap_step = 0
|
||
|
self._lap_time = time.time()
|
||
|
|
||
|
def tick(self):
|
||
|
self.step += 1
|
||
|
|
||
|
def lap(self):
|
||
|
self._lap_step = self.step
|
||
|
self._lap_time = time.time()
|
||
|
|
||
|
def ticks_per_second(self):
|
||
|
return float(self.step - self._lap_step) / self.elapsed_seconds_since_lap()
|
||
|
|
||
|
def elapsed_seconds_since_lap(self):
|
||
|
return time.time() - self._lap_time
|
||
|
|
||
|
|
||
|
class CarlaGame(object):
|
||
|
def __init__(self, carla_client, args):
|
||
|
self.client = carla_client
|
||
|
self._carla_settings = make_carla_settings(args)
|
||
|
self._timer = None
|
||
|
self._display = None
|
||
|
self._main_image = None
|
||
|
self._mini_view_image1 = None
|
||
|
self._mini_view_image2 = None
|
||
|
self._enable_autopilot = args.autopilot
|
||
|
self._lidar_measurement = None
|
||
|
self._map_view = None
|
||
|
self._is_on_reverse = False
|
||
|
self._display_map = args.map
|
||
|
self._city_name = None
|
||
|
self._map = None
|
||
|
self._map_shape = None
|
||
|
self._map_view = None
|
||
|
self._position = None
|
||
|
self._agent_positions = None
|
||
|
|
||
|
def execute(self):
|
||
|
"""Launch the PyGame."""
|
||
|
pygame.init()
|
||
|
self._initialize_game()
|
||
|
try:
|
||
|
while True:
|
||
|
for event in pygame.event.get():
|
||
|
if event.type == pygame.QUIT:
|
||
|
return
|
||
|
self._on_loop()
|
||
|
self._on_render()
|
||
|
finally:
|
||
|
pygame.quit()
|
||
|
|
||
|
def _initialize_game(self):
|
||
|
self._on_new_episode()
|
||
|
|
||
|
if self._city_name is not None:
|
||
|
self._map = CarlaMap(self._city_name)
|
||
|
self._map_shape = self._map.map_image.shape
|
||
|
self._map_view = self._map.get_map(WINDOW_HEIGHT)
|
||
|
|
||
|
extra_width = int((WINDOW_HEIGHT/float(self._map_shape[0]))*self._map_shape[1])
|
||
|
self._display = pygame.display.set_mode(
|
||
|
(WINDOW_WIDTH + extra_width, WINDOW_HEIGHT),
|
||
|
pygame.HWSURFACE | pygame.DOUBLEBUF)
|
||
|
else:
|
||
|
self._display = pygame.display.set_mode(
|
||
|
(WINDOW_WIDTH, WINDOW_HEIGHT),
|
||
|
pygame.HWSURFACE | pygame.DOUBLEBUF)
|
||
|
|
||
|
logging.debug('pygame started')
|
||
|
|
||
|
def _on_new_episode(self):
|
||
|
self._carla_settings.randomize_seeds()
|
||
|
self._carla_settings.randomize_weather()
|
||
|
scene = self.client.load_settings(self._carla_settings)
|
||
|
if self._display_map:
|
||
|
self._city_name = scene.map_name
|
||
|
number_of_player_starts = len(scene.player_start_spots)
|
||
|
player_start = np.random.randint(number_of_player_starts)
|
||
|
print('Starting new episode...')
|
||
|
self.client.start_episode(player_start)
|
||
|
self._timer = Timer()
|
||
|
self._is_on_reverse = False
|
||
|
|
||
|
def _on_loop(self):
|
||
|
self._timer.tick()
|
||
|
|
||
|
measurements, sensor_data = self.client.read_data()
|
||
|
|
||
|
self._main_image = sensor_data.get('CameraRGB', None)
|
||
|
self._mini_view_image1 = sensor_data.get('CameraDepth', None)
|
||
|
self._mini_view_image2 = sensor_data.get('CameraSemSeg', None)
|
||
|
self._lidar_measurement = sensor_data.get('Lidar32', None)
|
||
|
|
||
|
# Print measurements every second.
|
||
|
if self._timer.elapsed_seconds_since_lap() > 1.0:
|
||
|
if self._city_name is not None:
|
||
|
# Function to get car position on map.
|
||
|
map_position = self._map.convert_to_pixel([
|
||
|
measurements.player_measurements.transform.location.x,
|
||
|
measurements.player_measurements.transform.location.y,
|
||
|
measurements.player_measurements.transform.location.z])
|
||
|
# Function to get orientation of the road car is in.
|
||
|
lane_orientation = self._map.get_lane_orientation([
|
||
|
measurements.player_measurements.transform.location.x,
|
||
|
measurements.player_measurements.transform.location.y,
|
||
|
measurements.player_measurements.transform.location.z])
|
||
|
|
||
|
self._print_player_measurements_map(
|
||
|
measurements.player_measurements,
|
||
|
map_position,
|
||
|
lane_orientation)
|
||
|
else:
|
||
|
self._print_player_measurements(measurements.player_measurements)
|
||
|
|
||
|
# Plot position on the map as well.
|
||
|
|
||
|
self._timer.lap()
|
||
|
|
||
|
control = self._get_keyboard_control(pygame.key.get_pressed())
|
||
|
# Set the player position
|
||
|
if self._city_name is not None:
|
||
|
self._position = self._map.convert_to_pixel([
|
||
|
measurements.player_measurements.transform.location.x,
|
||
|
measurements.player_measurements.transform.location.y,
|
||
|
measurements.player_measurements.transform.location.z])
|
||
|
self._agent_positions = measurements.non_player_agents
|
||
|
|
||
|
if control is None:
|
||
|
self._on_new_episode()
|
||
|
elif self._enable_autopilot:
|
||
|
self.client.send_control(measurements.player_measurements.autopilot_control)
|
||
|
else:
|
||
|
self.client.send_control(control)
|
||
|
|
||
|
def _get_keyboard_control(self, keys):
|
||
|
"""
|
||
|
Return a VehicleControl message based on the pressed keys. Return None
|
||
|
if a new episode was requested.
|
||
|
"""
|
||
|
if keys[K_r]:
|
||
|
return None
|
||
|
control = VehicleControl()
|
||
|
if keys[K_LEFT] or keys[K_a]:
|
||
|
control.steer = -1.0
|
||
|
if keys[K_RIGHT] or keys[K_d]:
|
||
|
control.steer = 1.0
|
||
|
if keys[K_UP] or keys[K_w]:
|
||
|
control.throttle = 1.0
|
||
|
if keys[K_DOWN] or keys[K_s]:
|
||
|
control.brake = 1.0
|
||
|
if keys[K_SPACE]:
|
||
|
control.hand_brake = True
|
||
|
if keys[K_q]:
|
||
|
self._is_on_reverse = not self._is_on_reverse
|
||
|
if keys[K_p]:
|
||
|
self._enable_autopilot = not self._enable_autopilot
|
||
|
control.reverse = self._is_on_reverse
|
||
|
return control
|
||
|
|
||
|
def _print_player_measurements_map(
|
||
|
self,
|
||
|
player_measurements,
|
||
|
map_position,
|
||
|
lane_orientation):
|
||
|
message = 'Step {step} ({fps:.1f} FPS): '
|
||
|
message += 'Map Position ({map_x:.1f},{map_y:.1f}) '
|
||
|
message += 'Lane Orientation ({ori_x:.1f},{ori_y:.1f}) '
|
||
|
message += '{speed:.2f} km/h, '
|
||
|
message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road'
|
||
|
message = message.format(
|
||
|
map_x=map_position[0],
|
||
|
map_y=map_position[1],
|
||
|
ori_x=lane_orientation[0],
|
||
|
ori_y=lane_orientation[1],
|
||
|
step=self._timer.step,
|
||
|
fps=self._timer.ticks_per_second(),
|
||
|
speed=player_measurements.forward_speed * 3.6,
|
||
|
other_lane=100 * player_measurements.intersection_otherlane,
|
||
|
offroad=100 * player_measurements.intersection_offroad)
|
||
|
print_over_same_line(message)
|
||
|
|
||
|
def _print_player_measurements(self, player_measurements):
|
||
|
message = 'Step {step} ({fps:.1f} FPS): '
|
||
|
message += '{speed:.2f} km/h, '
|
||
|
message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road'
|
||
|
message = message.format(
|
||
|
step=self._timer.step,
|
||
|
fps=self._timer.ticks_per_second(),
|
||
|
speed=player_measurements.forward_speed * 3.6,
|
||
|
other_lane=100 * player_measurements.intersection_otherlane,
|
||
|
offroad=100 * player_measurements.intersection_offroad)
|
||
|
print_over_same_line(message)
|
||
|
|
||
|
def _on_render(self):
|
||
|
gap_x = (WINDOW_WIDTH - 2 * MINI_WINDOW_WIDTH) / 3
|
||
|
mini_image_y = WINDOW_HEIGHT - MINI_WINDOW_HEIGHT - gap_x
|
||
|
|
||
|
if self._main_image is not None:
|
||
|
array = image_converter.to_rgb_array(self._main_image)
|
||
|
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
|
||
|
self._display.blit(surface, (0, 0))
|
||
|
|
||
|
if self._mini_view_image1 is not None:
|
||
|
array = image_converter.depth_to_logarithmic_grayscale(self._mini_view_image1)
|
||
|
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
|
||
|
self._display.blit(surface, (gap_x, mini_image_y))
|
||
|
|
||
|
if self._mini_view_image2 is not None:
|
||
|
array = image_converter.labels_to_cityscapes_palette(
|
||
|
self._mini_view_image2)
|
||
|
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
|
||
|
|
||
|
self._display.blit(
|
||
|
surface, (2 * gap_x + MINI_WINDOW_WIDTH, mini_image_y))
|
||
|
|
||
|
if self._lidar_measurement is not None:
|
||
|
lidar_data = np.array(self._lidar_measurement.data[:, :2])
|
||
|
lidar_data *= 2.0
|
||
|
lidar_data += 100.0
|
||
|
lidar_data = np.fabs(lidar_data)
|
||
|
lidar_data = lidar_data.astype(np.int32)
|
||
|
lidar_data = np.reshape(lidar_data, (-1, 2))
|
||
|
#draw lidar
|
||
|
lidar_img_size = (200, 200, 3)
|
||
|
lidar_img = np.zeros(lidar_img_size)
|
||
|
lidar_img[tuple(lidar_data.T)] = (255, 255, 255)
|
||
|
surface = pygame.surfarray.make_surface(lidar_img)
|
||
|
self._display.blit(surface, (10, 10))
|
||
|
|
||
|
if self._map_view is not None:
|
||
|
array = self._map_view
|
||
|
array = array[:, :, :3]
|
||
|
|
||
|
new_window_width = \
|
||
|
(float(WINDOW_HEIGHT) / float(self._map_shape[0])) * \
|
||
|
float(self._map_shape[1])
|
||
|
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
|
||
|
|
||
|
w_pos = int(self._position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0])))
|
||
|
h_pos = int(self._position[1] *(new_window_width/float(self._map_shape[1])))
|
||
|
|
||
|
pygame.draw.circle(surface, [255, 0, 0, 255], (w_pos, h_pos), 6, 0)
|
||
|
for agent in self._agent_positions:
|
||
|
if agent.HasField('vehicle'):
|
||
|
agent_position = self._map.convert_to_pixel([
|
||
|
agent.vehicle.transform.location.x,
|
||
|
agent.vehicle.transform.location.y,
|
||
|
agent.vehicle.transform.location.z])
|
||
|
|
||
|
w_pos = int(agent_position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0])))
|
||
|
h_pos = int(agent_position[1] *(new_window_width/float(self._map_shape[1])))
|
||
|
|
||
|
pygame.draw.circle(surface, [255, 0, 255, 255], (w_pos, h_pos), 4, 0)
|
||
|
|
||
|
self._display.blit(surface, (WINDOW_WIDTH, 0))
|
||
|
|
||
|
pygame.display.flip()
|
||
|
|
||
|
|
||
|
def main():
|
||
|
argparser = argparse.ArgumentParser(
|
||
|
description='CARLA Manual Control Client')
|
||
|
argparser.add_argument(
|
||
|
'-v', '--verbose',
|
||
|
action='store_true',
|
||
|
dest='debug',
|
||
|
help='print debug information')
|
||
|
argparser.add_argument(
|
||
|
'--host',
|
||
|
metavar='H',
|
||
|
default='localhost',
|
||
|
help='IP of the host server (default: localhost)')
|
||
|
argparser.add_argument(
|
||
|
'-p', '--port',
|
||
|
metavar='P',
|
||
|
default=2000,
|
||
|
type=int,
|
||
|
help='TCP port to listen to (default: 2000)')
|
||
|
argparser.add_argument(
|
||
|
'-a', '--autopilot',
|
||
|
action='store_true',
|
||
|
help='enable autopilot')
|
||
|
argparser.add_argument(
|
||
|
'-l', '--lidar',
|
||
|
action='store_true',
|
||
|
help='enable Lidar')
|
||
|
argparser.add_argument(
|
||
|
'-q', '--quality-level',
|
||
|
choices=['Low', 'Epic'],
|
||
|
type=lambda s: s.title(),
|
||
|
default='Epic',
|
||
|
help='graphics quality level, a lower level makes the simulation run considerably faster')
|
||
|
argparser.add_argument(
|
||
|
'-m', '--map',
|
||
|
action='store_true',
|
||
|
help='plot the map of the current city')
|
||
|
args = argparser.parse_args()
|
||
|
|
||
|
log_level = logging.DEBUG if args.debug else logging.INFO
|
||
|
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
|
||
|
|
||
|
logging.info('listening to server %s:%s', args.host, args.port)
|
||
|
|
||
|
print(__doc__)
|
||
|
|
||
|
while True:
|
||
|
try:
|
||
|
|
||
|
with make_carla_client(args.host, args.port) as client:
|
||
|
game = CarlaGame(client, args)
|
||
|
game.execute()
|
||
|
break
|
||
|
|
||
|
except TCPConnectionError as error:
|
||
|
logging.error(error)
|
||
|
time.sleep(1)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
|
||
|
try:
|
||
|
main()
|
||
|
except KeyboardInterrupt:
|
||
|
print('\nCancelled by user. Bye!')
|