Updated docs and cleaned code

This commit is contained in:
Guillermo 2021-07-13 10:42:00 +02:00 committed by bernat
parent 982354081d
commit e6420a0f73
3 changed files with 64 additions and 103 deletions

View File

@ -6,6 +6,7 @@
""" """
This module implements an agent that roams around a track following random This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles. The agent also responds to traffic lights. waypoints and avoiding other vehicles. The agent also responds to traffic lights.
It can also make use of the global route planner to follow a specifed route
""" """
import carla import carla
@ -16,45 +17,39 @@ from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.tools.misc import get_speed, is_within_distance, get_trafficlight_trigger_location from agents.tools.misc import get_speed, is_within_distance, get_trafficlight_trigger_location
class AgentState(Enum):
"""
AGENT_STATE represents the possible states of a roaming agent
"""
NAVIGATING = 1
BLOCKED_BY_VEHICLE = 2
BLOCKED_RED_LIGHT = 3
class BasicAgent(object): class BasicAgent(object):
""" """
BasicAgent implements an agent that navigates the scene. BasicAgent implements an agent that navigates the scene.
This agent respects traffic lights and other vehicles, but ignores stop signs. This agent respects traffic lights and other vehicles, but ignores stop signs.
It has several functions available to specify the route that the agent must follow It has several functions available to specify the route that the agent must follow,
as well as to change its parameters in case a different driving mode is desired.
""" """
def __init__(self, vehicle, target_speed=20, opt_dict={}): def __init__(self, vehicle, target_speed=20, opt_dict={}):
""" """
:param vehicle: actor to apply to local planner logic onto Initialization the agent paramters, the local and the global planner.
:param target_speed: speed (in Km/h) at which the vehicle will move
:param opt_dict: dictionary in case some of its parameters want to be changed. :param vehicle: actor to apply to agent logic onto
This also applies to parameters related to the LocalPlanner. :param target_speed: speed (in Km/h) at which the vehicle will move
:param opt_dict: dictionary in case some of its parameters want to be changed.
This also applies to parameters related to the LocalPlanner.
""" """
self._vehicle = vehicle self._vehicle = vehicle
self._world = self._vehicle.get_world() self._world = self._vehicle.get_world()
self._map = self._world.get_map() self._map = self._world.get_map()
self._state = AgentState.NAVIGATING self._last_traffic_light = None
# Base values # Base parameters
self._ignore_traffic_lights = False self._ignore_traffic_lights = False
self._ignore_stop_signs = False self._ignore_stop_signs = False
self._ignore_vehicles = False self._ignore_vehicles = False
self._last_traffic_light = None
self._target_speed = target_speed self._target_speed = target_speed
self._sampling_resolution = 2.0 self._sampling_resolution = 2.0
self._base_tlight_threshold = 5.0 # meters self._base_tlight_threshold = 5.0 # meters
self._base_vehicle_threshold = 5.0 # meters self._base_vehicle_threshold = 5.0 # meters
self._max_brake = 0.5 self._max_brake = 0.5
# Change values according to the target speed # Change parameters according to the dictionary
opt_dict['target_speed'] = target_speed opt_dict['target_speed'] = target_speed
if 'ignore_traffic_lights' in opt_dict: if 'ignore_traffic_lights' in opt_dict:
self._ignore_traffic_lights = opt_dict['ignore_traffic_lights'] self._ignore_traffic_lights = opt_dict['ignore_traffic_lights']
@ -77,7 +72,10 @@ class BasicAgent(object):
def add_emergency_stop(self, control): def add_emergency_stop(self, control):
""" """
Send an emergency stop command to the vehicle Overwrites the throttle a brake values of a control to perform an emergency stop.
The steering is kept the same to avoid going out of the lane when stopping during turns
:param speed (carl.VehicleControl): control to be modified
""" """
control.throttle = 0.0 control.throttle = 0.0
control.brake = self._max_brake control.brake = self._max_brake
@ -87,35 +85,35 @@ class BasicAgent(object):
def set_target_speed(self, speed): def set_target_speed(self, speed):
""" """
Changes the target speed of the agent Changes the target speed of the agent
:param speed (float): target speed in Km/h
""" """
self._local_planner.set_speed(speed) self._local_planner.set_speed(speed)
def follow_speed_limits(self, value=True): def follow_speed_limits(self, value=True):
""" """
(De)activates a flag to make the agent dynamically change the target speed according to the speed limits If active, the agent will dynamically change the target speed according to the speed limits
:param value (bool): whether or not to activate this behavior
""" """
self._local_planner.follow_speed_limits(value) self._local_planner.follow_speed_limits(value)
def get_local_planner(self): def get_local_planner(self):
""" """Get method for protected member local planner"""
Get method for protected member local planner
"""
return self._local_planner return self._local_planner
def get_global_planner(self): def get_global_planner(self):
""" """Get method for protected member local planner"""
Get method for protected member local planner
"""
return self._global_planner return self._global_planner
def set_destination(self, end_location, start_location=None): def set_destination(self, end_location, start_location=None):
""" """
This method creates a list of waypoints between a starting and ending location, This method creates a list of waypoints between a starting and ending location,
based on the route returned by the global router, and adds it to the local planner. based on the route returned by the global router, and adds it to the local planner.
If no starting location is passed, the vehicle location is chosen. If no starting location is passed, the vehicle local planner's target location is chosen,
which corresponds (by default), to a location about 5 meters in front of the vehicle.
:param end_location: final location of the route :param end_location (carla.Location): final location of the route
:param start_location: starting location of the route :param start_location (carla.Location): starting location of the route
""" """
if not start_location: if not start_location:
start_location = self._local_planner.target_waypoint.transform.location start_location = self._local_planner.target_waypoint.transform.location
@ -135,7 +133,7 @@ class BasicAgent(object):
Adds a specific plan to the agent. Adds a specific plan to the agent.
:param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed
:param stop_waypoint_creation: stops the automatic creation of waypoints :param stop_waypoint_creation: stops the automatic random creation of waypoints
:param clean_queue: resets the current agent's plan :param clean_queue: resets the current agent's plan
""" """
self._local_planner.set_global_plan( self._local_planner.set_global_plan(
@ -146,20 +144,15 @@ class BasicAgent(object):
def trace_route(self, start_waypoint, end_waypoint): def trace_route(self, start_waypoint, end_waypoint):
""" """
This method sets up a global router and returns the Calculates the shortest route between a starting and ending waypoint.
optimal route from start_waypoint to end_waypoint.
:param start_waypoint: initial position :param start_waypoint (carla.Waypoint): initial waypoint
:param end_waypoint: final position :param end_waypoint (carla.Waypoint): final waypoint
""" """
route = self._global_planner.trace_route(start_waypoint, end_waypoint) return self._global_planner.trace_route(start_waypoint, end_waypoint)
return route
def run_step(self, debug=False): def run_step(self):
""" """Execute one step of navigation."""
Execute one step of navigation.
:return: carla.VehicleControl
"""
hazard_detected = False hazard_detected = False
# Retrieve all relevant actors # Retrieve all relevant actors
@ -169,73 +162,55 @@ class BasicAgent(object):
vehicle_speed = get_speed(self._vehicle) / 3.6 vehicle_speed = get_speed(self._vehicle) / 3.6
# Check for possible obstacles # Check for possible vehicle obstacles
max_vehicle_distance = self._base_vehicle_threshold + vehicle_speed max_vehicle_distance = self._base_vehicle_threshold + vehicle_speed
vehicle_state, vehicle = self._vehicle_obstacle_detected(vehicle_list, max_vehicle_distance) affected_by_vehicle, _ = self._vehicle_obstacle_detected(vehicle_list, max_vehicle_distance)
if vehicle_state: if affected_by_vehicle:
if debug:
print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id))
self._state = AgentState.BLOCKED_BY_VEHICLE
hazard_detected = True hazard_detected = True
# Check if the vehicle is affected by a red traffic light # Check if the vehicle is affected by a red traffic light
max_tlight_distance = self._base_tlight_threshold + vehicle_speed max_tlight_distance = self._base_tlight_threshold + vehicle_speed
light_state, traffic_light = self._affected_by_traffic_light(lights_list, max_tlight_distance) affected_by_tlight, _ = self._affected_by_traffic_light(lights_list, max_tlight_distance)
if light_state: if affected_by_tlight:
if debug:
print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id))
self._state = AgentState.BLOCKED_RED_LIGHT
hazard_detected = True hazard_detected = True
control = self._local_planner.run_step(debug=debug) control = self._local_planner.run_step()
if hazard_detected: if hazard_detected:
control = self.add_emergency_stop(control) control = self.add_emergency_stop(control)
else:
self._state = AgentState.NAVIGATING
return control return control
def done(self): def done(self):
""" """Check whether the agent has reached its destination."""
Check whether the agent has reached its destination.
"""
return self._local_planner.done() return self._local_planner.done()
def ignore_traffic_lights(self, active=True): def ignore_traffic_lights(self, active=True):
""" """(De)activates the checks for traffic lights"""
(De)activates the checks for traffic lights
"""
self._ignore_traffic_lights = active self._ignore_traffic_lights = active
def ignore_stop_signs(self, active=True): def ignore_stop_signs(self, active=True):
""" """(De)activates the checks for stop signs"""
(De)activates the checks for stop signs
"""
self._ignore_stop_signs = active self._ignore_stop_signs = active
def ignore_vehicles(self, active=True): def ignore_vehicles(self, active=True):
""" """(De)activates the checks for stop signs"""
(De)activates the checks for stop signs
"""
self._ignore_vehicles = active self._ignore_vehicles = active
def _affected_by_traffic_light(self, lights_list, max_distance=None): def _affected_by_traffic_light(self, lights_list=None, max_distance=None):
""" """
Method to check if there is a red light affecting us. This version of Method to check if there is a red light affecting the vehicle.
the method is compatible with both European and US style traffic lights.
:param lights_list: list containing TrafficLight objects :param lights_list (list of carla.TrafficLight): list containing TrafficLight objects.
:return: a tuple given by (bool_flag, traffic_light), where If None, all traffic lights in the scene are used
- bool_flag is True if there is a traffic light in RED :param max_distance (float): max distance for traffic lights to be considered relevant.
affecting us and False otherwise If None, the base threshold value is used
- traffic_light is the object itself or None if there is no
red traffic light affecting us
""" """
if self._ignore_traffic_lights: if self._ignore_traffic_lights:
return (False, None) return (False, None)
if not lights_list:
lights_list = self._world.get_actors().filter("*traffic_light*")
if not max_distance: if not max_distance:
max_distance = self._base_tlight_threshold max_distance = self._base_tlight_threshold
@ -265,28 +240,27 @@ class BasicAgent(object):
if traffic_light.state != carla.TrafficLightState.Red: if traffic_light.state != carla.TrafficLightState.Red:
continue continue
if is_within_distance(object_waypoint.transform, if is_within_distance(object_waypoint.transform, self._vehicle.get_transform(), max_distance, [0, 90]):
self._vehicle.get_transform(),
max_distance,
[0, 90]):
self._last_traffic_light = traffic_light self._last_traffic_light = traffic_light
return (True, traffic_light) return (True, traffic_light)
return (False, None) return (False, None)
def _vehicle_obstacle_detected(self, vehicle_list, max_distance=None): def _vehicle_obstacle_detected(self, vehicle_list=None, max_distance=None):
""" """
:param vehicle_list: list of potential obstacle to check Method to check if there is a vehicle in front of the agent blocking its path.
:param max_distance: max freespace to check for obstacles
:return: a tuple given by (bool_flag, vehicle), where :param vehicle_list (list of carla.Vehicle): list contatining vehicle objects.
- bool_flag is True if there is a vehicle ahead blocking us If None, all vehicle in the scene are used
and False otherwise :param max_distance: max freespace to check for obstacles.
- vehicle is the blocker object itself If None, the base threshold value is used
""" """
if self._ignore_vehicles: if self._ignore_vehicles:
return (False, None) return (False, None)
if not vehicle_list:
vehicle_list = self._world.get_actors().filter("*vehicle*")
if not max_distance: if not max_distance:
max_distance = self._base_vehicle_threshold max_distance = self._base_vehicle_threshold

