Typing and modernisation of Agent code (#8072)

* added cpp client build docs (#7942)

* fixed IMU units (#7960)

* Update README.md with new TinyURL links (#7988)

* Added inverse transform (#7999)

Co-authored-by: glopezdiest <glopez@cvc.uab.cat>

* Aaron/fixwheelchair (#8001)

* Fix OSM2ODR build

* Updated fix wheelchair default value

* Docs/unit updates (#8007)

* fixed IMU units

* updated autitwheel version

* Add a `*.pyi` file for auto-completion & hints.

To enable auto-completion and hints in code editors such as VScode, create a `*.pyi` file. This feature is compatible with `python 3.9` and later versions.

* Fixes and missing Iterators

* Fixed Actor.parent

Can be None or an Actor

* Fixed missing return types

* Updated changelog

needs merge with dev version

* Added DSVEventArray iterator

* Added missing type for Labelled Point

* Fixed spelling misstakes

* Removed wrong unit indication

* Added missing -> World to load_world

* Added missing return value to reload_world

* FIX: __init__ methods do not return

* FIX: added ApplyTransform, fixed ApplyTorque

* Filled in missing information and types.

* ActorList.filter actually returns ActorList

* Fixed CityObjectLabels

* Disambiguated get_waypoint signature

Syntax fix (squased)

* Added undocumented variables

FutureActor
laod_world_if_different

* Corrected Sensor.is_listening

Was changed to a function in 0.9.15. More info see: https://github.com/carla-simulator/carla/pull/7439

* Added type hints for `values` attribute on enums

* Fix intendation shadowing methods

* Fix missing @property

* Formatted some docstring to be shorter

* Added stubs for HUD drawing

Functions from #7168

* Corrected and more precise type-hints

- fixed carla.Waypoint.next_until_lane_end

* Improved get_waypoint disambiguation

correctly added two overload function

* Fix spelling mistakes

* Better usage of Enum if typing.Self is availiable

Using Self will not report an override / incompatible error.

* Fix: Enum values were tuples. Added Flag or Int to Enums

* Fixes for wrong stubs

- OpendriveGenerationParameter had no init
- missing @property
- wrong signatures

* Added self parameter to property signatures

* Various fixes

- wrong signatures
- wrong names

* Added setters for VehicleControl

* Improved get_waypoints and Literal type hints

* Corrected [try_]spawn_actor keyword name

* Added Transform.inverse_transform and corrected signature

parameter is called in_point not in_vector

* Improved Callable and callbacks signature

* Corrections and additions

more setters
missing, wrong types corrected
spelling

* Fixed Vector arithmetic

* added digital twins video (#8090)

* Typing for GlobalRoutePlanner

* 0.9.13+ modernisation of code

* made function static

* Updated changelog

* fixed spelling mistakes

* No unnecessary inheritance from object and empty class brackets & other

Removed empty variables and whitespaces
formatting

* navigation information is now loaded when changing maps

* Porting the changes done to UE5 to fix the recording leak to UE4

The slowdown is considerably more noticeable here since the engine runs much smoother. This makes evident that this is a stopgap measure, and should be looked into further down the line.

* Fixed typo in CityScapes palette (#8137)

* Correcting makefile typo to avoid override warning for target "downloadplugins" (#8167)

The downloadplugins target is already defined below (line 162).

* fix typo in title (#8225)

* added unreal coord system, fixed v2x (#8251)

---------

Co-authored-by: MattRoweEAIF <125647690+MattRoweEAIF@users.noreply.github.com>
Co-authored-by: Blyron <samaniegoaaron112@gmail.com>
Co-authored-by: glopezdiest <58212725+glopezdiest@users.noreply.github.com>
Co-authored-by: glopezdiest <glopez@cvc.uab.cat>
Co-authored-by: Blyron <53337103+Blyron@users.noreply.github.com>
Co-authored-by: Minokori <73998474+Minokori@users.noreply.github.com>
Co-authored-by: AreopagX <49414432+AreopagX@users.noreply.github.com>
Co-authored-by: Jorge Virgos <jorgevirgos.dev@gmail.com>
Co-authored-by: Sergio Paniego Blanco <sergiopaniegoblanco@gmail.com>
Co-authored-by: Ylmdrin <150919430+Ylmdrin@users.noreply.github.com>
This commit is contained in:
Daraan 2024-10-18 17:47:49 +02:00 committed by GitHub
parent 715c217ad0
commit 9a59963cad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 117 additions and 65 deletions

View File

@ -20,6 +20,7 @@
* Added V2X sensors for cooperative awareness message and custom user-defined messages to support vehicle-to-vehicle communication * Added V2X sensors for cooperative awareness message and custom user-defined messages to support vehicle-to-vehicle communication
* Added named tuples for BasicAgent.py's detection result to allow for type-hints and better semantics. * Added named tuples for BasicAgent.py's detection result to allow for type-hints and better semantics.
* Added type-hint support for the PythonAPI * Added type-hint support for the PythonAPI
* Added type-hints to GlobalRoutePlanner and use carla.Vector3D code instead of pre 0.9.13 numpy code.
## CARLA 0.9.15 ## CARLA 0.9.15

View File

@ -6,7 +6,7 @@
""" """
This module implements an agent that roams around a track following random This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles. The agent also responds to traffic lights. waypoints and avoiding other vehicles. The agent also responds to traffic lights.
It can also make use of the global route planner to follow a specifed route It can also make use of the global route planner to follow a specified route
""" """
import carla import carla
@ -15,13 +15,12 @@ from shapely.geometry import Polygon
from agents.navigation.local_planner import LocalPlanner, RoadOption from agents.navigation.local_planner import LocalPlanner, RoadOption
from agents.navigation.global_route_planner import GlobalRoutePlanner from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.tools.misc import (get_speed, is_within_distance, from agents.tools.misc import (get_speed, is_within_distance,
get_trafficlight_trigger_location, get_trafficlight_trigger_location)
compute_distance)
from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult from agents.tools.hints import ObstacleDetectionResult, TrafficLightDetectionResult
class BasicAgent(object): class BasicAgent:
""" """
BasicAgent implements an agent that navigates the scene. BasicAgent implements an agent that navigates the scene.
This agent respects traffic lights and other vehicles, but ignores stop signs. This agent respects traffic lights and other vehicles, but ignores stop signs.
@ -31,7 +30,7 @@ class BasicAgent(object):
def __init__(self, vehicle, target_speed=20, opt_dict={}, map_inst=None, grp_inst=None): def __init__(self, vehicle, target_speed=20, opt_dict={}, map_inst=None, grp_inst=None):
""" """
Initialization the agent paramters, the local and the global planner. Initialization the agent parameters, the local and the global planner.
:param vehicle: actor to apply to agent logic onto :param vehicle: actor to apply to agent logic onto
:param target_speed: speed (in Km/h) at which the vehicle will move :param target_speed: speed (in Km/h) at which the vehicle will move
@ -102,7 +101,7 @@ class BasicAgent(object):
# Get the static elements of the scene # Get the static elements of the scene
self._lights_list = self._world.get_actors().filter("*traffic_light*") self._lights_list = self._world.get_actors().filter("*traffic_light*")
self._lights_map = {} # Dictionary mapping a traffic light to a wp corrspoing to its trigger volume location self._lights_map = {} # Dictionary mapping a traffic light to a wp corresponding to its trigger volume location
def add_emergency_stop(self, control): def add_emergency_stop(self, control):
""" """
@ -139,14 +138,14 @@ class BasicAgent(object):
def get_global_planner(self): def get_global_planner(self):
"""Get method for protected member local planner""" """Get method for protected member local planner"""
return self._global_planner return self._global_planner
def set_destination(self, end_location, start_location=None, clean_queue=True): def set_destination(self, end_location, start_location=None, clean_queue=True):
# type: (carla.Location, carla.Location | None, bool) -> None # type: (carla.Location, carla.Location | None, bool) -> None
""" """
This method creates a list of waypoints between a starting and ending location, This method creates a list of waypoints between a starting and ending location,
based on the route returned by the global router, and adds it to the local planner. based on the route returned by the global router, and adds it to the local planner.
If no starting location is passed and `clean_queue` is True, the vehicle local planner's If no starting location is passed and `clean_queue` is True, the vehicle local planner's
target location is chosen, which corresponds (by default), to a location about 5 meters target location is chosen, which corresponds (by default), to a location about 5 meters
in front of the vehicle. in front of the vehicle.
If `clean_queue` is False the newly planned route will be appended to the current route. If `clean_queue` is False the newly planned route will be appended to the current route.
@ -157,19 +156,19 @@ class BasicAgent(object):
if not start_location: if not start_location:
if clean_queue and self._local_planner.target_waypoint: if clean_queue and self._local_planner.target_waypoint:
# Plan from the waypoint in front of the vehicle onwards # Plan from the waypoint in front of the vehicle onwards
start_location = self._local_planner.target_waypoint.transform.location start_location = self._local_planner.target_waypoint.transform.location
elif not clean_queue and self._local_planner._waypoints_queue: elif not clean_queue and self._local_planner._waypoints_queue:
# Append to the current plan # Append to the current plan
start_location = self._local_planner._waypoints_queue[-1][0].transform.location start_location = self._local_planner._waypoints_queue[-1][0].transform.location
else: else:
# no target_waypoint or _waypoints_queue empty, use vehicle location # no target_waypoint or _waypoints_queue empty, use vehicle location
start_location = self._vehicle.get_location() start_location = self._vehicle.get_location()
start_waypoint = self._map.get_waypoint(start_location) start_waypoint = self._map.get_waypoint(start_location)
end_waypoint = self._map.get_waypoint(end_location) end_waypoint = self._map.get_waypoint(end_location)
route_trace = self.trace_route(start_waypoint, end_waypoint) route_trace = self.trace_route(start_waypoint, end_waypoint)
self._local_planner.set_global_plan(route_trace, clean_queue=clean_queue) self._local_planner.set_global_plan(route_trace, clean_queue=clean_queue)
def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True): def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True):
""" """
Adds a specific plan to the agent. Adds a specific plan to the agent.
@ -325,7 +324,7 @@ class BasicAgent(object):
""" """
Method to check if there is a vehicle in front of the agent blocking its path. Method to check if there is a vehicle in front of the agent blocking its path.
:param vehicle_list (list of carla.Vehicle): list contatining vehicle objects. :param vehicle_list (list of carla.Vehicle): list containing vehicle objects.
If None, all vehicle in the scene are used If None, all vehicle in the scene are used
:param max_distance: max freespace to check for obstacles. :param max_distance: max freespace to check for obstacles.
If None, the base threshold value is used If None, the base threshold value is used
@ -354,7 +353,7 @@ class BasicAgent(object):
return None return None
return Polygon(route_bb) return Polygon(route_bb)
if self._ignore_vehicles: if self._ignore_vehicles:
return ObstacleDetectionResult(False, None, -1) return ObstacleDetectionResult(False, None, -1)
@ -404,7 +403,7 @@ class BasicAgent(object):
target_polygon = Polygon(target_list) target_polygon = Polygon(target_list)
if route_polygon.intersects(target_polygon): if route_polygon.intersects(target_polygon):
return ObstacleDetectionResult(True, target_vehicle, compute_distance(target_vehicle.get_location(), ego_location)) return ObstacleDetectionResult(True, target_vehicle, target_vehicle.get_location().distance(ego_location))
# Simplified approach, using only the plan waypoints (similar to TM) # Simplified approach, using only the plan waypoints (similar to TM)
else: else:
@ -425,13 +424,15 @@ class BasicAgent(object):
) )
if is_within_distance(target_rear_transform, ego_front_transform, max_distance, [low_angle_th, up_angle_th]): if is_within_distance(target_rear_transform, ego_front_transform, max_distance, [low_angle_th, up_angle_th]):
return ObstacleDetectionResult(True, target_vehicle, compute_distance(target_transform.location, ego_transform.location)) return ObstacleDetectionResult(True, target_vehicle, target_transform.location.distance(ego_transform.location))
return ObstacleDetectionResult(False, None, -1) return ObstacleDetectionResult(False, None, -1)
def _generate_lane_change_path(self, waypoint, direction='left', distance_same_lane=10, @staticmethod
def _generate_lane_change_path(waypoint, direction='left', distance_same_lane=10,
distance_other_lane=25, lane_change_distance=25, distance_other_lane=25, lane_change_distance=25,
check=True, lane_changes=1, step_distance=2): check=True, lane_changes=1, step_distance=2):
# type: (carla.Waypoint, str, float, float, float, bool, int, float) -> list[tuple[carla.Waypoint, RoadOption]]
""" """
This methods generates a path that results in a lane change. This methods generates a path that results in a lane change.
Use the different distances to fine-tune the maneuver. Use the different distances to fine-tune the maneuver.

View File

@ -8,14 +8,13 @@
waypoints and avoiding other vehicles. The agent also responds to traffic lights, waypoints and avoiding other vehicles. The agent also responds to traffic lights,
traffic signs, and has different possible configurations. """ traffic signs, and has different possible configurations. """
import random
import numpy as np import numpy as np
import carla import carla
from agents.navigation.basic_agent import BasicAgent from agents.navigation.basic_agent import BasicAgent
from agents.navigation.local_planner import RoadOption from agents.navigation.local_planner import RoadOption
from agents.navigation.behavior_types import Cautious, Aggressive, Normal from agents.navigation.behavior_types import Cautious, Aggressive, Normal
from agents.tools.misc import get_speed, positive, is_within_distance, compute_distance from agents.tools.misc import get_speed, positive
class BehaviorAgent(BasicAgent): class BehaviorAgent(BasicAgent):
""" """

View File

@ -4,7 +4,7 @@
""" This module contains the different parameters sets for each behavior. """ """ This module contains the different parameters sets for each behavior. """
class Cautious(object): class Cautious:
"""Class for Cautious agent.""" """Class for Cautious agent."""
max_speed = 40 max_speed = 40
speed_lim_dist = 6 speed_lim_dist = 6
@ -15,7 +15,7 @@ class Cautious(object):
tailgate_counter = 0 tailgate_counter = 0
class Normal(object): class Normal:
"""Class for Normal agent.""" """Class for Normal agent."""
max_speed = 50 max_speed = 50
speed_lim_dist = 3 speed_lim_dist = 3
@ -26,7 +26,7 @@ class Normal(object):
tailgate_counter = 0 tailgate_counter = 0
class Aggressive(object): class Aggressive:
"""Class for Aggressive agent.""" """Class for Aggressive agent."""
max_speed = 70 max_speed = 70
speed_lim_dist = 1 speed_lim_dist = 1

View File

@ -6,7 +6,7 @@
""" """
This module implements an agent that roams around a track following random This module implements an agent that roams around a track following random
waypoints and avoiding other vehicles. The agent also responds to traffic lights. waypoints and avoiding other vehicles. The agent also responds to traffic lights.
It can also make use of the global route planner to follow a specifed route It can also make use of the global route planner to follow a specified route
""" """
import carla import carla

View File

@ -12,7 +12,7 @@ import carla
from agents.tools.misc import get_speed from agents.tools.misc import get_speed
class VehiclePIDController(): class VehiclePIDController:
""" """
VehiclePIDController is the combination of two PID controllers VehiclePIDController is the combination of two PID controllers
(lateral and longitudinal) to perform the (lateral and longitudinal) to perform the
@ -105,7 +105,7 @@ class VehiclePIDController():
self._lat_controller.set_offset(offset) self._lat_controller.set_offset(offset)
class PIDLongitudinalController(): class PIDLongitudinalController:
""" """
PIDLongitudinalController implements longitudinal control using a PID. PIDLongitudinalController implements longitudinal control using a PID.
""" """
@ -171,7 +171,7 @@ class PIDLongitudinalController():
self._dt = dt self._dt = dt
class PIDLateralController(): class PIDLateralController:
""" """
PIDLateralController implements lateral control using a PID. PIDLateralController implements lateral control using a PID.
""" """
@ -199,7 +199,7 @@ class PIDLateralController():
def run_step(self, waypoint): def run_step(self, waypoint):
""" """
Execute one step of lateral control to steer Execute one step of lateral control to steer
the vehicle towards a certain waypoin. the vehicle towards a certain waypoint.
:param waypoint: target waypoint :param waypoint: target waypoint
:return: steering control in the range [-1, 1] where: :return: steering control in the range [-1, 1] where:

View File

@ -14,20 +14,55 @@ import networkx as nx
import carla import carla
from agents.navigation.local_planner import RoadOption from agents.navigation.local_planner import RoadOption
from agents.tools.misc import vector
class GlobalRoutePlanner(object): # Python 2 compatibility
TYPE_CHECKING = False
if TYPE_CHECKING:
import sys
if sys.version_info >= (3, 11):
from typing import TypedDict, NotRequired
elif sys.version_info >= (3, 8):
from typing import TypedDict
from typing_extensions import NotRequired
else:
from typing_extensions import TypedDict, NotRequired
TopologyDict = TypedDict('TopologyDict',
{
'entry': carla.Waypoint,
'exit': carla.Waypoint,
'entryxyz': tuple[float, float, float],
'exitxyz': tuple[float, float, float],
'path': list[carla.Waypoint]
})
EdgeDict = TypedDict('EdgeDict',
{
'length': int,
'path': list[carla.Waypoint],
'entry_waypoint': carla.Waypoint,
'exit_waypoint': carla.Waypoint,
'entry_vector': np.ndarray,
'exit_vector': np.ndarray,
'net_vector': list[float],
'intersection': bool,
'type': RoadOption,
'change_waypoint': NotRequired[carla.Waypoint]
})
class GlobalRoutePlanner:
""" """
This class provides a very high level route plan. This class provides a very high level route plan.
""" """
def __init__(self, wmap, sampling_resolution): def __init__(self, wmap, sampling_resolution):
# type: (carla.Map, float) -> None
self._sampling_resolution = sampling_resolution self._sampling_resolution = sampling_resolution
self._wmap = wmap self._wmap = wmap
self._topology = None self._topology = [] # type: list[TopologyDict]
self._graph = None self._graph = None # type: nx.DiGraph # type: ignore[assignment]
self._id_map = None self._id_map = None # type: dict[tuple[float, float, float], int] # type: ignore[assignment]
self._road_id_to_edge = None self._road_id_to_edge = None # type: dict[int, dict[int, dict[int, tuple[int, int]]]] # type: ignore[assignment]
self._intersection_end_node = -1 self._intersection_end_node = -1
self._previous_decision = RoadOption.VOID self._previous_decision = RoadOption.VOID
@ -39,28 +74,29 @@ class GlobalRoutePlanner(object):
self._lane_change_link() self._lane_change_link()
def trace_route(self, origin, destination): def trace_route(self, origin, destination):
# type: (carla.Location, carla.Location) -> list[tuple[carla.Waypoint, RoadOption]]
""" """
This method returns list of (carla.Waypoint, RoadOption) This method returns list of (carla.Waypoint, RoadOption)
from origin to destination from origin to destination
""" """
route_trace = [] route_trace = [] # type: list[tuple[carla.Waypoint, RoadOption]]
route = self._path_search(origin, destination) route = self._path_search(origin, destination)
current_waypoint = self._wmap.get_waypoint(origin) current_waypoint = self._wmap.get_waypoint(origin)
destination_waypoint = self._wmap.get_waypoint(destination) destination_waypoint = self._wmap.get_waypoint(destination)
for i in range(len(route) - 1): for i in range(len(route) - 1):
road_option = self._turn_decision(i, route) road_option = self._turn_decision(i, route)
edge = self._graph.edges[route[i], route[i+1]] edge = self._graph.edges[route[i], route[i + 1]] # type: EdgeDict
path = [] path = [] # type: list[carla.Waypoint]
if edge['type'] != RoadOption.LANEFOLLOW and edge['type'] != RoadOption.VOID: if edge['type'] != RoadOption.LANEFOLLOW and edge['type'] != RoadOption.VOID:
route_trace.append((current_waypoint, road_option)) route_trace.append((current_waypoint, road_option))
exit_wp = edge['exit_waypoint'] exit_wp = edge['exit_waypoint']
n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id] n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id]
next_edge = self._graph.edges[n1, n2] next_edge = self._graph.edges[n1, n2] # type: EdgeDict
if next_edge['path']: if next_edge['path']:
closest_index = self._find_closest_in_list(current_waypoint, next_edge['path']) closest_index = self._find_closest_in_list(current_waypoint, next_edge['path'])
closest_index = min(len(next_edge['path'])-1, closest_index+5) closest_index = min(len(next_edge['path']) - 1, closest_index + 5)
current_waypoint = next_edge['path'][closest_index] current_waypoint = next_edge['path'][closest_index]
else: else:
current_waypoint = next_edge['exit_waypoint'] current_waypoint = next_edge['exit_waypoint']
@ -72,9 +108,11 @@ class GlobalRoutePlanner(object):
for waypoint in path[closest_index:]: for waypoint in path[closest_index:]:
current_waypoint = waypoint current_waypoint = waypoint
route_trace.append((current_waypoint, road_option)) route_trace.append((current_waypoint, road_option))
if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*self._sampling_resolution: if len(route) - i <= 2 and waypoint.transform.location.distance(
destination) < 2 * self._sampling_resolution:
break break
elif len(route)-i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id: elif len(
route) - i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id:
destination_index = self._find_closest_in_list(destination_waypoint, path) destination_index = self._find_closest_in_list(destination_waypoint, path)
if closest_index > destination_index: if closest_index > destination_index:
break break
@ -101,7 +139,7 @@ class GlobalRoutePlanner(object):
# Rounding off to avoid floating point imprecision # Rounding off to avoid floating point imprecision
x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0) x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0)
wp1.transform.location, wp2.transform.location = l1, l2 wp1.transform.location, wp2.transform.location = l1, l2
seg_dict = dict() seg_dict = dict() # type: TopologyDict # type: ignore[assignment]
seg_dict['entry'], seg_dict['exit'] = wp1, wp2 seg_dict['entry'], seg_dict['exit'] = wp1, wp2
seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2) seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2)
seg_dict['path'] = [] seg_dict['path'] = []
@ -163,6 +201,7 @@ class GlobalRoutePlanner(object):
entry_carla_vector = entry_wp.transform.rotation.get_forward_vector() entry_carla_vector = entry_wp.transform.rotation.get_forward_vector()
exit_carla_vector = exit_wp.transform.rotation.get_forward_vector() exit_carla_vector = exit_wp.transform.rotation.get_forward_vector()
net_carla_vector = (exit_wp.transform.location - entry_wp.transform.location).make_unit_vector()
# Adding edge with attributes # Adding edge with attributes
self._graph.add_edge( self._graph.add_edge(
@ -173,7 +212,7 @@ class GlobalRoutePlanner(object):
[entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]), [entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]),
exit_vector=np.array( exit_vector=np.array(
[exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]), [exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]),
net_vector=vector(entry_wp.transform.location, exit_wp.transform.location), net_vector=[net_carla_vector.x, net_carla_vector.y, net_carla_vector.z],
intersection=intersection, type=RoadOption.LANEFOLLOW) intersection=intersection, type=RoadOption.LANEFOLLOW)
def _find_loose_ends(self): def _find_loose_ends(self):
@ -198,10 +237,10 @@ class GlobalRoutePlanner(object):
if section_id not in self._road_id_to_edge[road_id]: if section_id not in self._road_id_to_edge[road_id]:
self._road_id_to_edge[road_id][section_id] = dict() self._road_id_to_edge[road_id][section_id] = dict()
n1 = self._id_map[exit_xyz] n1 = self._id_map[exit_xyz]
n2 = -1*count_loose_ends n2 = -1 * count_loose_ends
self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2) self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
next_wp = end_wp.next(hop_resolution) next_wp = end_wp.next(hop_resolution)
path = [] path = [] # type: list[carla.Waypoint]
while next_wp is not None and next_wp \ while next_wp is not None and next_wp \
and next_wp[0].road_id == road_id \ and next_wp[0].road_id == road_id \
and next_wp[0].section_id == section_id \ and next_wp[0].section_id == section_id \
@ -263,12 +302,13 @@ class GlobalRoutePlanner(object):
break break
def _localize(self, location): def _localize(self, location):
# type: (carla.Location) -> None | tuple[int, int]
""" """
This function finds the road segment that a given location This function finds the road segment that a given location
is part of, returning the edge it belongs to is part of, returning the edge it belongs to
""" """
waypoint = self._wmap.get_waypoint(location) waypoint = self._wmap.get_waypoint(location)
edge = None edge = None # type: None | tuple[int, int]
try: try:
edge = self._road_id_to_edge[waypoint.road_id][waypoint.section_id][waypoint.lane_id] edge = self._road_id_to_edge[waypoint.road_id][waypoint.section_id][waypoint.lane_id]
except KeyError: except KeyError:
@ -282,9 +322,10 @@ class GlobalRoutePlanner(object):
""" """
l1 = np.array(self._graph.nodes[n1]['vertex']) l1 = np.array(self._graph.nodes[n1]['vertex'])
l2 = np.array(self._graph.nodes[n2]['vertex']) l2 = np.array(self._graph.nodes[n2]['vertex'])
return np.linalg.norm(l1-l2) return np.linalg.norm(l1 - l2)
def _path_search(self, origin, destination): def _path_search(self, origin, destination):
# type: (carla.Location, carla.Location) -> list[int]
""" """
This function finds the shortest path connecting origin and destination This function finds the shortest path connecting origin and destination
using A* search with distance heuristic. using A* search with distance heuristic.
@ -302,6 +343,7 @@ class GlobalRoutePlanner(object):
return route return route
def _successive_last_intersection_edge(self, index, route): def _successive_last_intersection_edge(self, index, route):
# type: (int, list[int]) -> tuple[int | None, EdgeDict | None]
""" """
This method returns the last successive intersection edge This method returns the last successive intersection edge
from a starting index on the route. from a starting index on the route.
@ -309,10 +351,10 @@ class GlobalRoutePlanner(object):
proper turn decisions. proper turn decisions.
""" """
last_intersection_edge = None last_intersection_edge = None # type: EdgeDict | None
last_node = None last_node = None
for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]: for node1, node2 in [(route[i], route[i + 1]) for i in range(index, len(route) - 1)]:
candidate_edge = self._graph.edges[node1, node2] candidate_edge = self._graph.edges[node1, node2] # type: EdgeDict
if node1 == route[index]: if node1 == route[index]:
last_intersection_edge = candidate_edge last_intersection_edge = candidate_edge
if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']: if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']:
@ -324,16 +366,17 @@ class GlobalRoutePlanner(object):
return last_node, last_intersection_edge return last_node, last_intersection_edge
def _turn_decision(self, index, route, threshold=math.radians(35)): def _turn_decision(self, index, route, threshold=math.radians(35)):
# type: (int, list[int], float) -> RoadOption
""" """
This method returns the turn decision (RoadOption) for pair of edges This method returns the turn decision (RoadOption) for pair of edges
around current index of route list around current index of route list
""" """
decision = None decision = None
previous_node = route[index-1] previous_node = route[index - 1]
current_node = route[index] current_node = route[index]
next_node = route[index+1] next_node = route[index + 1]
next_edge = self._graph.edges[current_node, next_node] next_edge = self._graph.edges[current_node, next_node] # type: EdgeDict
if index > 0: if index > 0:
if self._previous_decision != RoadOption.VOID \ if self._previous_decision != RoadOption.VOID \
and self._intersection_end_node > 0 \ and self._intersection_end_node > 0 \
@ -343,7 +386,7 @@ class GlobalRoutePlanner(object):
decision = self._previous_decision decision = self._previous_decision
else: else:
self._intersection_end_node = -1 self._intersection_end_node = -1
current_edge = self._graph.edges[previous_node, current_node] current_edge = self._graph.edges[previous_node, current_node] # type: EdgeDict
calculate_turn = current_edge['type'] == RoadOption.LANEFOLLOW and not current_edge[ calculate_turn = current_edge['type'] == RoadOption.LANEFOLLOW and not current_edge[
'intersection'] and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection'] 'intersection'] and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']
if calculate_turn: if calculate_turn:
@ -358,12 +401,12 @@ class GlobalRoutePlanner(object):
for neighbor in self._graph.successors(current_node): for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor] select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'] == RoadOption.LANEFOLLOW: if select_edge['type'] == RoadOption.LANEFOLLOW:
if neighbor != route[index+1]: if neighbor != route[index + 1]:
sv = select_edge['net_vector'] sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2]) cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2] next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip( deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0)) np.dot(cv, nv) / (np.linalg.norm(cv) * np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list: if not cross_list:
cross_list.append(0) cross_list.append(0)
if deviation < threshold: if deviation < threshold:

View File

@ -28,7 +28,7 @@ class RoadOption(IntEnum):
CHANGELANERIGHT = 6 CHANGELANERIGHT = 6
class LocalPlanner(object): class LocalPlanner:
""" """
LocalPlanner implements the basic behavior of following a LocalPlanner implements the basic behavior of following a
trajectory of waypoints that is generated on-the-fly. trajectory of waypoints that is generated on-the-fly.
@ -151,7 +151,7 @@ class LocalPlanner(object):
def follow_speed_limits(self, value=True): def follow_speed_limits(self, value=True):
""" """
Activates a flag that makes the max speed dynamically vary according to the spped limits Activates a flag that makes the max speed dynamically vary according to the speed limits
:param value: bool :param value: bool
:return: :return:
@ -287,7 +287,7 @@ class LocalPlanner(object):
try: try:
wpt, direction = self._waypoints_queue[-1] wpt, direction = self._waypoints_queue[-1]
return wpt, direction return wpt, direction
except IndexError as i: except IndexError:
return None, RoadOption.VOID return None, RoadOption.VOID
def get_plan(self): def get_plan(self):
@ -316,7 +316,7 @@ def _retrieve_options(list_waypoints, current_waypoint):
options = [] options = []
for next_waypoint in list_waypoints: for next_waypoint in list_waypoints:
# this is needed because something we are linking to # this is needed because something we are linking to
# the beggining of an intersection, therefore the # the beginning of an intersection, therefore the
# variation in angle is small # variation in angle is small
next_next_waypoint = next_waypoint.next(3.0)[0] next_next_waypoint = next_waypoint.next(3.0)[0]
link = _compute_connection(current_waypoint, next_next_waypoint) link = _compute_connection(current_waypoint, next_next_waypoint)

View File

@ -18,7 +18,7 @@ else:
class ObstacleDetectionResult(NamedTuple): class ObstacleDetectionResult(NamedTuple):
obstacle_was_found : bool obstacle_was_found : bool
obstacle : Union[Actor, None] obstacle : Union[Actor, None]
distance : float distance : float
# distance : Union[float, Literal[-1]] # Python 3.8+ only # distance : Union[float, Literal[-1]] # Python 3.8+ only
class TrafficLightDetectionResult(NamedTuple): class TrafficLightDetectionResult(NamedTuple):
@ -30,5 +30,5 @@ else:
ObstacleDetectionResult = NamedTuple('ObstacleDetectionResult', [('obstacle_was_found', bool), ('obstacle', Union[Actor, None]), ('distance', Union[float, Literal[-1]])]) ObstacleDetectionResult = NamedTuple('ObstacleDetectionResult', [('obstacle_was_found', bool), ('obstacle', Union[Actor, None]), ('distance', Union[float, Literal[-1]])])
else: else:
ObstacleDetectionResult = NamedTuple('ObstacleDetectionResult', [('obstacle_was_found', bool), ('obstacle', Union[Actor, None]), ('distance', float)]) ObstacleDetectionResult = NamedTuple('ObstacleDetectionResult', [('obstacle_was_found', bool), ('obstacle', Union[Actor, None]), ('distance', float)])
TrafficLightDetectionResult = NamedTuple('TrafficLightDetectionResult', [('traffic_light_was_found', bool), ('traffic_light', Union[TrafficLight, None])]) TrafficLightDetectionResult = NamedTuple('TrafficLightDetectionResult', [('traffic_light_was_found', bool), ('traffic_light', Union[TrafficLight, None])])

View File

@ -12,6 +12,8 @@ import math
import numpy as np import numpy as np
import carla import carla
_EPS = np.finfo(float).eps
def draw_waypoints(world, waypoints, z=0.5): def draw_waypoints(world, waypoints, z=0.5):
""" """
Draw a list of waypoints at a certain height given in z. Draw a list of waypoints at a certain height given in z.
@ -140,12 +142,15 @@ def vector(location_1, location_2):
Returns the unit vector from location_1 to location_2 Returns the unit vector from location_1 to location_2
:param location_1, location_2: carla.Location objects :param location_1, location_2: carla.Location objects
.. note::
Alternatively you can use:
`(location_2 - location_1).make_unit_vector()`
""" """
x = location_2.x - location_1.x x = location_2.x - location_1.x
y = location_2.y - location_1.y y = location_2.y - location_1.y
z = location_2.z - location_1.z z = location_2.z - location_1.z
norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps norm = np.linalg.norm([x, y, z]) + _EPS
return [x / norm, y / norm, z / norm] return [x / norm, y / norm, z / norm]
@ -154,11 +159,14 @@ def compute_distance(location_1, location_2):
Euclidean distance between 3D points Euclidean distance between 3D points
:param location_1, location_2: 3D points :param location_1, location_2: 3D points
.. deprecated:: 0.9.13
Use `location_1.distance(location_2)` instead
""" """
x = location_2.x - location_1.x x = location_2.x - location_1.x
y = location_2.y - location_1.y y = location_2.y - location_1.y
z = location_2.z - location_1.z z = location_2.z - location_1.z
norm = np.linalg.norm([x, y, z]) + np.finfo(float).eps norm = np.linalg.norm([x, y, z]) + _EPS
return norm return norm