New behavior agent + small changes to manual and automatic control. (#2508)

* pr agent behaviors

* pr agent behaviors

* Add files via upload

* Add files via upload

* Add files via upload

* Add files via upload

* limits for steering

* Add files via upload

* pylint formatting

* pylint formatting

* seed flag for repeatability

* seed flag for repeatability

* Add files via upload

* Add files via upload

* first pr review corrections

* Add files via upload

* Add files via upload

* Delete parameters.py

* Add files via upload

* Add files via upload

* Add files via upload

* Add files via upload

* start of pedestrian avoidance + minor fixes

* start of pedestrian avoidance + minor fixes

* get_map check

* get_map check

* get_map check.

* Delete basic_agent.py

* Delete roaming_agent.py

* pylint final changes

* Pylint final check

* more pylint

* minor fix

* minor changes

* Add files via upload

* changes in PID + old models

* trying to stabilize waypoint distance

* fixed step in sampling radius

* folder structure change and more stuff

* pylint fix

* re-introducing pedestrian avoidance

* minor things

* deleted unused folder

* restore basic folder

* resurrected behavior agent

* set debug to false

* remove prints + open PR

* changelog

* setting behavior agent as default in auto_control
changing manual_control to have slower increasing braking too

* Requested modifications PR

The vehicle can now accelerate and brake at the same time again.
Removed unnecessary  variable.

* Fix frame rate display for the client

* removing unnecessary duplication
reordering folders
code cleanup for codacity

* unused import

* codacity and travis changes

Co-authored-by: Joel Moriana <joel.moriana@gmail.com>
This commit is contained in:
Jacopo Bartiromo 2020-03-17 11:31:05 +01:00 committed by GitHub
parent 80708b9daa
commit 4bc53a7f6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1274 additions and 281 deletions

View File

@ -4,6 +4,8 @@
## CARLA 0.9.8
* Improved manual_control by adding realistic throttle and brake
* Added new Behavior agent
* Added beta version sumo-carla co-simulation.
* Traffic Manager:
- Added benchmark

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
@ -10,12 +7,13 @@
waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """
import sys
import math
from enum import Enum
import math
import carla
from agents.tools.misc import is_within_distance_ahead
from agents.tools.misc import is_within_distance_ahead, is_within_distance, compute_distance
class AgentState(Enum):
"""
@ -27,12 +25,11 @@ class AgentState(Enum):
class Agent(object):
"""
Base class to define agents in CARLA
"""
"""Base class to define agents in CARLA"""
def __init__(self, vehicle):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
"""
@ -41,12 +38,25 @@ class Agent(object):
self._proximity_vehicle_threshold = 10.0 # meters
self._local_planner = None
self._world = self._vehicle.get_world()
self._map = self._vehicle.get_world().get_map()
try:
self._map = self._world.get_map()
except RuntimeError as error:
print('RuntimeError: {}'.format(error))
print(' The server could not send the OpenDRIVE (.xodr) file:')
print(' Make sure it exists, has the same name of your town, and is correct.')
sys.exit(1)
self._last_traffic_light = None
def run_step(self, debug=False):
def get_local_planner(self):
"""Get method for protected member local planner"""
return self._local_planner
@staticmethod
def run_step(debug=False):
"""
Execute one step of navigation.
:param debug: boolean flag for debugging
:return: control
"""
control = carla.VehicleControl()
@ -120,16 +130,64 @@ class Agent(object):
return carla.Location(point_location.x, point_location.y, point_location.z)
def _is_vehicle_hazard(self, vehicle_list):
def _bh_is_vehicle_hazard(self, ego_wpt, ego_loc, vehicle_list,
proximity_th, up_angle_th, low_angle_th=0, lane_offset=0):
"""
Check if a given vehicle is an obstacle in our way. To this end we take
into account the road and lane the target vehicle is on and run a
geometry test to check if the target vehicle is under a certain distance
in front of our ego vehicle.
in front of our ego vehicle. We also check the next waypoint, just to be
sure there's not a sudden road id change.
WARNING: This method is an approximation that could fail for very large
vehicles, which center is actually on a different lane but their
extension falls within the ego vehicle lane.
extension falls within the ego vehicle lane. Also, make sure to remove
the ego vehicle from the list. Lane offset is set to +1 for right lanes
and -1 for left lanes, but this has to be inverted if lane values are
negative.
:param ego_wpt: waypoint of ego-vehicle
:param ego_log: location of ego-vehicle
:param vehicle_list: list of potential obstacle to check
:param proximity_th: threshold for the agent to be alerted of
a possible collision
:param up_angle_th: upper threshold for angle
:param low_angle_th: lower threshold for angle
:param lane_offset: for right and left lane changes
:return: a tuple given by (bool_flag, vehicle, distance), where:
- bool_flag is True if there is a vehicle ahead blocking us
and False otherwise
- vehicle is the blocker object itself
- distance is the meters separating the two vehicles
"""
# Get the right offset
if ego_wpt.lane_id < 0 and lane_offset != 0:
lane_offset *= -1
for target_vehicle in vehicle_list:
target_vehicle_loc = target_vehicle.get_location()
# If the object is not in our next or current lane it's not an obstacle
target_wpt = self._map.get_waypoint(target_vehicle_loc)
if target_wpt.road_id != ego_wpt.road_id or \
target_wpt.lane_id != ego_wpt.lane_id + lane_offset:
next_wpt = self._local_planner.get_incoming_waypoint_and_direction(steps=5)[0]
if target_wpt.road_id != next_wpt.road_id or \
target_wpt.lane_id != next_wpt.lane_id + lane_offset:
continue
if is_within_distance(target_vehicle_loc, ego_loc,
self._vehicle.get_transform().rotation.yaw,
proximity_th, up_angle_th, low_angle_th):
return (True, target_vehicle, compute_distance(target_vehicle_loc, ego_loc))
return (False, None, -1)
def _is_vehicle_hazard(self, vehicle_list):
"""
:param vehicle_list: list of potential obstacle to check
:return: a tuple given by (bool_flag, vehicle), where
@ -159,10 +217,13 @@ class Agent(object):
return (False, None)
def emergency_stop(self):
@staticmethod
def emergency_stop():
"""
Send an emergency stop command to the vehicle
:return:
:return: control for braking
"""
control = carla.VehicleControl()
control.steer = 0.0

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
@ -58,7 +55,6 @@ class BasicAgent(Agent):
carla.Location(location[0], location[1], location[2]))
route_trace = self._trace_route(start_waypoint, end_waypoint)
assert route_trace
self._local_planner.set_global_plan(route_trace)

View File

