carla/PythonAPI/automatic_control.py

413 lines
14 KiB
Python

#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
Example of automatic vehicle control from client side.
"""
from __future__ import print_function
import argparse
import logging
import re
import time
import weakref
try:
import pygame
except ImportError:
raise RuntimeError(
'cannot import pygame, make sure pygame package is installed')
try:
import numpy as np
except ImportError:
raise RuntimeError(
'cannot import numpy, make sure numpy package is installed')
import carla
from carla import ColorConverter as cc
from agents.navigation.roaming_agent import *
# ==============================================================================
# -- World ---------------------------------------------------------------
# ==============================================================================
def find_weather_presets():
    """Collect the weather presets exposed by ``carla.WeatherParameters``.

    Every attribute of ``carla.WeatherParameters`` whose name starts with an
    upper-case letter (and is at least two characters long) is treated as a
    preset.

    Returns:
        A list of ``(preset, label)`` tuples, where ``label`` is the attribute
        name split into space-separated words at its camel-case boundaries.
    """
    # Lazily matches up to the next lower->Upper or Upper->Upper+lower
    # boundary (or the end of the string), i.e. one word per match.
    word_pattern = re.compile(
        '.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)')

    labelled_presets = []
    for attr_name in dir(carla.WeatherParameters):
        if not re.match('[A-Z].+', attr_name):
            continue
        words = [match.group(0) for match in word_pattern.finditer(attr_name)]
        preset = getattr(carla.WeatherParameters, attr_name)
        labelled_presets.append((preset, ' '.join(words)))
    return labelled_presets
class World(object):
    """Owns the ego vehicle, its sensors and the weather selection.

    Attributes:
        world: the CARLA world object passed to the constructor.
        hud: the HUD used for on-screen notifications.
        vehicle: the currently spawned vehicle actor (None once destroyed).
        collision_sensor: CollisionSensor attached to ``vehicle``.
        camera_manager: CameraManager attached to ``vehicle``.
        controller: unused placeholder, always None in this class.
    """

    def __init__(self, carla_world, hud):
        """Spawn the Lincoln MKZ 2017 and attach its sensors.

        Args:
            carla_world: CARLA world to spawn actors in.
            hud: HUD instance handed to the sensors for notifications.

        Raises:
            RuntimeError: if the 'vehicle.lincoln.mkz2017' blueprint is not
                present in the blueprint library.
        """
        self.world = carla_world
        self.hud = hud
        # Pre-set so destroy() is safe even if construction fails half-way.
        self.vehicle = None
        self.collision_sensor = None
        self.camera_manager = None
        self.controller = None
        self._weather_presets = find_weather_presets()
        self._weather_index = 0

        blueprints = self.world.get_blueprint_library().filter('vehicle')
        blueprint = next(
            (bp for bp in blueprints if bp.id == 'vehicle.lincoln.mkz2017'),
            None)
        if blueprint is None:
            raise RuntimeError(
                "blueprint 'vehicle.lincoln.mkz2017' not found in the "
                "blueprint library")

        # Prefer the second spawn point (as before), but do not crash on maps
        # that offer fewer than two: fall back to the first one, then to a
        # default transform.
        spawn_points = self.world.get_map().get_spawn_points()
        if len(spawn_points) > 1:
            spawn_point = spawn_points[1]
        elif spawn_points:
            spawn_point = spawn_points[0]
        else:
            spawn_point = carla.Transform()

        self.vehicle = self.world.spawn_actor(blueprint, spawn_point)
        try:
            self.collision_sensor = CollisionSensor(self.vehicle, self.hud)
            self.camera_manager = CameraManager(self.vehicle, self.hud)
            self.camera_manager.set_sensor(0, notify=False)
        except Exception:
            # Do not leave a half-built actor set behind in the simulator.
            self.destroy()
            raise

    def restart(self):
        """Replace the vehicle with a random 'tesla' blueprint at the same pose.

        The new vehicle is spawned at the current transform with z raised by
        2.0 and roll/pitch zeroed. The camera sensor index and camera
        transform index are carried over to the new CameraManager, and the
        new vehicle's type is shown as a HUD notification.
        """
        cam_index = self.camera_manager._index
        cam_pos_index = self.camera_manager._transform_index
        start_pose = self.vehicle.get_transform()
        start_pose.location.z += 2.0
        start_pose.rotation.roll = 0.0
        start_pose.rotation.pitch = 0.0
        blueprint = self._get_random_blueprint()
        self.destroy()
        self.vehicle = self.world.spawn_actor(blueprint, start_pose)
        self.collision_sensor = CollisionSensor(self.vehicle, self.hud)
        self.camera_manager = CameraManager(self.vehicle, self.hud)
        self.camera_manager._transform_index = cam_pos_index
        self.camera_manager.set_sensor(cam_index, notify=False)
        # 'vehicle.some_make.model' -> 'Some Make Model'
        type_parts = self.vehicle.type_id.replace('_', '.').title().split('.')
        self.hud.notification(' '.join(type_parts[1:]))

    def next_weather(self, reverse=False):
        """Apply the next (or, with ``reverse``, previous) weather preset.

        The selection wraps around at both ends of the preset list.
        """
        self._weather_index += -1 if reverse else 1
        self._weather_index %= len(self._weather_presets)
        preset, label = self._weather_presets[self._weather_index]
        self.hud.notification('Weather: %s' % label)
        # Go through self.world so this keeps working when no vehicle exists.
        self.world.set_weather(preset)

    def tick(self, clock):
        """Advance the HUD using the given pygame clock."""
        self.hud.tick(self, clock)

    def render(self, display):
        """Draw the camera image first, then the HUD on top of it."""
        self.camera_manager.render(display)
        self.hud.render(display)

    def destroy(self):
        """Destroy the camera sensor, the collision sensor and the vehicle.

        Each reference is cleared once its actor has been destroyed, so
        calling this method again is a no-op instead of destroying the same
        actors a second time.
        """
        camera_manager = self.camera_manager
        if camera_manager is not None and camera_manager.sensor is not None:
            camera_manager.sensor.destroy()
            camera_manager.sensor = None
        collision_sensor = self.collision_sensor
        if collision_sensor is not None and collision_sensor.sensor is not None:
            collision_sensor.sensor.destroy()
            collision_sensor.sensor = None
        if self.vehicle is not None:
            self.vehicle.destroy()
            self.vehicle = None

    def _get_random_blueprint(self):
        """Return a random 'tesla' blueprint with a random recommended color."""
        # Imported locally: this file has no top-level ``import random`` and
        # must not rely on a star import leaking the name.
        import random

        bp = random.choice(self.world.get_blueprint_library().filter('tesla'))
        if bp.has_attribute('color'):
            color = random.choice(bp.get_attribute('color').recommended_values)
            bp.set_attribute('color', color)
        return bp
# ==============================================================================
# -- HUD -----------------------------------------------------------------
# ==============================================================================
class HUD(object):
    """On-screen overlay: fading notifications plus a toggleable help panel.

    Attributes:
        dim: ``(width, height)`` of the window, as passed to the constructor.
        server_fps: frame rate written by ``CameraManager._parse_image``;
            initialised to 0 so it can be read before the first frame.
        help: HelpText panel rendered on top of the notifications.
    """

    def __init__(self, width, height):
        """Create the fonts and the notification/help widgets.

        Args:
            width: window width in pixels.
            height: window height in pixels.
        """
        self.dim = (width, height)
        self.server_fps = 0
        font = pygame.font.Font(pygame.font.get_default_font(), 20)
        # Pick the first system font whose name contains 'mono'. If there is
        # none, leave the path as None: pygame.font.Font(None, size) loads
        # pygame's default font instead of crashing with StopIteration.
        mono_name = next(
            (name for name in pygame.font.get_fonts() if 'mono' in name),
            None)
        mono = None
        if mono_name is not None:
            mono = pygame.font.match_font(mono_name, bold=True)
        self._font_mono = pygame.font.Font(mono, 14)
        # Notification strip: full width, 40 px high, at the bottom edge.
        self._notifications = FadingText(font, (width, 40), (0, height - 40))
        self.help = HelpText(pygame.font.Font(mono, 24), width, height)

    def tick(self, world, clock):
        """Advance the notification fade using the given pygame clock."""
        self._notifications.tick(world, clock)

    def notification(self, text, seconds=2.0):
        """Show ``text`` in the notification strip for ``seconds`` seconds."""
        self._notifications.set_text(text, seconds=seconds)

    def error(self, text):
        """Show ``text`` prefixed with 'Error: ' in red."""
        self._notifications.set_text('Error: %s' % text, (255, 0, 0))

    def render(self, display):
        """Draw the notification strip, then the help panel, onto ``display``."""
        self._notifications.render(display)
        self.help.render(display)
# ==============================================================================
# -- FadingText ----------------------------------------------------------
# ==============================================================================
class FadingText(object):
    """A single line of text on its own surface that fades out over time."""

    def __init__(self, font, dim, pos):
        """Store the font and geometry and start with a blank surface.

        Args:
            font: pygame font used to render the text.
            dim: ``(width, height)`` of the text surface.
            pos: position at which the surface is blitted onto the display.
        """
        self.font = font
        self.dim = dim
        self.pos = pos
        self.seconds_left = 0
        self.surface = pygame.Surface(self.dim)

    def set_text(self, text, color=(255, 255, 255), seconds=2.0):
        """Replace the displayed text and restart the fade timer."""
        rendered = self.font.render(text, True, color)
        canvas = pygame.Surface(self.dim)
        canvas.fill((0, 0, 0, 0))
        canvas.blit(rendered, (10, 11))
        self.surface = canvas
        self.seconds_left = seconds

    def tick(self, _, clock):
        """Reduce the remaining time by the clock's last frame time.

        ``clock.get_time()`` is scaled by 1e-3 before being subtracted; the
        surface alpha is then set to 500 times the remaining seconds.
        """
        elapsed = 1e-3 * clock.get_time()
        remaining = self.seconds_left - elapsed
        self.seconds_left = remaining if remaining > 0.0 else 0.0
        self.surface.set_alpha(500.0 * self.seconds_left)

    def render(self, display):
        """Blit the text surface onto ``display`` at the stored position."""
        display.blit(self.surface, self.pos)
# ==============================================================================
# -- HelpText ------------------------------------------------------------
# ==============================================================================
class HelpText(object):
    """Centered, semi-transparent panel showing the module docstring."""

    def __init__(self, font, width, height):
        """Pre-render the module docstring, one text row per line.

        Args:
            font: pygame font used to render each line.
            width: window width, used to center the panel horizontally.
            height: window height, used to center the panel vertically.
        """
        doc_lines = __doc__.split('\n')
        self.font = font
        # Fixed 680 px width; 22 px per line plus 12 px of extra height.
        panel_width = 680
        panel_height = len(doc_lines) * 22 + 12
        self.dim = (panel_width, panel_height)
        self.pos = (
            0.5 * width - 0.5 * panel_width,
            0.5 * height - 0.5 * panel_height)
        self.seconds_left = 0
        self.surface = pygame.Surface(self.dim)
        self.surface.fill((0, 0, 0, 0))
        for row, text in enumerate(doc_lines):
            rendered = self.font.render(text, True, (255, 255, 255))
            self.surface.blit(rendered, (22, row * 22))
        self.surface.set_alpha(220)
        # Hidden until toggle() is called.
        self._render = False

    def toggle(self):
        """Flip the panel between shown and hidden."""
        self._render = not self._render

    def render(self, display):
        """Blit the panel onto ``display`` if it is currently shown."""
        if not self._render:
            return
        display.blit(self.surface, self.pos)
# ==============================================================================
# -- CollisionSensor -----------------------------------------------------
# ==============================================================================
class CollisionSensor(object):
    """Collision sensor attached to an actor that reports hits on the HUD."""

    def __init__(self, parent_actor, hud):
        """Spawn a 'sensor.other.collision' actor attached to ``parent_actor``.

        Args:
            parent_actor: actor the sensor is attached to.
            hud: HUD that receives a notification for every collision event.
        """
        self.sensor = None
        self._parent = parent_actor
        self._hud = hud
        carla_world = self._parent.get_world()
        blueprint = carla_world.get_blueprint_library().find(
            'sensor.other.collision')
        self.sensor = carla_world.spawn_actor(
            blueprint, carla.Transform(), attach_to=self._parent)
        # The callback only holds a weak reference to this object so that the
        # sensor -> callback -> self chain does not form a reference cycle.
        self_ref = weakref.ref(self)

        def forward_event(event):
            return CollisionSensor._on_collision(self_ref, event)

        self.sensor.listen(forward_event)

    @staticmethod
    def _on_collision(weak_self, event):
        """Notify the HUD about a collision, if the sensor object still exists.

        Args:
            weak_self: weak reference to the owning CollisionSensor.
            event: collision event; only ``event.other_actor.type_id`` is read.
        """
        owner = weak_self()
        if not owner:
            return
        # 'vehicle.some_make.model' -> 'Some Make Model'
        type_parts = event.other_actor.type_id.replace(
            '_', '.').title().split('.')
        actor_type = ' '.join(type_parts[1:])
        owner._hud.notification('Collision with %r' % actor_type)
# ==============================================================================
# -- CameraManager -------------------------------------------------------
# ==============================================================================
class CameraManager(object):
    """Manages the camera sensor attached to an actor and its latest frame.

    Attributes:
        sensor: the currently spawned camera actor, or None.
    """

    def __init__(self, parent_actor, hud):
        """Prepare camera transforms and sensor blueprints (spawns nothing).

        Args:
            parent_actor: actor the camera gets attached to.
            hud: HUD; its ``dim`` sets the image size and it receives
                notifications and the measured server frame rate.
        """
        self.sensor = None
        self._surface = None
        self._parent = parent_actor
        self._hud = hud
        self._recording = False
        # Candidate camera mounts relative to the parent actor.
        self._camera_transforms = [
            carla.Transform(carla.Location(x=1.6, z=1.7)),
            carla.Transform(
                carla.Location(x=24, z=28.0),
                carla.Rotation(roll=-90, pitch=-90)),
            carla.Transform(
                carla.Location(x=-5.5, z=2.8),
                carla.Rotation(pitch=-15))]
        self._transform_index = 2
        # Each entry: [blueprint id, color converter, display name]; the
        # configured blueprint is appended below as the last element.
        self._sensors = [
            ['sensor.camera.rgb', cc.Raw, 'Camera RGB']]
        world = self._parent.get_world()
        bp_library = world.get_blueprint_library()
        for item in self._sensors:
            bp = bp_library.find(item[0])
            if item[0].startswith('sensor.camera'):
                bp.set_attribute('image_size_x', str(hud.dim[0]))
                bp.set_attribute('image_size_y', str(hud.dim[1]))
            item.append(bp)
        self._index = None
        self._server_clock = pygame.time.Clock()

    def set_sensor(self, index, notify=True):
        """Select the sensor at ``index`` (modulo the number of sensors).

        The camera actor is (re)spawned when no sensor was selected yet or
        when the blueprint id differs from the currently selected one.

        Args:
            index: sensor index; wrapped with modulo.
            notify: when true, show the sensor's display name on the HUD.
        """
        index = index % len(self._sensors)
        needs_respawn = (
            self._index is None
            or self._sensors[index][0] != self._sensors[self._index][0])
        if needs_respawn:
            if self.sensor is not None:
                self.sensor.destroy()
                # Drop the stale handle so a failed respawn below cannot lead
                # to the same actor being destroyed twice later on.
                self.sensor = None
                self._surface = None
            self.sensor = self._parent.get_world().spawn_actor(
                self._sensors[index][-1],
                self._camera_transforms[self._transform_index],
                attach_to=self._parent)
            # Publish the index BEFORE registering the callback:
            # _parse_image reads self._index and would fail on None if a
            # frame were delivered before this method finished.
            self._index = index
            # We need to pass the lambda a weak reference to self to avoid
            # circular reference.
            weak_self = weakref.ref(self)
            self.sensor.listen(
                lambda image: CameraManager._parse_image(weak_self, image))
        if notify:
            self._hud.notification(self._sensors[index][2])
        self._index = index

    def render(self, display):
        """Blit the most recent camera frame, if any, at the top-left corner."""
        if self._surface is not None:
            display.blit(self._surface, (0, 0))

    @staticmethod
    def _parse_image(weak_self, image):
        """Convert an incoming camera image into a pygame surface.

        Args:
            weak_self: weak reference to the owning CameraManager.
            image: sensor image providing ``convert``, ``raw_data``,
                ``height`` and ``width``.
        """
        self = weak_self()
        if not self:
            return
        self._server_clock.tick()
        self._hud.server_fps = self._server_clock.get_fps()
        image.convert(self._sensors[self._index][1])
        array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
        array = np.reshape(array, (image.height, image.width, 4))
        # Keep the first three channels and reverse their order
        # (presumably BGRA -> RGB; confirm against the sensor's pixel format).
        array = array[:, :, :3]
        array = array[:, :, ::-1]
        # make_surface is fed (width, height, channels), hence the axis swap.
        self._surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
# ==============================================================================
# -- game_loop() ---------------------------------------------------------
# ==============================================================================
def game_loop(args):
    """Connect to the simulator and drive the vehicle with a RoamingAgent.

    Runs until the window is closed, the Escape key is released, or an
    exception is raised. Spawned actors and pygame are always cleaned up.

    Args:
        args: parsed command-line namespace providing ``host``, ``port``,
            ``width`` and ``height``.
    """
    pygame.init()
    pygame.font.init()
    world = None
    try:
        client = carla.Client(args.host, args.port)
        client.set_timeout(4.0)
        display = pygame.display.set_mode(
            (args.width, args.height),
            pygame.HWSURFACE | pygame.DOUBLEBUF)
        hud = HUD(args.width, args.height)
        world = World(client.get_world(), hud)
        agent = RoamingAgent(world.vehicle)
        # Measures frame time only (no frame-rate cap); feeds the HUD fade.
        clock = pygame.time.Clock()
        time.sleep(1)
        while True:
            # Drain the event queue every iteration: this keeps the window
            # responsive and gives the user a clean way to stop the loop.
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    return
                if event.type == pygame.KEYUP and event.key == pygame.K_ESCAPE:
                    return
            # as soon as the server is ready continue!
            if not world.world.wait_for_tick(10.0):
                continue
            clock.tick()
            # Advance the HUD so notifications actually fade out.
            world.tick(clock)
            world.render(display)
            pygame.display.flip()
            control = agent.run_step()
            world.vehicle.apply_control(control)
    finally:
        if world is not None:
            world.destroy()
        pygame.quit()
# ==============================================================================
# -- main() --------------------------------------------------------------
# ==============================================================================
def main():
    """Parse the command line, configure logging and run the game loop.

    Options: ``-v/--verbose``, ``--host``, ``-p/--port`` and ``--res``
    (``WIDTHxHEIGHT``). A malformed ``--res`` value ends the program through
    ``argparse``'s usual usage error (exit status 2). Ctrl-C is reported as
    a user cancellation; any other exception from the game loop is logged
    with its traceback.
    """
    argparser = argparse.ArgumentParser(
        description='CARLA Automatic Control Client')
    argparser.add_argument(
        '-v', '--verbose',
        action='store_true',
        dest='debug',
        help='print debug information')
    argparser.add_argument(
        '--host',
        metavar='H',
        default='127.0.0.1',
        help='IP of the host server (default: 127.0.0.1)')
    argparser.add_argument(
        '-p', '--port',
        metavar='P',
        default=2000,
        type=int,
        help='TCP port to listen to (default: 2000)')
    argparser.add_argument(
        '--res',
        metavar='WIDTHxHEIGHT',
        default='1280x720',
        help='window resolution (default: 1280x720)')
    args = argparser.parse_args()

    # ValueError covers both non-numeric parts and a wrong number of parts.
    try:
        args.width, args.height = [int(x) for x in args.res.split('x')]
    except ValueError:
        argparser.error(
            'invalid --res value %r, expected WIDTHxHEIGHT (e.g. 1280x720)'
            % args.res)
    if args.width <= 0 or args.height <= 0:
        argparser.error(
            'invalid --res value %r, width and height must be positive'
            % args.res)

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
    logging.info('listening to server %s:%s', args.host, args.port)
    print(__doc__)
    try:
        game_loop(args)
    except KeyboardInterrupt:
        print('\nCancelled by user. Bye!')
    except Exception as error:
        # Top-level boundary: log with traceback instead of crashing.
        logging.exception(error)
# Run the client only when executed as a script, not when imported.
if __name__ == '__main__':
    main()