View File

@ -40,7 +40,7 @@ class LocalPlanner(object):
unless a given global plan has already been specified. unless a given global plan has already been specified.
""" """
def __init__(self, vehicle, opt_dict=None): def __init__(self, vehicle, opt_dict={}):
""" """
:param vehicle: actor to apply to local planner logic onto :param vehicle: actor to apply to local planner logic onto
:param opt_dict: dictionary of arguments with different parameters: :param opt_dict: dictionary of arguments with different parameters:

View File

@ -9,7 +9,6 @@ This file has several useful functions related to the AD Map library
from __future__ import print_function from __future__ import print_function
from math import floor, sqrt, pow
import os import os
import numpy as np import numpy as np
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -239,18 +238,6 @@ def _get_route_waypoints(route, resolution, town_map):
return wp_route return wp_route
def get_opendrive_ids(ad_lane_id):
"""Given an AD map lane ID, returns the corresponding opendrive road and lane ids"""
num_lane_id = float(str(ad_lane_id))
road_id = floor(num_lane_id / 10000)
remnant = num_lane_id - road_id * 10000
lane_segment_id = floor(remnant / 100)
remnant = remnant - lane_segment_id * 100
lane_id = floor(remnant - 50)
return road_id, lane_id, lane_segment_id
def _get_lane_para_points(lane_interval, distance=1): def _get_lane_para_points(lane_interval, distance=1):
""" """
Samples a given lane interval in points every 'distance' meters, Samples a given lane interval in points every 'distance' meters,