@ -0,0 +1,428 @@
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles. The agent also responds to traffic lights,
traffic signs, and has different possible configurations. """
import random
import numpy as np
import carla
from agents.navigation.agent import Agent
from agents.navigation.local_planner_behavior import LocalPlanner, RoadOption
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.navigation.types_behavior import Cautious, Aggressive, Normal
from agents.tools.misc import get_speed, positive
class BehaviorAgent(Agent):
"""
BehaviorAgent implements an agent that navigates scenes to reach a given
target destination, by computing the shortest possible path to it.
This agent can correctly follow traffic signs, speed limitations,
traffic lights, while also taking into account nearby vehicles. Lane changing
decisions can be taken by analyzing the surrounding environment,
such as overtaking or tailgating avoidance. Adding to these are possible
behaviors, the agent can also keep safety distance from a car in front of it
by tracking the instantaneous time to collision and keeping it in a certain range.
Finally, different sets of behaviors are encoded in the agent, from cautious
to a more aggressive ones.
"""
def __init__(self, vehicle, ignore_traffic_light=False, behavior='normal'):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
:param ignore_traffic_light: boolean to ignore any traffic light
:param behavior: type of agent to apply
"""
super(BehaviorAgent, self).__init__(vehicle)
self.vehicle = vehicle
self.ignore_traffic_light = ignore_traffic_light
self._local_planner = LocalPlanner(self)
self._grp = None
self.look_ahead_steps = 0
# Vehicle information
self.speed = 0
self.speed_limit = 0
self.direction = None
self.incoming_direction = None
self.incoming_waypoint = None
self.start_waypoint = None
self.end_waypoint = None
self.is_at_traffic_light = 0
self.light_state = "Green"
self.light_id_to_ignore = -1
self.min_speed = 5
self.behavior = None
self._sampling_resolution = 4.5
# Parameters for agent behavior
if behavior == 'cautious':
self.behavior = Cautious()
elif behavior == 'normal':
self.behavior = Normal()
elif behavior == 'aggressive':
self.behavior = Aggressive()
def update_information(self, world):
"""
This method updates the information regarding the ego
vehicle based on the surrounding world.
:param world: carla.world object
"""
self.speed = get_speed(self.vehicle)
self.speed_limit = world.player.get_speed_limit()
self._local_planner.set_speed(self.speed_limit)
self.direction = self._local_planner.target_road_option
if self.direction is None:
self.direction = RoadOption.LANEFOLLOW
self.look_ahead_steps = int((self.speed_limit) / 10)
self.incoming_waypoint, self.incoming_direction = self._local_planner.get_incoming_waypoint_and_direction(
steps=self.look_ahead_steps)
if self.incoming_direction is None:
self.incoming_direction = RoadOption.LANEFOLLOW
self.is_at_traffic_light = world.player.is_at_traffic_light()
if self.ignore_traffic_light:
self.light_state = "Green"
else:
# This method also includes stop signs and intersections.
self.light_state = str(self.vehicle.get_traffic_light_state())
def set_destination(self, start_location, end_location, clean=False):
"""
This method creates a list of waypoints from agent's position to destination location
based on the route returned by the global router.
:param start_location: initial position
:param end_location: final position
:param clean: boolean to clean the waypoint queue
"""
if clean:
self._local_planner.waypoints_queue.clear()
self.start_waypoint = self._map.get_waypoint(start_location)
self.end_waypoint = self._map.get_waypoint(end_location)
route_trace = self._trace_route(self.start_waypoint, self.end_waypoint)
self._local_planner.set_global_plan(route_trace)
def reroute(self, spawn_points):
"""
This method implements re-routing for vehicles approaching its destination.
It finds a new target and computes another path to reach it.
:param spawn_points: list of possible destinations for the agent
"""
print("Target almost reached, setting new destination...")
random.shuffle(spawn_points)
new_start = self._local_planner.waypoints_queue[-1][0].transform.location
destination = spawn_points[0].location if spawn_points[0].location != new_start else spawn_points[1].location
print("New destination: " + str(destination))
self.set_destination(new_start, destination)
def _trace_route(self, start_waypoint, end_waypoint):
"""
This method sets up a global router and returns the
optimal route from start_waypoint to end_waypoint.
:param start_waypoint: initial position
:param end_waypoint: final position
"""
# Setting up global router
if self._grp is None:
wld = self.vehicle.get_world()
dao = GlobalRoutePlannerDAO(
wld.get_map(), sampling_resolution=self._sampling_resolution)
grp = GlobalRoutePlanner(dao)
grp.setup()
self._grp = grp
# Obtain route plan
route = self._grp.trace_route(
start_waypoint.transform.location,
end_waypoint.transform.location)
return route
def traffic_light_manager(self, waypoint):
"""
This method is in charge of behaviors for red lights and stops.
WARNING: What follows is a proxy to avoid having a car brake after running a yellow light.
This happens because the car is still under the influence of the semaphore,
even after passing it. So, the semaphore id is temporarely saved to
ignore it and go around this issue, until the car is near a new one.
:param waypoint: current waypoint of the agent
"""
light_id = self.vehicle.get_traffic_light().id if self.vehicle.get_traffic_light() is not None else -1
if self.light_state == "Red":
if not waypoint.is_junction and (self.light_id_to_ignore != light_id or light_id == -1):
return 1
elif waypoint.is_junction and light_id != -1:
self.light_id_to_ignore = light_id
if self.light_id_to_ignore != light_id:
self.light_id_to_ignore = -1
return 0
def _overtake(self, location, waypoint, vehicle_list):
"""
This method is in charge of overtaking behaviors.
:param location: current location of the agent
:param waypoint: current waypoint of the agent
:param vehicle_list: list of all the nearby vehicles
"""
left_turn = waypoint.left_lane_marking.lane_change
right_turn = waypoint.right_lane_marking.lane_change
left_wpt = waypoint.get_left_lane()
right_wpt = waypoint.get_right_lane()
if (left_turn == carla.LaneChange.Left or left_turn ==
carla.LaneChange.Both) and waypoint.lane_id * left_wpt.lane_id > 0 and left_wpt.lane_type == carla.LaneType.Driving:
new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=-1)
if not new_vehicle_state:
print("Overtaking to the left!")
self.behavior.overtake_counter = 200
self.set_destination(left_wpt.transform.location,
self.end_waypoint.transform.location, clean=True)
elif right_turn == carla.LaneChange.Right and waypoint.lane_id * right_wpt.lane_id > 0 and right_wpt.lane_type == carla.LaneType.Driving:
new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=1)
if not new_vehicle_state:
print("Overtaking to the right!")
self.behavior.overtake_counter = 200
self.set_destination(right_wpt.transform.location,
self.end_waypoint.transform.location, clean=True)
def _tailgating(self, location, waypoint, vehicle_list):
"""
This method is in charge of tailgating behaviors.
:param location: current location of the agent
:param waypoint: current waypoint of the agent
:param vehicle_list: list of all the nearby vehicles
"""
left_turn = waypoint.left_lane_marking.lane_change
right_turn = waypoint.right_lane_marking.lane_change
left_wpt = waypoint.get_left_lane()
right_wpt = waypoint.get_right_lane()
behind_vehicle_state, behind_vehicle, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, low_angle_th=160)
if behind_vehicle_state and self.speed < get_speed(behind_vehicle):
if (right_turn == carla.LaneChange.Right or right_turn ==
carla.LaneChange.Both) and waypoint.lane_id * right_wpt.lane_id > 0 and right_wpt.lane_type == carla.LaneType.Driving:
new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1)
if not new_vehicle_state:
print("Tailgating, moving to the right!")
self.behavior.tailgate_counter = 200
self.set_destination(right_wpt.transform.location,
self.end_waypoint.transform.location, clean=True)
elif left_turn == carla.LaneChange.Left and waypoint.lane_id * left_wpt.lane_id > 0 and left_wpt.lane_type == carla.LaneType.Driving:
new_vehicle_state, _, _ = self._bh_is_vehicle_hazard(waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1)
if not new_vehicle_state:
print("Tailgating, moving to the left!")
self.behavior.tailgate_counter = 200
self.set_destination(left_wpt.transform.location,
self.end_waypoint.transform.location, clean=True)
def collision_and_car_avoid_manager(self, location, waypoint):
"""
This module is in charge of warning in case of a collision
and managing possible overtaking or tailgating chances.
:param location: current location of the agent
:param waypoint: current waypoint of the agent
:return vehicle_state: True if there is a vehicle nearby, False if not
:return vehicle: nearby vehicle
:return distance: distance to nearby vehicle
"""
vehicle_list = self._world.get_actors().filter("*vehicle*")
def dist(v): return v.get_location().distance(waypoint.transform.location)
vehicle_list = [v for v in vehicle_list if dist(v) < 45 and v.id != self.vehicle.id]
if self.direction == RoadOption.CHANGELANELEFT:
vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard(
waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1)
elif self.direction == RoadOption.CHANGELANERIGHT:
vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard(
waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1)
else:
vehicle_state, vehicle, distance = self._bh_is_vehicle_hazard(
waypoint, location, vehicle_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=30)
# Check for overtaking
if vehicle_state and self.direction == RoadOption.LANEFOLLOW and \
not waypoint.is_junction and self.speed > 10 \
and self.behavior.overtake_counter == 0 and self.speed > get_speed(vehicle):
self._overtake(location, waypoint, vehicle_list)
# Check for tailgating
elif not vehicle_state and self.direction == RoadOption.LANEFOLLOW \
and not waypoint.is_junction and self.speed > 10 \
and self.behavior.tailgate_counter == 0:
self._tailgating(location, waypoint, vehicle_list)
return vehicle_state, vehicle, distance
def pedestrian_avoid_manager(self, location, waypoint):
"""
This module is in charge of warning in case of a collision
with any pedestrian.
:param location: current location of the agent
:param waypoint: current waypoint of the agent
:return vehicle_state: True if there is a walker nearby, False if not
:return vehicle: nearby walker
:return distance: distance to nearby walker
"""
walker_list = self._world.get_actors().filter("*walker.pedestrian*")
def dist(w): return w.get_location().distance(waypoint.transform.location)
walker_list = [w for w in walker_list if dist(w) < 10]
if self.direction == RoadOption.CHANGELANELEFT:
walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=-1)
elif self.direction == RoadOption.CHANGELANERIGHT:
walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=1)
else:
walker_state, walker, distance = self._bh_is_vehicle_hazard(waypoint, location, walker_list, max(
self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=60)
return walker_state, walker, distance
def car_following_manager(self, vehicle, distance, debug=False):
"""
Module in charge of car-following behaviors when there's
someone in front of us.
:param vehicle: car to follow
:param distance: distance from vehicle
:param debug: boolean for debugging
:return control: carla.VehicleControl
"""
vehicle_speed = get_speed(vehicle)
delta_v = max(1, (self.speed - vehicle_speed) / 3.6)
ttc = distance / delta_v if delta_v != 0 else distance / np.nextafter(0., 1.)
# Under safety time distance, slow down.
if self.behavior.safety_time > ttc > 0.0:
control = self._local_planner.run_step(
target_speed=min(positive(vehicle_speed - self.behavior.speed_decrease),
min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist)), debug=debug)
# Actual safety distance area, try to follow the speed of the vehicle in front.
elif 2 * self.behavior.safety_time > ttc >= self.behavior.safety_time:
control = self._local_planner.run_step(
target_speed=min(max(self.min_speed, vehicle_speed),
min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist)), debug=debug)
# Normal behavior.
else:
control = self._local_planner.run_step(
target_speed= min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist), debug=debug)
return control
def run_step(self, debug=False):
"""
Execute one step of navigation.
:param debug: boolean for debugging
:return control: carla.VehicleControl
"""
control = None
if self.behavior.tailgate_counter > 0:
self.behavior.tailgate_counter -= 1
if self.behavior.overtake_counter > 0:
self.behavior.overtake_counter -= 1
ego_vehicle_loc = self.vehicle.get_location()
ego_vehicle_wp = self._map.get_waypoint(ego_vehicle_loc)
# 1: Red lights and stops behavior
if self.traffic_light_manager(ego_vehicle_wp) != 0:
return self.emergency_stop()
# 2.1: Pedestrian avoidancd behaviors
walker_state, walker, w_distance = self.pedestrian_avoid_manager(
ego_vehicle_loc, ego_vehicle_wp)
if walker_state:
# Distance is computed from the center of the two cars,
# we use bounding boxes to calculate the actual distance
distance = w_distance - max(
walker.bounding_box.extent.y, walker.bounding_box.extent.x) - max(
self.vehicle.bounding_box.extent.y, self.vehicle.bounding_box.extent.x)
# Emergency brake if the car is very close.
if distance < self.behavior.braking_distance:
return self.emergency_stop()
# 2.2: Car following behaviors
vehicle_state, vehicle, distance = self.collision_and_car_avoid_manager(
ego_vehicle_loc, ego_vehicle_wp)
if vehicle_state:
# Distance is computed from the center of the two cars,
# we use bounding boxes to calculate the actual distance
distance = distance - max(
vehicle.bounding_box.extent.y, vehicle.bounding_box.extent.x) - max(
self.vehicle.bounding_box.extent.y, self.vehicle.bounding_box.extent.x)
# Emergency brake if the car is very close.
if distance < self.behavior.braking_distance:
return self.emergency_stop()
else:
control = self.car_following_manager(vehicle, distance)
# 4: Intersection behavior
# Checking if there's a junction nearby to slow down
elif self.incoming_waypoint.is_junction and (self.incoming_direction == RoadOption.LEFT or self.incoming_direction == RoadOption.RIGHT):
control = self._local_planner.run_step(
target_speed=min(self.behavior.max_speed, self.speed_limit - 5), debug=debug)
# 5: Normal behavior
# Calculate controller based on no turn, traffic light or vehicle in front
else:
control = self._local_planner.run_step(
target_speed= min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist), debug=debug)
return control

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
@ -10,60 +7,82 @@
from collections import deque
import math
import numpy as np
import carla
from agents.tools.misc import get_speed
class VehiclePIDController():
"""
VehiclePIDController is the combination of two PID controllers (lateral and longitudinal) to perform the
VehiclePIDController is the combination of two PID controllers
(lateral and longitudinal) to perform the
low level control a vehicle from client side
"""
def __init__(self, vehicle, args_lateral=None, args_longitudinal=None):
MAX_BRAKE = 0.3
MAX_ACCEL = 0.75
MAX_STEER = 0.8
def __init__(self, vehicle, args_lateral, args_longitudinal):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
:param args_lateral: dictionary of arguments to set the lateral PID controller using the following semantics:
:param args_lateral: dictionary of arguments to set the lateral PID controller
using the following semantics:
K_P -- Proportional term
K_D -- Differential term
K_I -- Integral term
:param args_longitudinal: dictionary of arguments to set the longitudinal PID controller using the following
semantics:
:param args_longitudinal: dictionary of arguments to set the longitudinal
PID controller using the following semantics:
K_P -- Proportional term
K_D -- Differential term
K_I -- Integral term
"""
if not args_lateral:
args_lateral = {'K_P': 1.0, 'K_D': 0.0, 'K_I': 0.0}
if not args_longitudinal:
args_longitudinal = {'K_P': 1.0, 'K_D': 0.0, 'K_I': 0.0}
self._vehicle = vehicle
self._world = self._vehicle.get_world()
self.past_steering = self._vehicle.get_control().steer
self._lon_controller = PIDLongitudinalController(self._vehicle, **args_longitudinal)
self._lat_controller = PIDLateralController(self._vehicle, **args_lateral)
def run_step(self, target_speed, waypoint):
"""
Execute one step of control invoking both lateral and longitudinal PID controllers to reach a target waypoint
Execute one step of control invoking both lateral and longitudinal
PID controllers to reach a target waypoint
at a given target_speed.
:param target_speed: desired vehicle speed
:param waypoint: target location encoded as a waypoint
:return: distance (in meters) to the waypoint
"""
throttle = self._lon_controller.run_step(target_speed)
steering = self._lat_controller.run_step(waypoint)
acceleration = self._lon_controller.run_step(target_speed)
current_steering = self._lat_controller.run_step(waypoint)
control = carla.VehicleControl()
control.steer = steering
control.throttle = throttle
if acceleration >= 0.0:
control.throttle = min(acceleration, self.MAX_ACCEL)
control.brake = 0.0
else:
control.throttle = 0.0
control.brake = min(abs(acceleration), self.MAX_ACCEL)
# Steering regulation: changes cannot happen abruptly, can't steer too much.
if current_steering > self.past_steering + 0.1:
current_steering = self.past_steering + 0.1
elif current_steering < self.past_steering - 0.1:
current_steering = self.past_steering - 0.1
if current_steering >= 0:
steering = min(self.MAX_STEER, current_steering)
else:
steering = max(-self.MAX_STEER, current_steering)
control.steer = steering
control.hand_brake = False
control.manual_gear_shift = False
self.past_steering = steering
return control
@ -73,8 +92,11 @@ class PIDLongitudinalController():
PIDLongitudinalController implements longitudinal control using a PID.
"""
def __init__(self, vehicle, K_P=1.0, K_D=0.0, K_I=0.0, dt=0.03):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
:param K_P: Proportional term
:param K_D: Differential term
@ -82,18 +104,19 @@ class PIDLongitudinalController():
:param dt: time differential in seconds
"""
self._vehicle = vehicle
self._K_P = K_P
self._K_D = K_D
self._K_I = K_I
self._k_p = K_P
self._k_d = K_D
self._k_i = K_I
self._dt = dt
self._e_buffer = deque(maxlen=30)
self._error_buffer = deque(maxlen=10)
def run_step(self, target_speed, debug=False):
"""
Execute one step of longitudinal control to reach a given target speed.
:param target_speed: target speed in Km/h
:return: throttle control in the range [0, 1]
:param debug: boolean for debugging
:return: throttle control
"""
current_speed = get_speed(self._vehicle)
@ -104,24 +127,24 @@ class PIDLongitudinalController():
def _pid_control(self, target_speed, current_speed):
"""
Estimate the throttle of the vehicle based on the PID equations
Estimate the throttle/brake of the vehicle based on the PID equations
:param target_speed: target speed in Km/h
:param current_speed: current speed of the vehicle in Km/h
:return: throttle control in the range [0, 1]
:return: throttle/brake control
"""
_e = (target_speed - current_speed)
self._e_buffer.append(_e)
if len(self._e_buffer) >= 2:
_de = (self._e_buffer[-1] - self._e_buffer[-2]) / self._dt
_ie = sum(self._e_buffer) * self._dt
error = target_speed - current_speed
self._error_buffer.append(error)
if len(self._error_buffer) >= 2:
_de = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt
_ie = sum(self._error_buffer) * self._dt
else:
_de = 0.0
_ie = 0.0
return np.clip((self._K_P * _e) + (self._K_D * _de) + (self._K_I * _ie), 0.0, 1.0)
return np.clip((self._k_p * error) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0)
class PIDLateralController():
"""
@ -130,6 +153,8 @@ class PIDLateralController():
def __init__(self, vehicle, K_P=1.0, K_D=0.0, K_I=0.0, dt=0.03):
"""
Constructor method.
:param vehicle: actor to apply to local planner logic onto
:param K_P: Proportional term
:param K_D: Differential term
@ -137,19 +162,20 @@ class PIDLateralController():
:param dt: time differential in seconds
"""
self._vehicle = vehicle
self._K_P = K_P
self._K_D = K_D
self._K_I = K_I
self._k_p = K_P
self._k_d = K_D
self._k_i = K_I
self._dt = dt
self._e_buffer = deque(maxlen=10)
def run_step(self, waypoint):
"""
Execute one step of lateral control to steer the vehicle towards a certain waypoin.
Execute one step of lateral control to steer
the vehicle towards a certain waypoin.
:param waypoint: target waypoint
:return: steering control in the range [-1, 1] where:
-1 represent maximum steering to left
-1 maximum steering to left
+1 maximum steering to right
"""
return self._pid_control(waypoint, self._vehicle.get_transform())
@ -174,6 +200,7 @@ class PIDLateralController():
(np.linalg.norm(w_vec) * np.linalg.norm(v_vec)), -1.0, 1.0))
_cross = np.cross(v_vec, w_vec)
if _cross[2] < 0:
_dot *= -1.0
@ -185,4 +212,4 @@ class PIDLateralController():
_de = 0.0
_ie = 0.0
return np.clip((self._K_P * _dot) + (self._K_D * _de) + (self._K_I * _ie), -1.0, 1.0)
return np.clip((self._k_p * _dot) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0)

