#!/usr/bin/env python # Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de # Barcelona (UAB). # # This work is licensed under the terms of the MIT license. # For a copy, see . """Spawn NPCs into the simulation""" import glob import os import sys import time try: sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % ( sys.version_info.major, sys.version_info.minor, 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError: pass import carla import argparse import logging import random def main(): argparser = argparse.ArgumentParser( description=__doc__) argparser.add_argument( '--host', metavar='H', default='127.0.0.1', help='IP of the host server (default: 127.0.0.1)') argparser.add_argument( '-p', '--port', metavar='P', default=2000, type=int, help='TCP port to listen to (default: 2000)') argparser.add_argument( '-n', '--number-of-vehicles', metavar='N', default=10, type=int, help='number of vehicles (default: 10)') argparser.add_argument( '-w', '--number-of-walkers', metavar='W', default=50, type=int, help='number of walkers (default: 50)') argparser.add_argument( '--safe', action='store_true', help='avoid spawning vehicles prone to accidents') argparser.add_argument( '--filterv', metavar='PATTERN', default='vehicle.*', help='vehicles filter (default: "vehicle.*")') argparser.add_argument( '--filterw', metavar='PATTERN', default='walker.pedestrian.*', help='pedestrians filter (default: "walker.pedestrian.*")') args = argparser.parse_args() logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO) vehicles_list = [] walkers_list = [] all_id = [] client = carla.Client(args.host, args.port) client.set_timeout(2.0) try: world = client.get_world() blueprints = world.get_blueprint_library().filter(args.filterv) blueprintsWalkers = world.get_blueprint_library().filter(args.filterw) if args.safe: blueprints = [x for x in blueprints if int(x.get_attribute('number_of_wheels')) == 4] blueprints = [x for x in blueprints if not x.id.endswith('isetta')] blueprints = [x for x in blueprints if not x.id.endswith('carlacola')] spawn_points = world.get_map().get_spawn_points() number_of_spawn_points = len(spawn_points) if args.number_of_vehicles < number_of_spawn_points: random.shuffle(spawn_points) elif args.number_of_vehicles > number_of_spawn_points: msg = 'requested %d vehicles, but could only find %d spawn points' logging.warning(msg, args.number_of_vehicles, number_of_spawn_points) args.number_of_vehicles = number_of_spawn_points # @todo cannot import these directly. SpawnActor = carla.command.SpawnActor SetAutopilot = carla.command.SetAutopilot FutureActor = carla.command.FutureActor # -------------- # Spawn vehicles # -------------- batch = [] for n, transform in enumerate(spawn_points): if n >= args.number_of_vehicles: break blueprint = random.choice(blueprints) if blueprint.has_attribute('color'): color = random.choice(blueprint.get_attribute('color').recommended_values) blueprint.set_attribute('color', color) if blueprint.has_attribute('driver_id'): driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) blueprint.set_attribute('driver_id', driver_id) blueprint.set_attribute('role_name', 'autopilot') batch.append(SpawnActor(blueprint, transform).then(SetAutopilot(FutureActor, True))) for response in client.apply_batch_sync(batch): if response.error: logging.error(response.error) else: vehicles_list.append(response.actor_id) # ------------- # Spawn Walkers # ------------- # some settings percentagePedestriansRunning = 0.0 # how many pedestrians will run percentagePedestriansCrossing = 0.0 # how many pedestrians will walk through the road # 1. take all the random locations to spawn spawn_points = [] for i in range(args.number_of_walkers): spawn_point = carla.Transform() loc = world.get_random_location_from_navigation() if (loc != None): spawn_point.location = loc spawn_points.append(spawn_point) # 2. we spawn the walker object batch = [] walker_speed = [] for spawn_point in spawn_points: walker_bp = random.choice(blueprintsWalkers) # set as not invencible if walker_bp.has_attribute('is_invincible'): walker_bp.set_attribute('is_invincible', 'false') # set the max speed if walker_bp.has_attribute('speed'): if (random.random() > percentagePedestriansRunning): # walking walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1]) else: # running walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2]) else: print("Walker has no speed") walker_speed.append(0.0) batch.append(SpawnActor(walker_bp, spawn_point)) results = client.apply_batch_sync(batch, True) walker_speed2 = [] for i in range(len(results)): if results[i].error: logging.error(results[i].error) else: walkers_list.append({"id": results[i].actor_id}) walker_speed2.append(walker_speed[i]) walker_speed = walker_speed2 # 3. we spawn the walker controller batch = [] walker_controller_bp = world.get_blueprint_library().find('controller.ai.walker') for i in range(len(walkers_list)): batch.append(SpawnActor(walker_controller_bp, carla.Transform(), walkers_list[i]["id"])) results = client.apply_batch_sync(batch, True) for i in range(len(results)): if results[i].error: logging.error(results[i].error) else: walkers_list[i]["con"] = results[i].actor_id # 4. we put altogether the walkers and controllers id to get the objects from their id for i in range(len(walkers_list)): all_id.append(walkers_list[i]["con"]) all_id.append(walkers_list[i]["id"]) all_actors = world.get_actors(all_id) # wait for a tick to ensure client receives the last transform of the walkers we have just created world.wait_for_tick() # 5. initialize each controller and set target to walk to (list is [controler, actor, controller, actor ...]) # set how many pedestrians can cross the road world.set_pedestrians_cross_factor(percentagePedestriansCrossing) for i in range(0, len(all_id), 2): # start walker all_actors[i].start() # set walk to random point all_actors[i].go_to_location(world.get_random_location_from_navigation()) # max speed all_actors[i].set_max_speed(float(walker_speed[int(i/2)])) print('spawned %d vehicles and %d walkers, press Ctrl+C to exit.' % (len(vehicles_list), len(walkers_list))) while True: world.wait_for_tick() finally: print('\ndestroying %d vehicles' % len(vehicles_list)) client.apply_batch([carla.command.DestroyActor(x) for x in vehicles_list]) # stop walker controllers (list is [controller, actor, controller, actor ...]) for i in range(0, len(all_id), 2): all_actors[i].stop() print('\ndestroying %d walkers' % len(walkers_list)) client.apply_batch([carla.command.DestroyActor(x) for x in all_id]) time.sleep(0.5) if __name__ == '__main__': try: main() except KeyboardInterrupt: pass finally: print('\ndone.')