View File

@ -1,12 +1,14 @@
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
This module provides GlobalRoutePlanner implementation.
"""
import math
import numpy as np
import networkx as nx
@ -88,6 +90,7 @@ class GlobalRoutePlanner(object):
entry_carla_vector = entry_wp.transform.rotation.get_forward_vector()
exit_carla_vector = exit_wp.transform.rotation.get_forward_vector()
# Adding edge with attributes
graph.add_edge(
n1, n2,
@ -104,7 +107,7 @@ class GlobalRoutePlanner(object):
def _find_loose_ends(self):
"""
This method finds road segments that have an unconnected end and
This method finds road segments that have an unconnected end, and
adds them to the internal graph representation
"""
count_loose_ends = 0
@ -113,9 +116,7 @@ class GlobalRoutePlanner(object):
end_wp = segment['exit']
exit_xyz = segment['exitxyz']
road_id, section_id, lane_id = end_wp.road_id, end_wp.section_id, end_wp.lane_id
if road_id in self._road_id_to_edge and \
section_id in self._road_id_to_edge[road_id] and \
lane_id in self._road_id_to_edge[road_id][section_id]:
if road_id in self._road_id_to_edge and section_id in self._road_id_to_edge[road_id] and lane_id in self._road_id_to_edge[road_id][section_id]:
pass
else:
count_loose_ends += 1
@ -128,10 +129,7 @@ class GlobalRoutePlanner(object):
self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
next_wp = end_wp.next(hop_resolution)
path = []
while next_wp is not None and next_wp and \
next_wp[0].road_id == road_id and \
next_wp[0].section_id == section_id and \
next_wp[0].lane_id == lane_id:
while next_wp is not None and next_wp and next_wp[0].road_id == road_id and next_wp[0].section_id == section_id and next_wp[0].lane_id == lane_id:
path.append(next_wp[0])
next_wp = next_wp[0].next(hop_resolution)
if path:
@ -144,7 +142,7 @@ class GlobalRoutePlanner(object):
length=len(path) + 1, path=path,
entry_waypoint=end_wp, exit_waypoint=path[-1],
entry_vector=None, exit_vector=None, net_vector=None,
intersection=end_wp.is_intersection, type=RoadOption.LANEFOLLOW)
intersection=end_wp.is_junction, type=RoadOption.LANEFOLLOW)
def _localize(self, location):
"""
@ -179,33 +177,28 @@ class GlobalRoutePlanner(object):
if not segment['entry'].is_junction:
next_waypoint, next_road_option, next_segment = None, None, None
if bool(waypoint.lane_change & carla.LaneChange.Right) and not right_found:
if waypoint.right_lane_marking.lane_change & carla.LaneChange.Right and not right_found:
next_waypoint = waypoint.get_right_lane()
if next_waypoint is not None and \
next_waypoint.lane_type == carla.LaneType.Driving and \
waypoint.road_id == next_waypoint.road_id:
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANERIGHT
next_segment = self._localize(next_waypoint.transform.location)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=segment['entry'],
exit_waypoint=self._graph.edges[next_segment[0], next_segment[1]]['entry_waypoint'],
path=[], length=0, type=next_road_option, change_waypoint = waypoint)
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
exit_waypoint=next_waypoint,
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
right_found = True
if bool(waypoint.lane_change & carla.LaneChange.Left) and not left_found:
if waypoint.left_lane_marking.lane_change & carla.LaneChange.Left and not left_found:
next_waypoint = waypoint.get_left_lane()
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and \
waypoint.road_id == next_waypoint.road_id:
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANELEFT
next_segment = self._localize(next_waypoint.transform.location)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=segment['entry'],
exit_waypoint=self._graph.edges[next_segment[0], next_segment[1]]['entry_waypoint'],
path=[], length=0, type=next_road_option, change_waypoint = waypoint)
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
exit_waypoint=next_waypoint,
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
left_found = True
if left_found and right_found:
break
@ -240,7 +233,6 @@ class GlobalRoutePlanner(object):
"""
This method returns the last successive intersection edge
from a starting index on the route.
This helps moving past tiny intersection edges to calculate
proper turn decisions.
"""
@ -251,8 +243,7 @@ class GlobalRoutePlanner(object):
candidate_edge = self._graph.edges[node1, node2]
if node1 == route[index]:
last_intersection_edge = candidate_edge
if candidate_edge['type'] == RoadOption.LANEFOLLOW and \
candidate_edge['intersection']:
if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']:
last_intersection_edge = candidate_edge
last_node = node2
else:
@ -272,19 +263,13 @@ class GlobalRoutePlanner(object):
next_node = route[index+1]
next_edge = self._graph.edges[current_node, next_node]
if index > 0:
if self._previous_decision != RoadOption.VOID and \
self._intersection_end_node > 0 and \
self._intersection_end_node != previous_node and \
next_edge['type'] == RoadOption.LANEFOLLOW and \
next_edge['intersection']:
if self._previous_decision != RoadOption.VOID and self._intersection_end_node > 0 and self._intersection_end_node != previous_node and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']:
decision = self._previous_decision
else:
self._intersection_end_node = -1
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and not current_edge[
'intersection'] and next_edge['type'].value == RoadOption.LANEFOLLOW.value and next_edge['intersection']
if calculate_turn:
last_node, tail_edge = self._successive_last_intersection_edge(index, route)
self._intersection_end_node = last_node
@ -315,10 +300,8 @@ class GlobalRoutePlanner(object):
decision = RoadOption.RIGHT
else:
decision = next_edge['type']
else:
decision = next_edge['type']
self._previous_decision = decision
self._previous_decision = decision
return decision
def abstract_route_plan(self, origin, destination):
@ -356,7 +339,7 @@ class GlobalRoutePlanner(object):
def trace_route(self, origin, destination):
"""
This method returns list of (carla.Waypoint, RoadOption)
from origin (carla.Location) to destination (carla.Location)
from origin to destination
"""
route_trace = []
@ -370,8 +353,7 @@ class GlobalRoutePlanner(object):
edge = self._graph.edges[route[i], route[i+1]]
path = []
if edge['type'].value != RoadOption.LANEFOLLOW.value and \
edge['type'].value != RoadOption.VOID.value:
if edge['type'].value != RoadOption.LANEFOLLOW.value and edge['type'].value != RoadOption.VOID.value:
route_trace.append((current_waypoint, road_option))
exit_wp = edge['exit_waypoint']
n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id]
@ -390,13 +372,9 @@ class GlobalRoutePlanner(object):
for waypoint in path[closest_index:]:
current_waypoint = waypoint
route_trace.append((current_waypoint, road_option))
if len(route)-i <= 2 and \
waypoint.transform.location.distance(destination) < 2*resolution:
if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*resolution:
break
elif len(route)-i <= 2 and \
current_waypoint.road_id == destination_waypoint.road_id and \
current_waypoint.section_id == destination_waypoint.section_id and \
current_waypoint.lane_id == destination_waypoint.lane_id:
elif len(route)-i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id:
destination_index = self._find_closest_in_list(destination_waypoint, path)
if closest_index > destination_index:
break

View File

@ -1,3 +1,5 @@
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
@ -14,11 +16,12 @@ class GlobalRoutePlannerDAO(object):
from the carla server instance for GlobalRoutePlanner
"""
def __init__(self, wmap, sampling_resolution=1):
"""get_topology
Constructor
def __init__(self, wmap, sampling_resolution):
"""
Constructor method.
wmap : carl world map object
:param wmap: carla.world object
:param sampling_resolution: sampling distance between waypoints
"""
self._sampling_resolution = sampling_resolution
self._wmap = wmap
@ -30,7 +33,7 @@ class GlobalRoutePlannerDAO(object):
road segments as pairs of waypoint objects, and processes the
topology into a list of dictionary objects.
return: list of dictionary objects with the following attributes
:return topology: list of dictionary objects with the following attributes
entry - waypoint of entry point of road segment
entryxyz- (x,y,z) of entry point of road segment
exit - waypoint of exit point of road segment
@ -57,13 +60,16 @@ class GlobalRoutePlannerDAO(object):
seg_dict['path'].append(w)
w = w.next(self._sampling_resolution)[0]
else:
seg_dict['path'].append(wp1.next(self._sampling_resolution/2.0)[0])
seg_dict['path'].append(wp1.next(self._sampling_resolution)[0])
topology.append(seg_dict)
return topology
def get_waypoint(self, location):
"""
The method returns waypoint at given location
:param location: vehicle location
:return waypoint: generated waypoint close to location
"""
waypoint = self._wmap.get_waypoint(location)
return waypoint

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
# Copyright (c) # Copyright (c) 2018-2020 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
@ -186,7 +183,7 @@ class LocalPlanner(object):
self._target_road_option = RoadOption.LANEFOLLOW
self._global_plan = True
def run_step(self, debug=True):
def run_step(self, debug=False):
"""
Execute one step of local planning which involves running the longitudinal and lateral PID controllers to
follow the waypoints trajectory.

View File

@ -0,0 +1,234 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module contains a local planner to perform
low-level waypoint following based on PID controllers. """
from collections import deque
from enum import Enum
import carla
from agents.navigation.controller import VehiclePIDController
from agents.tools.misc import distance_vehicle, draw_waypoints
class RoadOption(Enum):
"""
RoadOption represents the possible topological configurations
when moving from a segment of lane to other.
"""
VOID = -1
LEFT = 1
RIGHT = 2
STRAIGHT = 3
LANEFOLLOW = 4
CHANGELANELEFT = 5
CHANGELANERIGHT = 6
class LocalPlanner(object):
"""
LocalPlanner implements the basic behavior of following a trajectory
of waypoints that is generated on-the-fly.
The low-level motion of the vehicle is computed by using two PID controllers,
one is used for the lateral control
and the other for the longitudinal control (cruise speed).
When multiple paths are available (intersections)
this local planner makes a random choice.
"""
# Minimum distance to target waypoint as a percentage
# (e.g. within 80% of total distance)
# FPS used for dt
FPS = 20
def __init__(self, agent):
"""
:param agent: agent that regulates the vehicle
:param vehicle: actor to apply to local planner logic onto
"""
self._vehicle = agent.vehicle
self._map = agent.vehicle.get_world().get_map()
self._target_speed = None
self.sampling_radius = None
self._min_distance = None
self._current_waypoint = None
self.target_road_option = None
self._next_waypoints = None
self.target_waypoint = None
self._vehicle_controller = None
self._global_plan = None
self._pid_controller = None
self.waypoints_queue = deque(maxlen=20000) # queue with tuples of (waypoint, RoadOption)
self._buffer_size = 5
self._waypoint_buffer = deque(maxlen=self._buffer_size)
self._init_controller() # initializing controller
def reset_vehicle(self):
"""Reset the ego-vehicle"""
self._vehicle = None
print("Resetting ego-vehicle!")
def _init_controller(self):
"""
Controller initialization.
dt -- time difference between physics control in seconds.
This is can be fixed from server side
using the arguments -benchmark -fps=F, since dt = 1/F
target_speed -- desired cruise speed in km/h
min_distance -- minimum distance to remove waypoint from queue
lateral_dict -- dictionary of arguments to setup the lateral PID controller
{'K_P':, 'K_D':, 'K_I':, 'dt'}
longitudinal_dict -- dictionary of arguments to setup the longitudinal PID controller
{'K_P':, 'K_D':, 'K_I':, 'dt'}
"""
# Default parameters
self.args_lat_hw_dict = {
'K_P': 0.75,
'K_D': 0.02,
'K_I': 0.4,
'dt': 1.0 / self.FPS}
self.args_lat_city_dict = {
'K_P': 0.58,
'K_D': 0.02,
'K_I': 0.5,
'dt': 1.0 / self.FPS}
self.args_long_hw_dict = {
'K_P': 0.37,
'K_D': 0.024,
'K_I': 0.032,
'dt': 1.0 / self.FPS}
self.args_long_city_dict = {
'K_P': 0.15,
'K_D': 0.05,
'K_I': 0.07,
'dt': 1.0 / self.FPS}
self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
self._global_plan = False
self._target_speed = self._vehicle.get_speed_limit()
self._min_distance = 3
def set_speed(self, speed):
"""
Request new target speed.
:param speed: new target speed in km/h
"""
self._target_speed = speed
def set_global_plan(self, current_plan):
"""
Sets new global plan.
:param current_plan: list of waypoints in the actual plan
"""
for elem in current_plan:
self.waypoints_queue.append(elem)
self._global_plan = True
def get_incoming_waypoint_and_direction(self, steps=3):
"""
Returns direction and waypoint at a distance ahead defined by the user.
:param steps: number of steps to get the incoming waypoint.
"""
if len(self.waypoints_queue) > steps:
return self.waypoints_queue[steps]
else:
try:
wpt, direction = self.waypoints_queue[-1]
return wpt, direction
except IndexError as i:
print(i)
return None, RoadOption.VOID
return None, RoadOption.VOID
def run_step(self, target_speed=None, debug=False):
"""
Execute one step of local planning which involves
running the longitudinal and lateral PID controllers to
follow the waypoints trajectory.
:param target_speed: desired speed
:param debug: boolean flag to activate waypoints debugging
:return: control
"""
if target_speed is not None:
self._target_speed = target_speed
else:
self._target_speed = self._vehicle.get_speed_limit()
if len(self.waypoints_queue) == 0:
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 1.0
control.hand_brake = False
control.manual_gear_shift = False
return control
# Buffering the waypoints
if not self._waypoint_buffer:
for i in range(self._buffer_size):
if self.waypoints_queue:
self._waypoint_buffer.append(
self.waypoints_queue.popleft())
else:
break
# Current vehicle waypoint
self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
# Target waypoint
self.target_waypoint, self.target_road_option = self._waypoint_buffer[0]
if target_speed > 50:
args_lat = self.args_lat_hw_dict
args_long = self.args_long_hw_dict
else:
args_lat = self.args_lat_city_dict
args_long = self.args_long_city_dict
self._pid_controller = VehiclePIDController(self._vehicle,
args_lateral=args_lat,
args_longitudinal=args_long)
control = self._pid_controller.run_step(self._target_speed, self.target_waypoint)
# Purge the queue of obsolete waypoints
vehicle_transform = self._vehicle.get_transform()
max_index = -1
for i, (waypoint, _) in enumerate(self._waypoint_buffer):
if distance_vehicle(
waypoint, vehicle_transform) < self._min_distance:
max_index = i
if max_index >= 0:
for i in range(max_index + 1):
self._waypoint_buffer.popleft()
if debug:
draw_waypoints(self._vehicle.get_world(),
[self.target_waypoint], 1.0)
return control

View File

@ -0,0 +1,40 @@
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module contains the different parameters sets for each behavior. """
class Cautious(object):
"""Class for Cautious agent."""
max_speed = 40
speed_lim_dist = 6
speed_decrease = 12
safety_time = 3
min_proximity_threshold = 12
braking_distance = 6
overtake_counter = -1
tailgate_counter = 0
class Normal(object):
"""Class for Normal agent."""
max_speed = 50
speed_lim_dist = 3
speed_decrease = 10
safety_time = 3
min_proximity_threshold = 10
braking_distance = 5
overtake_counter = 0
tailgate_counter = 0
class Aggressive(object):
"""Class for Aggressive agent."""
max_speed = 70
speed_lim_dist = 1
speed_decrease = 8
safety_time = 3
min_proximity_threshold = 8
braking_distance = 4
overtake_counter = 0
tailgate_counter = -1

View File

@ -9,12 +9,9 @@
""" Module with auxiliary functions. """
import math
import numpy as np
import carla
def draw_waypoints(world, waypoints, z=0.5):
"""
Draw a list of waypoints at a certain height given in z.
@ -22,25 +19,25 @@ def draw_waypoints(world, waypoints, z=0.5):
:param world: carla.world object
:param waypoints: list or iterable container with the waypoints to draw
:param z: height in meters
:return:
"""
for w in waypoints:
t = w.transform
begin = t.location + carla.Location(z=z)
angle = math.radians(t.rotation.yaw)
for wpt in waypoints:
wpt_t = wpt.transform
begin = wpt_t.location + carla.Location(z=z)
angle = math.radians(wpt_t.rotation.yaw)
end = begin + carla.Location(x=math.cos(angle), y=math.sin(angle))
world.debug.draw_arrow(begin, end, arrow_size=0.3, life_time=1.0)
def get_speed(vehicle):
"""
Compute speed of a vehicle in Kmh
Compute speed of a vehicle in Km/h.
:param vehicle: the vehicle for which speed is calculated
:return: speed as a float in Kmh
:return: speed as a float in Km/h
"""
vel = vehicle.get_velocity()
return 3.6 * math.sqrt(vel.x ** 2 + vel.y ** 2 + vel.z ** 2)
return 3.6 * math.sqrt(vel.x ** 2 + vel.y ** 2 + vel.z ** 2)
def is_within_distance_ahead(target_transform, current_transform, max_distance):
"""
@ -68,6 +65,35 @@ def is_within_distance_ahead(target_transform, current_transform, max_distance):
return d_angle < 90.0
def is_within_distance(target_location, current_location, orientation, max_distance, d_angle_th_up, d_angle_th_low=0):
"""
Check if a target object is within a certain distance from a reference object.
A vehicle in front would be something around 0 deg, while one behind around 180 deg.
:param target_location: location of the target object
:param current_location: location of the reference object
:param orientation: orientation of the reference object
:param max_distance: maximum allowed distance
:param d_angle_th_up: upper thereshold for angle
:param d_angle_th_low: low thereshold for angle (optional, default is 0)
:return: True if target object is within max_distance ahead of the reference object
"""
target_vector = np.array([target_location.x - current_location.x, target_location.y - current_location.y])
norm_target = np.linalg.norm(target_vector)
# If the vector is too short, we can simply stop here
if norm_target < 0.001:
return True
if norm_target > max_distance:
return False
forward_vector = np.array(
[math.cos(math.radians(orientation)), math.sin(math.radians(orientation))])
d_angle = math.degrees(math.acos(np.clip(np.dot(forward_vector, target_vector) / norm_target, -1., 1.)))
return d_angle_th_low < d_angle < d_angle_th_up
def compute_magnitude_angle(target_location, current_location, orientation):
"""
@ -88,17 +114,24 @@ def compute_magnitude_angle(target_location, current_location, orientation):
def distance_vehicle(waypoint, vehicle_transform):
loc = vehicle_transform.location
dx = waypoint.transform.location.x - loc.x
dy = waypoint.transform.location.y - loc.y
"""
Returns the 2D distance from a waypoint to a vehicle
return math.sqrt(dx * dx + dy * dy)
:param waypoint: actual waypoint
:param vehicle_transform: transform of the target vehicle
"""
loc = vehicle_transform.location
x = waypoint.transform.location.x - loc.x
y = waypoint.transform.location.y - loc.y
return math.sqrt(x * x + y * y)
def vector(location_1, location_2):
"""
Returns the unit vector from location_1 to location_2
location_1, location_2: carla.Location objects
:param location_1, location_2: carla.Location objects
"""
x = location_2.x - location_1.x
y = location_2.y - location_1.y
@ -106,3 +139,25 @@ def vector(location_1, location_2):
norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps
return [x / norm, y / norm, z / norm]
def compute_distance(location_1, location_2):
"""
Euclidean distance between 3D points
:param location_1, location_2: 3D points
"""
x = location_2.x - location_1.x
y = location_2.y - location_1.y
z = location_2.z - location_1.z
norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps
return norm
def positive(num):
"""
Return the given number if positive, else 0
:param num: value to check
"""
return num if num > 0.0 else 0.0

View File

@ -6,9 +6,7 @@
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
Example of automatic vehicle control from client side.
"""
"""Example of automatic vehicle control from client side."""
from __future__ import print_function
@ -39,7 +37,7 @@ except ImportError:
'cannot import numpy, make sure numpy package is installed')
# ==============================================================================
# -- find carla module ---------------------------------------------------------
# -- Find CARLA module ---------------------------------------------------------
# ==============================================================================
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
@ -50,7 +48,7 @@ except IndexError:
pass
# ==============================================================================
# -- add PythonAPI for release mode --------------------------------------------
# -- Add PythonAPI for release mode --------------------------------------------
# ==============================================================================
try:
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/carla')
@ -59,22 +57,27 @@ except IndexError:
import carla
from carla import ColorConverter as cc
from agents.navigation.roaming_agent import RoamingAgent
from agents.navigation.basic_agent import BasicAgent
from agents.navigation.behavior_agent import BehaviorAgent # pylint: disable=import-error
from agents.navigation.roaming_agent import RoamingAgent # pylint: disable=import-error
from agents.navigation.basic_agent import BasicAgent # pylint: disable=import-error
# ==============================================================================
# -- Global functions ----------------------------------------------------------
# ==============================================================================
def find_weather_presets():
"""Method to find weather presets"""
rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)')
name = lambda x: ' '.join(m.group(0) for m in rgx.finditer(x))
def name(x): return ' '.join(m.group(0) for m in rgx.finditer(x))
presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)]
return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets]
def get_actor_display_name(actor, truncate=250):
"""Method to get actor display name"""
name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:])
return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name
@ -84,9 +87,18 @@ def get_actor_display_name(actor, truncate=250):
# ==============================================================================
class World(object):
def __init__(self, carla_world, hud, actor_filter):
""" Class representing the surrounding environment """
def __init__(self, carla_world, hud, args):
"""Constructor method"""
self.world = carla_world
try:
self.map = self.world.get_map()
except RuntimeError as error:
print('RuntimeError: {}'.format(error))
print(' The server could not send the OpenDRIVE (.xodr) file:')
print(' Make sure it exists, has the same name of your town, and is correct.')
sys.exit(1)
self.hud = hud
self.player = None
self.collision_sensor = None
@ -95,16 +107,22 @@ class World(object):
self.camera_manager = None
self._weather_presets = find_weather_presets()
self._weather_index = 0
self._actor_filter = actor_filter
self.restart()
self._actor_filter = args.filter
self._gamma = args.gamma
self.restart(args)
self.world.on_tick(hud.on_world_tick)
self.recording_enabled = False
self.recording_start = 0
def restart(self):
def restart(self, args):
"""Restart the world"""
# Keep same camera config if the camera manager exists.
cam_index = self.camera_manager.index if self.camera_manager is not None else 0
cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0
cam_pos_id = self.camera_manager.transform_index if self.camera_manager is not None else 0
# Set the seed if requested by user
if args.seed is not None:
random.seed(args.seed)
# Get a random blueprint.
blueprint = random.choice(self.world.get_blueprint_library().filter(self._actor_filter))
blueprint.set_attribute('role_name', 'hero')
@ -112,6 +130,7 @@ class World(object):
color = random.choice(blueprint.get_attribute('color').recommended_values)
blueprint.set_attribute('color', color)
# Spawn the player.
print("Spawning the player")
if self.player is not None:
spawn_point = self.player.get_transform()
spawn_point.location.z += 2.0
@ -119,7 +138,12 @@ class World(object):
spawn_point.rotation.pitch = 0.0
self.destroy()
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
while self.player is None:
if not self.map.get_spawn_points():
print('There are no spawn points available in your map/town.')
print('Please add some Vehicle Spawn Point to your UE4 scene.')
sys.exit(1)
spawn_points = self.map.get_spawn_points()
spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform()
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
@ -127,13 +151,14 @@ class World(object):
self.collision_sensor = CollisionSensor(self.player, self.hud)
self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
self.gnss_sensor = GnssSensor(self.player)
self.camera_manager = CameraManager(self.player, self.hud)
self.camera_manager.transform_index = cam_pos_index
self.camera_manager = CameraManager(self.player, self.hud, self._gamma)
self.camera_manager.transform_index = cam_pos_id
self.camera_manager.set_sensor(cam_index, notify=False)
actor_type = get_actor_display_name(self.player)
self.hud.notification(actor_type)
def next_weather(self, reverse=False):
"""Get next weather setting"""
self._weather_index += -1 if reverse else 1
self._weather_index %= len(self._weather_presets)
preset = self._weather_presets[self._weather_index]
@ -141,18 +166,22 @@ class World(object):
self.player.get_world().set_weather(preset[0])
def tick(self, clock):
"""Method for every tick"""
self.hud.tick(self, clock)
def render(self, display):
"""Render world"""
self.camera_manager.render(display)
self.hud.render(display)
def destroy_sensors(self):
"""Destroy sensors"""
self.camera_manager.sensor.destroy()
self.camera_manager.sensor = None
self.camera_manager.index = None
def destroy(self):
"""Destroys all actors"""
actors = [
self.camera_manager.sensor,
self.collision_sensor.sensor,
@ -177,12 +206,13 @@ class KeyboardControl(object):
for event in pygame.event.get():
if event.type == pygame.QUIT:
return True
elif event.type == pygame.KEYUP:
if event.type == pygame.KEYUP:
if self._is_quit_shortcut(event.key):
return True
@staticmethod
def _is_quit_shortcut(key):
"""Shortcut for quitting"""
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
# ==============================================================================
@ -191,7 +221,10 @@ class KeyboardControl(object):
class HUD(object):
"""Class for HUD text"""
def __init__(self, width, height):
"""Constructor method"""
self.dim = (width, height)
font = pygame.font.Font(pygame.font.get_default_font(), 20)
font_name = 'courier' if os.name == 'nt' else 'mono'
@ -210,27 +243,30 @@ class HUD(object):
self._server_clock = pygame.time.Clock()
def on_world_tick(self, timestamp):
"""Gets informations from the world at every tick"""
self._server_clock.tick()
self.server_fps = self._server_clock.get_fps()
self.frame = timestamp.frame
self.frame = timestamp.frame_count
self.simulation_time = timestamp.elapsed_seconds
def tick(self, world, clock):
"""HUD method for every tick"""
self._notifications.tick(world, clock)
if not self._show_info:
return
t = world.player.get_transform()
v = world.player.get_velocity()
c = world.player.get_control()
heading = 'N' if abs(t.rotation.yaw) < 89.5 else ''
heading += 'S' if abs(t.rotation.yaw) > 90.5 else ''
heading += 'E' if 179.5 > t.rotation.yaw > 0.5 else ''
heading += 'W' if -0.5 > t.rotation.yaw > -179.5 else ''
transform = world.player.get_transform()
vel = world.player.get_velocity()
control = world.player.get_control()
heading = 'N' if abs(transform.rotation.yaw) < 89.5 else ''
heading += 'S' if abs(transform.rotation.yaw) > 90.5 else ''
heading += 'E' if 179.5 > transform.rotation.yaw > 0.5 else ''
heading += 'W' if -0.5 > transform.rotation.yaw > -179.5 else ''
colhist = world.collision_sensor.get_collision_history()
collision = [colhist[x + self.frame - 200] for x in range(0, 200)]
max_col = max(1.0, max(collision))
collision = [x / max_col for x in collision]
vehicles = world.world.get_actors().filter('vehicle.*')
self._info_text = [
'Server: % 16.0f FPS' % self.server_fps,
'Client: % 16.0f FPS' % clock.get_fps(),
@ -239,53 +275,60 @@ class HUD(object):
'Map: % 20s' % world.map.name,
'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)),
'',
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x ** 2 + v.y ** 2 + v.z ** 2)),
u'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (t.rotation.yaw, heading),
'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)),
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(vel.x**2 + vel.y**2 + vel.z**2)),
u'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (transform.rotation.yaw, heading),
'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (transform.location.x, transform.location.y)),
'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)),
'Height: % 18.0f m' % t.location.z,
'Height: % 18.0f m' % transform.location.z,
'']
if isinstance(c, carla.VehicleControl):
if isinstance(control, carla.VehicleControl):
self._info_text += [
('Throttle:', c.throttle, 0.0, 1.0),
('Steer:', c.steer, -1.0, 1.0),
('Brake:', c.brake, 0.0, 1.0),
('Reverse:', c.reverse),
('Hand brake:', c.hand_brake),
('Manual:', c.manual_gear_shift),
'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear)]
elif isinstance(c, carla.WalkerControl):
('Throttle:', control.throttle, 0.0, 1.0),
('Steer:', control.steer, -1.0, 1.0),
('Brake:', control.brake, 0.0, 1.0),
('Reverse:', control.reverse),
('Hand brake:', control.hand_brake),
('Manual:', control.manual_gear_shift),
'Gear: %s' % {-1: 'R', 0: 'N'}.get(control.gear, control.gear)]
elif isinstance(control, carla.WalkerControl):
self._info_text += [
('Speed:', c.speed, 0.0, 5.556),
('Jump:', c.jump)]
('Speed:', control.speed, 0.0, 5.556),
('Jump:', control.jump)]
self._info_text += [
'',
'Collision:',
collision,
'',
'Number of vehicles: % 8d' % len(vehicles)]
if len(vehicles) > 1:
self._info_text += ['Nearby vehicles:']
def distance(l): return math.sqrt(
(l.x - t.location.x) ** 2 + (l.y - t.location.y) ** 2 + (l.z - t.location.z) ** 2)
vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id]
for d, vehicle in sorted(vehicles):
if d > 200.0:
def dist(l):
return math.sqrt((l.x - transform.location.x)**2 + (l.y - transform.location.y)
** 2 + (l.z - transform.location.z)**2)
vehicles = [(dist(x.get_location()), x) for x in vehicles if x.id != world.player.id]
for dist, vehicle in sorted(vehicles):
if dist > 200.0:
break
vehicle_type = get_actor_display_name(vehicle, truncate=22)
self._info_text.append('% 4dm %s' % (d, vehicle_type))
self._info_text.append('% 4dm %s' % (dist, vehicle_type))
def toggle_info(self):
"""Toggle info on or off"""
self._show_info = not self._show_info
def notification(self, text, seconds=2.0):
"""Notification text"""
self._notifications.set_text(text, seconds=seconds)
def error(self, text):
"""Error text"""
self._notifications.set_text('Error: %s' % text, (255, 0, 0))
def render(self, display):
"""Render for HUD class"""
if self._show_info:
info_surface = pygame.Surface((220, self.dim[1]))
info_surface.set_alpha(100)
@ -298,7 +341,7 @@ class HUD(object):
break
if isinstance(item, list):
if len(item) > 1:
points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)]
points = [(x + 8, v_offset + 8 + (1 - y) * 30) for x, y in enumerate(item)]
pygame.draw.lines(display, (255, 136, 0), False, points, 2)
item = None
v_offset += 18
@ -309,11 +352,12 @@ class HUD(object):
else:
rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect_border, 1)
f = (item[1] - item[2]) / (item[3] - item[2])
fig = (item[1] - item[2]) / (item[3] - item[2])
if item[2] < 0.0:
rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6))
rect = pygame.Rect(
(bar_h_offset + fig * (bar_width - 6), v_offset + 8), (6, 6))
else:
rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6))
rect = pygame.Rect((bar_h_offset, v_offset + 8), (fig * bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect)
item = item[0]
if item: # At this point has to be a str.
@ -329,7 +373,10 @@ class HUD(object):
class FadingText(object):
""" Class for fading text """
def __init__(self, font, dim, pos):
"""Constructor method"""
self.font = font
self.dim = dim
self.pos = pos
@ -337,6 +384,7 @@ class FadingText(object):
self.surface = pygame.Surface(self.dim)
def set_text(self, text, color=(255, 255, 255), seconds=2.0):
"""Set fading text"""
text_texture = self.font.render(text, True, color)
self.surface = pygame.Surface(self.dim)
self.seconds_left = seconds
@ -344,11 +392,13 @@ class FadingText(object):
self.surface.blit(text_texture, (10, 11))
def tick(self, _, clock):
"""Fading text method for every tick"""
delta_seconds = 1e-3 * clock.get_time()
self.seconds_left = max(0.0, self.seconds_left - delta_seconds)
self.surface.set_alpha(500.0 * self.seconds_left)
def render(self, display):
"""Render fading text method"""
display.blit(self.surface, self.pos)
# ==============================================================================
@ -357,7 +407,10 @@ class FadingText(object):
class HelpText(object):
""" Helper class for text render"""
def __init__(self, font, width, height):
"""Constructor method"""
lines = __doc__.split('\n')
self.font = font
self.dim = (680, len(lines) * 22 + 12)
@ -365,16 +418,18 @@ class HelpText(object):
self.seconds_left = 0
self.surface = pygame.Surface(self.dim)
self.surface.fill((0, 0, 0, 0))
for n, line in enumerate(lines):
for i, line in enumerate(lines):
text_texture = self.font.render(line, True, (255, 255, 255))
self.surface.blit(text_texture, (22, n * 22))
self.surface.blit(text_texture, (22, i * 22))
self._render = False
self.surface.set_alpha(220)
def toggle(self):
"""Toggle on or off the render help"""
self._render = not self._render
def render(self, display):
"""Render help text method"""
if self._render:
display.blit(self.surface, self.pos)
@ -384,20 +439,24 @@ class HelpText(object):
class CollisionSensor(object):
""" Class for collision sensors"""
def __init__(self, parent_actor, hud):
"""Constructor method"""
self.sensor = None
self.history = []
self._parent = parent_actor
self.hud = hud
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
blueprint = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
def get_collision_history(self):
"""Gets the history of collisions"""
history = collections.defaultdict(int)
for frame, intensity in self.history:
history[frame] += intensity
@ -405,6 +464,7 @@ class CollisionSensor(object):
@staticmethod
def _on_collision(weak_self, event):
"""On collision method"""
self = weak_self()
if not self:
return
@ -422,7 +482,10 @@ class CollisionSensor(object):
class LaneInvasionSensor(object):
"""Class for lane invasion sensors"""
def __init__(self, parent_actor, hud):
"""Constructor method"""
self.sensor = None
self._parent = parent_actor
self.hud = hud
@ -436,6 +499,7 @@ class LaneInvasionSensor(object):
@staticmethod
def _on_invasion(weak_self, event):
"""On invasion method"""
self = weak_self()
if not self:
return
@ -449,22 +513,26 @@ class LaneInvasionSensor(object):
class GnssSensor(object):
""" Class for GNSS sensors"""
def __init__(self, parent_actor):
"""Constructor method"""
self.sensor = None
self._parent = parent_actor
self.lat = 0.0
self.lon = 0.0
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.gnss')
self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)),
blueprint = world.get_blueprint_library().find('sensor.other.gnss')
self.sensor = world.spawn_actor(blueprint, carla.Transform(carla.Location(x=1.0, z=2.8)),
attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event))
@staticmethod
def _on_gnss_event(weak_self, event):
"""GNSS method"""
self = weak_self()
if not self:
return
@ -477,15 +545,28 @@ class GnssSensor(object):
class CameraManager(object):
def __init__(self, parent_actor, hud):
""" Class for camera management"""
def __init__(self, parent_actor, hud, gamma_correction):
"""Constructor method"""
self.sensor = None
self.surface = None
self._parent = parent_actor
self.hud = hud
self.recording = False
bound_y = 0.5 + self._parent.bounding_box.extent.y
attachment = carla.AttachmentType
self._camera_transforms = [
carla.Transform(carla.Location(x=-5.5, z=2.8), carla.Rotation(pitch=-15)),
carla.Transform(carla.Location(x=1.6, z=1.7))]
(carla.Transform(
carla.Location(x=-5.5, z=2.5), carla.Rotation(pitch=8.0)), attachment.SpringArm),
(carla.Transform(
carla.Location(x=1.6, z=1.7)), attachment.Rigid),
(carla.Transform(
carla.Location(x=5.5, y=1.5, z=1.5)), attachment.SpringArm),
(carla.Transform(
carla.Location(x=-8.0, z=6.0), carla.Rotation(pitch=6.0)), attachment.SpringArm),
(carla.Transform(
carla.Location(x=-1, y=-bound_y, z=0.5)), attachment.Rigid)]
self.transform_index = 1
self.sensors = [
['sensor.camera.rgb', cc.Raw, 'Camera RGB'],
@ -499,33 +580,39 @@ class CameraManager(object):
world = self._parent.get_world()
bp_library = world.get_blueprint_library()
for item in self.sensors:
bp = bp_library.find(item[0])
blp = bp_library.find(item[0])
if item[0].startswith('sensor.camera'):
bp.set_attribute('image_size_x', str(hud.dim[0]))
bp.set_attribute('image_size_y', str(hud.dim[1]))
blp.set_attribute('image_size_x', str(hud.dim[0]))
blp.set_attribute('image_size_y', str(hud.dim[1]))
if blp.has_attribute('gamma'):
blp.set_attribute('gamma', str(gamma_correction))
elif item[0].startswith('sensor.lidar'):
bp.set_attribute('range', '50')
item.append(bp)
blp.set_attribute('range', '50')
item.append(blp)
self.index = None
def toggle_camera(self):
"""Activate a camera"""
self.transform_index = (self.transform_index + 1) % len(self._camera_transforms)
self.sensor.set_transform(self._camera_transforms[self.transform_index])
self.set_sensor(self.index, notify=False, force_respawn=True)
def set_sensor(self, index, notify=True):
def set_sensor(self, index, notify=True, force_respawn=False):
"""Set a sensor"""
index = index % len(self.sensors)
needs_respawn = True if self.index is None \
else self.sensors[index][0] != self.sensors[self.index][0]
needs_respawn = True if self.index is None else (
force_respawn or (self.sensors[index][0] != self.sensors[self.index][0]))
if needs_respawn:
if self.sensor is not None:
self.sensor.destroy()
self.surface = None
self.sensor = self._parent.get_world().spawn_actor(
self.sensors[index][-1],
self._camera_transforms[self.transform_index],
attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid
# circular reference.
self._camera_transforms[self.transform_index][0],
attach_to=self._parent,
attachment_type=self._camera_transforms[self.transform_index][1])
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image))
if notify:
@ -533,13 +620,16 @@ class CameraManager(object):
self.index = index
def next_sensor(self):
"""Get the next sensor"""
self.set_sensor(self.index + 1)
def toggle_recording(self):
"""Toggle recording on or off"""
self.recording = not self.recording
self.hud.notification('Recording %s' % ('On' if self.recording else 'Off'))
def render(self, display):
"""Render method"""
if self.surface is not None:
display.blit(self.surface, (0, 0))
@ -554,7 +644,7 @@ class CameraManager(object):
lidar_data = np.array(points[:, :2])
lidar_data *= min(self.hud.dim) / 100.0
lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1])
lidar_data = np.fabs(lidar_data) # pylint: disable=E1111
lidar_data = np.fabs(lidar_data) # pylint: disable=assignment-from-no-return
lidar_data = lidar_data.astype(np.int32)
lidar_data = np.reshape(lidar_data, (-1, 2))
lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3)
@ -571,15 +661,19 @@ class CameraManager(object):
if self.recording:
image.save_to_disk('_out/%08d' % image.frame)
# ==============================================================================
# -- Game Loop ---------------------------------------------------------
# ==============================================================================
# ==============================================================================
# -- game_loop() ---------------------------------------------------------
# ==============================================================================
def game_loop(args):
""" Main loop for agent"""
pygame.init()
pygame.font.init()
world = None
tot_target_reached = 0
num_min_waypoints = 21
try:
client = carla.Client(args.host, args.port)
@ -590,20 +684,42 @@ def game_loop(args):
pygame.HWSURFACE | pygame.DOUBLEBUF)
hud = HUD(args.width, args.height)
world = World(client.get_world(), hud, args.filter)
world = World(client.get_world(), hud, args)
controller = KeyboardControl(world)
if args.agent == "Roaming":
agent = RoamingAgent(world.player)
else:
elif args.agent == "Basic":
agent = BasicAgent(world.player)
spawn_point = world.map.get_spawn_points()[0]
agent.set_destination((spawn_point.location.x,
spawn_point.location.y,
spawn_point.location.z))
else:
agent = BehaviorAgent(world.player, behavior=args.behavior)
spawn_points = world.map.get_spawn_points()
random.shuffle(spawn_points)
if spawn_points[0].location != agent.vehicle.get_location():
destination = spawn_points[0].location
else:
destination = spawn_points[1].location
agent.set_destination(agent.vehicle.get_location(), destination, clean=True)
clock = pygame.time.Clock()
while True:
clock.tick_busy_loop(60)
if controller.parse_events():
return
# As soon as the server is ready continue!
if not world.world.wait_for_tick(10.0):
continue
if args.agent == "Roaming" or args.agent == "Basic":
if controller.parse_events():
return
@ -616,6 +732,29 @@ def game_loop(args):
control = agent.run_step()
control.manual_gear_shift = False
world.player.apply_control(control)
else:
agent.update_information(world)
world.tick(clock)
world.render(display)
pygame.display.flip()
# Set new destination when target has been reached
if len(agent.get_local_planner().waypoints_queue) < num_min_waypoints and args.loop:
agent.reroute(spawn_points)
tot_target_reached += 1
world.hud.notification("The target has been reached " +
str(tot_target_reached) + " times.", seconds=4.0)
elif len(agent.get_local_planner().waypoints_queue) == 0 and not args.loop:
print("Target reached, mission accomplished...")
break
speed_limit = world.player.get_speed_limit()
agent.get_local_planner().set_speed(speed_limit)
control = agent.run_step()
world.player.apply_control(control)
finally:
if world is not None:
@ -630,13 +769,15 @@ def game_loop(args):
def main():
"""Main method"""
argparser = argparse.ArgumentParser(
description='CARLA Manual Control Client')
description='CARLA Automatic Control Client')
argparser.add_argument(
'-v', '--verbose',
action='store_true',
dest='debug',
help='print debug information')
help='Print debug information')
argparser.add_argument(
'--host',
metavar='H',
@ -652,16 +793,37 @@ def main():
'--res',
metavar='WIDTHxHEIGHT',
default='1280x720',
help='window resolution (default: 1280x720)')
help='Window resolution (default: 1280x720)')
argparser.add_argument(
'--filter',
metavar='PATTERN',
default='vehicle.*',
help='actor filter (default: "vehicle.*")')
help='Actor filter (default: "vehicle.*")')
argparser.add_argument(
'--gamma',
default=2.2,
type=float,
help='Gamma correction of the camera (default: 2.2)')
argparser.add_argument(
'-l', '--loop',
action='store_true',
dest='loop',
help='Sets a new random destination upon reaching the previous one (default: False)')
argparser.add_argument(
'-b', '--behavior', type=str,
choices=["cautious", "normal", "aggressive"],
help='Choose one of the possible agent behaviors (default: normal) ',
default='normal')
argparser.add_argument("-a", "--agent", type=str,
choices=["Roaming", "Basic"],
choices=["Behavior", "Roaming", "Basic"],
help="select which agent to run",
default="Basic")
default="Behavior")
argparser.add_argument(
'-s', '--seed',
help='Set seed for repeating executions (default: None)',
default=None,
type=int)
args = argparser.parse_args()
args.width, args.height = [int(x) for x in args.res.split('x')]
@ -674,7 +836,6 @@ def main():
print(__doc__)
try:
game_loop(args)
except KeyboardInterrupt:

View File

@ -433,7 +433,16 @@ class KeyboardControl(object):
world.player.apply_control(self._control)
def _parse_vehicle_keys(self, keys, milliseconds):
self._control.throttle = 1.0 if keys[K_UP] or keys[K_w] else 0.0
if keys[K_UP] or keys[K_w]:
self._control.throttle = min(self._control.throttle + 0.01, 1)
else:
self._control.throttle = 0.0
if keys[K_DOWN] or keys[K_s]:
self._control.brake = min(self._control.brake + 0.2, 1)
else:
self._control.brake = 0
steer_increment = 5e-4 * milliseconds
if keys[K_LEFT] or keys[K_a]:
if self._steer_cache > 0:
@ -449,7 +458,6 @@ class KeyboardControl(object):
self._steer_cache = 0.0
self._steer_cache = min(0.7, max(-0.7, self._steer_cache))
self._control.steer = round(self._steer_cache, 1)
self._control.brake = 1.0 if keys[K_DOWN] or keys[K_s] else 0.0
self._control.hand_brake = keys[K_SPACE]
def _parse_walker_keys(self, keys, milliseconds, world):