Readded the old planner with QoL changes

This commit is contained in:
Guillermo 2021-07-28 10:59:34 +02:00 committed by bernat
parent e6420a0f73
commit deda2c3e5e
3 changed files with 358 additions and 384 deletions

View File

@ -149,7 +149,7 @@ class BasicAgent(object):
:param start_waypoint (carla.Waypoint): initial waypoint
:param end_waypoint (carla.Waypoint): final waypoint
"""
return self._global_planner.trace_route(start_waypoint, end_waypoint)
return self._global_planner.trace_route(start_waypoint.transform.location, end_waypoint.transform.location)
def run_step(self):
"""Execute one step of navigation."""

View File

@ -7,155 +7,386 @@
"""
This module provides GlobalRoutePlanner implementation.
"""
import math
import numpy as np
import networkx as nx
import carla
from agents.navigation.local_planner import RoadOption
from agents.tools.route_helper import trace_route, initialize_map
from agents.tools.misc import vector
class GlobalRoutePlanner(object):
"""
This class provides a high level route planner.
This class provides a very high level route plan.
"""
def __init__(self, wmap, sampling_resolution):
"""
Constructor
"""
self._previous_decision = RoadOption.VOID
self._sampling_resolution = sampling_resolution
self._wmap = wmap
initialize_map(self._wmap)
self._topology = None
self._graph = None
self._id_map = None
self._road_id_to_edge = None
def trace_route(self, origin, destination, with_options=True):
self._intersection_end_node = -1
self._previous_decision = RoadOption.VOID
# Build the graph
self._build_topology()
self._build_graph()
self._find_loose_ends()
self._lane_change_link()
def trace_route(self, origin, destination):
"""
This method traces a route between a starting and ending point.
IF 'with_options' is True (deafult behavior), returns a list of [carla.Waypoint, RoadOption],
being RoadOption a high level represnetation of what the direction of the route.
If 'with_options' is False, the road options are computed, returning a list of carla.Waypoint.
This might be interseting, for example, when computing a route in chunks, getting all the waypoint
first and then adding the options
:param origin (carla.Waypoint): starting point of the route
:param destination (carla.Waypoint): ending point of the route
:param with_options (bool): whether or not the road options will be added to the route info
This method returns list of (carla.Waypoint, RoadOption)
from origin to destination
"""
route = trace_route(origin, destination, self._wmap, self._sampling_resolution)
if not route:
return []
route_trace = []
route = self._path_search(origin, destination)
current_waypoint = self._wmap.get_waypoint(origin)
destination_waypoint = self._wmap.get_waypoint(destination)
# Add options, or not
if with_options:
return self.add_options_to_route(route)
for i in range(len(route) - 1):
road_option = self._turn_decision(i, route)
edge = self._graph.edges[route[i], route[i+1]]
path = []
if edge['type'] != RoadOption.LANEFOLLOW and edge['type'] != RoadOption.VOID:
route_trace.append((current_waypoint, road_option))
exit_wp = edge['exit_waypoint']
n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id]
next_edge = self._graph.edges[n1, n2]
if next_edge['path']:
closest_index = self._find_closest_in_list(current_waypoint, next_edge['path'])
closest_index = min(len(next_edge['path'])-1, closest_index+5)
current_waypoint = next_edge['path'][closest_index]
else:
current_waypoint = next_edge['exit_waypoint']
route_trace.append((current_waypoint, road_option))
else:
path = path + [edge['entry_waypoint']] + edge['path'] + [edge['exit_waypoint']]
closest_index = self._find_closest_in_list(current_waypoint, path)
for waypoint in path[closest_index:]:
current_waypoint = waypoint
route_trace.append((current_waypoint, road_option))
if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*self._sampling_resolution:
break
elif len(route)-i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id:
destination_index = self._find_closest_in_list(destination_waypoint, path)
if closest_index > destination_index:
break
return route_trace
def _build_topology(self):
"""
This function retrieves topology from the server as a list of
road segments as pairs of waypoint objects, and processes the
topology into a list of dictionary objects with the following attributes
- entry (carla.Waypoint): waypoint of entry point of road segment
- entryxyz (tuple): (x,y,z) of entry point of road segment
- exit (carla.Waypoint): waypoint of exit point of road segment
- exitxyz (tuple): (x,y,z) of exit point of road segment
- path (list of carla.Waypoint): list of waypoints between entry to exit, separated by the resolution
"""
self._topology = []
# Retrieving waypoints to construct a detailed topology
for segment in self._wmap.get_topology():
wp1, wp2 = segment[0], segment[1]
l1, l2 = wp1.transform.location, wp2.transform.location
# Rounding off to avoid floating point imprecision
x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0)
wp1.transform.location, wp2.transform.location = l1, l2
seg_dict = dict()
seg_dict['entry'], seg_dict['exit'] = wp1, wp2
seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2)
seg_dict['path'] = []
endloc = wp2.transform.location
if wp1.transform.location.distance(endloc) > self._sampling_resolution:
w = wp1.next(self._sampling_resolution)[0]
while w.transform.location.distance(endloc) > self._sampling_resolution:
seg_dict['path'].append(w)
w = w.next(self._sampling_resolution)[0]
else:
seg_dict['path'].append(wp1.next(self._sampling_resolution)[0])
self._topology.append(seg_dict)
def _build_graph(self):
"""
This function builds a networkx graph representation of topology, creating several class attributes:
- graph (networkx.DiGraph): networkx graph representing the world map, with:
Node properties:
vertex: (x,y,z) position in world map
Edge properties:
entry_vector: unit vector along tangent at entry point
exit_vector: unit vector along tangent at exit point
net_vector: unit vector of the chord from entry to exit
intersection: boolean indicating if the edge belongs to an intersection
- id_map (dictionary): mapping from (x,y,z) to node id
- road_id_to_edge (dictionary): map from road id to edge in the graph
"""
self._graph = nx.DiGraph()
self._id_map = dict() # Map with structure {(x,y,z): id, ... }
self._road_id_to_edge = dict() # Map with structure {road_id: {lane_id: edge, ... }, ... }
for segment in self._topology:
entry_xyz, exit_xyz = segment['entryxyz'], segment['exitxyz']
path = segment['path']
entry_wp, exit_wp = segment['entry'], segment['exit']
intersection = entry_wp.is_junction
road_id, section_id, lane_id = entry_wp.road_id, entry_wp.section_id, entry_wp.lane_id
for vertex in entry_xyz, exit_xyz:
# Adding unique nodes and populating id_map
if vertex not in self._id_map:
new_id = len(self._id_map)
self._id_map[vertex] = new_id
self._graph.add_node(new_id, vertex=vertex)
n1 = self._id_map[entry_xyz]
n2 = self._id_map[exit_xyz]
if road_id not in self._road_id_to_edge:
self._road_id_to_edge[road_id] = dict()
if section_id not in self._road_id_to_edge[road_id]:
self._road_id_to_edge[road_id][section_id] = dict()
self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
entry_carla_vector = entry_wp.transform.rotation.get_forward_vector()
exit_carla_vector = exit_wp.transform.rotation.get_forward_vector()
# Adding edge with attributes
self._graph.add_edge(
n1, n2,
length=len(path) + 1, path=path,
entry_waypoint=entry_wp, exit_waypoint=exit_wp,
entry_vector=np.array(
[entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]),
exit_vector=np.array(
[exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]),
net_vector=vector(entry_wp.transform.location, exit_wp.transform.location),
intersection=intersection, type=RoadOption.LANEFOLLOW)
def _find_loose_ends(self):
"""
This method finds road segments that have an unconnected end, and
adds them to the internal graph representation
"""
count_loose_ends = 0
hop_resolution = self._sampling_resolution
for segment in self._topology:
end_wp = segment['exit']
exit_xyz = segment['exitxyz']
road_id, section_id, lane_id = end_wp.road_id, end_wp.section_id, end_wp.lane_id
if road_id in self._road_id_to_edge \
and section_id in self._road_id_to_edge[road_id] \
and lane_id in self._road_id_to_edge[road_id][section_id]:
pass
else:
count_loose_ends += 1
if road_id not in self._road_id_to_edge:
self._road_id_to_edge[road_id] = dict()
if section_id not in self._road_id_to_edge[road_id]:
self._road_id_to_edge[road_id][section_id] = dict()
n1 = self._id_map[exit_xyz]
n2 = -1*count_loose_ends
self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
next_wp = end_wp.next(hop_resolution)
path = []
while next_wp is not None and next_wp \
and next_wp[0].road_id == road_id \
and next_wp[0].section_id == section_id \
and next_wp[0].lane_id == lane_id:
path.append(next_wp[0])
next_wp = next_wp[0].next(hop_resolution)
if path:
n2_xyz = (path[-1].transform.location.x,
path[-1].transform.location.y,
path[-1].transform.location.z)
self._graph.add_node(n2, vertex=n2_xyz)
self._graph.add_edge(
n1, n2,
length=len(path) + 1, path=path,
entry_waypoint=end_wp, exit_waypoint=path[-1],
entry_vector=None, exit_vector=None, net_vector=None,
intersection=end_wp.is_junction, type=RoadOption.LANEFOLLOW)
def _lane_change_link(self):
"""
This method places zero cost links in the topology graph
representing availability of lane changes.
"""
for segment in self._topology:
left_found, right_found = False, False
for waypoint in segment['path']:
if not segment['entry'].is_junction:
next_waypoint, next_road_option, next_segment = None, None, None
if waypoint.right_lane_marking.lane_change & carla.LaneChange.Right and not right_found:
next_waypoint = waypoint.get_right_lane()
if next_waypoint is not None \
and next_waypoint.lane_type == carla.LaneType.Driving \
and waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANERIGHT
next_segment = self._localize(next_waypoint.transform.location)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
exit_waypoint=next_waypoint, intersection=False, exit_vector=None,
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
right_found = True
if waypoint.left_lane_marking.lane_change & carla.LaneChange.Left and not left_found:
next_waypoint = waypoint.get_left_lane()
if next_waypoint is not None \
and next_waypoint.lane_type == carla.LaneType.Driving \
and waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANELEFT
next_segment = self._localize(next_waypoint.transform.location)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
exit_waypoint=next_waypoint, intersection=False, exit_vector=None,
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
left_found = True
if left_found and right_found:
break
def _localize(self, location):
"""
This function finds the road segment that a given location
is part of, returning the edge it belongs to
"""
waypoint = self._wmap.get_waypoint(location)
edge = None
try:
edge = self._road_id_to_edge[waypoint.road_id][waypoint.section_id][waypoint.lane_id]
except KeyError:
pass
return edge
def _distance_heuristic(self, n1, n2):
"""
Distance heuristic calculator for path searching
in self._graph
"""
l1 = np.array(self._graph.nodes[n1]['vertex'])
l2 = np.array(self._graph.nodes[n2]['vertex'])
return np.linalg.norm(l1-l2)
def _path_search(self, origin, destination):
"""
This function finds the shortest path connecting origin and destination
using A* search with distance heuristic.
origin : carla.Location object of start position
destination : carla.Location object of of end position
return : path as list of node ids (as int) of the graph self._graph
connecting origin and destination
"""
start, end = self._localize(origin), self._localize(destination)
route = nx.astar_path(
self._graph, source=start[0], target=end[0],
heuristic=self._distance_heuristic, weight='length')
route.append(end[1])
return route
def add_options_to_route(self, route):
def _successive_last_intersection_edge(self, index, route):
"""
This method adds the road options to a route, returning a list of [carla.Waypoint, RoadOption].
CHANGELANELEFT and CHANGELANERIGHT are used to signalize a lane change. For the other options,
LEFT, RIGHT and STRAIGHT indicate the specific lane chosen at an intersection and outside those,
LANEFOLLOW is always used.
This has been tested for sampling resolutions up to ~7 meters, and might fail for higher values.
As lane changes are marked in pairs, it will fail for even number of consecutive lane changes
(as they have an odd amount of lane change waypoints)
:param route (list of carla.Waypoint): list of waypoints representing the route
This method returns the last successive intersection edge
from a starting index on the route.
This helps moving past tiny intersection edges to calculate
proper turn decisions.
"""
route_with_options = []
route_with_lane_changes = []
# Part 1: Add road options, excluding lane changes
if route[0].is_junction:
self._prev_at_junction = True
entry_index = 0
last_intersection_edge = None
last_node = None
for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]:
candidate_edge = self._graph.edges[node1, node2]
if node1 == route[index]:
last_intersection_edge = candidate_edge
if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']:
last_intersection_edge = candidate_edge
last_node = node2
else:
self._prev_at_junction = False
entry_index = None
break
for i, waypoint in enumerate(route):
at_junction = waypoint.is_junction
if not at_junction and self._prev_at_junction:
# Just exited a junction, get all of its data
road_option = self._compute_options(route[entry_index], waypoint)
for j in range(entry_index, i):
route_with_options.append([route[j], road_option])
entry_index = None
route_with_options.append([waypoint, RoadOption.LANEFOLLOW])
elif not at_junction and not self._prev_at_junction:
# Outside a junction, always LANEFOLLOW
route_with_options.append([waypoint, RoadOption.LANEFOLLOW])
elif not self._prev_at_junction:
# Just entered a junction, save its entrypoint and wait for the exit
entry_index = i
return last_node, last_intersection_edge
self._prev_at_junction = at_junction
# Route ended at a junction
if self._prev_at_junction:
road_option = self._compute_options(route[entry_index], route[-1])
for j in range(entry_index, len(route)):
route_with_options.append([route[j], road_option])
entry_index = None
# Part 2: Add lane changes. Use the route direction and not the waypoint as waypoints might not
# correspond to the correct lane
lane_change_type = None
for i in range(0, len(route_with_options)):
waypoint, option = route_with_options[i]
# Start and end cases
if i == len(route_with_options) - 1:
route_with_lane_changes.append([waypoint, lane_change_type if lane_change_type else option])
continue
if i == 0:
prev_direction = waypoint.transform.get_forward_vector()
np_prev_direction = np.array([prev_direction.x, prev_direction.y, prev_direction.z])
# Lane changes are marked at both lanes
if lane_change_type:
route_with_lane_changes.append([waypoint, lane_change_type])
prev_direction = waypoint.transform.get_forward_vector()
np_prev_direction = np.array([prev_direction.x, prev_direction.y, prev_direction.z])
lane_change_type = None
continue
# Check the dot product between the two consecutive waypoint
route_direction = route_with_options[i+1][0].transform.location - waypoint.transform.location
np_route_direction = np.array([route_direction.x, route_direction.y, route_direction.z])
if np.linalg.norm(np_route_direction) and np.linalg.norm(np_prev_direction):
dot = np.dot(np_prev_direction, np_route_direction)
dot /= np.linalg.norm(np_prev_direction) * np.linalg.norm(np_route_direction)
else:
dot = 1
if dot < math.cos(math.radians(45)):
cross = np.cross(np_prev_direction, np_route_direction)
lane_change_type = RoadOption.CHANGELANERIGHT if cross[2] > 0 else RoadOption.CHANGELANELEFT
new_option = lane_change_type
else:
new_option = option
route_with_lane_changes.append([waypoint, new_option])
np_prev_direction = np_route_direction
return route_with_lane_changes
def _compute_options(self, entry_waypoint, exit_waypoint):
def _turn_decision(self, index, route, threshold=math.radians(35)):
"""
Computes the road option of a
This method returns the turn decision (RoadOption) for pair of edges
around current index of route list
"""
diff = (exit_waypoint.transform.rotation.yaw - entry_waypoint.transform.rotation.yaw) % 360
if diff > 315.0:
option = RoadOption.STRAIGHT
elif diff > 180.0:
option = RoadOption.LEFT
elif diff > 45.0:
option = RoadOption.RIGHT
else:
option = RoadOption.STRAIGHT
return option
decision = None
previous_node = route[index-1]
current_node = route[index]
next_node = route[index+1]
next_edge = self._graph.edges[current_node, next_node]
if index > 0:
if self._previous_decision != RoadOption.VOID \
and self._intersection_end_node > 0 \
and self._intersection_end_node != previous_node \
and next_edge['type'] == RoadOption.LANEFOLLOW \
and next_edge['intersection']:
decision = self._previous_decision
else:
self._intersection_end_node = -1
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'] == RoadOption.LANEFOLLOW and not current_edge[
'intersection'] and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']
if calculate_turn:
last_node, tail_edge = self._successive_last_intersection_edge(index, route)
self._intersection_end_node = last_node
if tail_edge is not None:
next_edge = tail_edge
cv, nv = current_edge['exit_vector'], next_edge['exit_vector']
if cv is None or nv is None:
return next_edge['type']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'] == RoadOption.LANEFOLLOW:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
elif next_cross < 0:
decision = RoadOption.LEFT
elif next_cross > 0:
decision = RoadOption.RIGHT
else:
decision = next_edge['type']
else:
decision = next_edge['type']
self._previous_decision = decision
return decision
def _find_closest_in_list(self, current_waypoint, waypoint_list):
min_distance = float('inf')
closest_index = -1
for i, waypoint in enumerate(waypoint_list):
distance = waypoint.transform.location.distance(
current_waypoint.transform.location)
if distance < min_distance:
min_distance = distance
closest_index = i
return closest_index

View File

@ -1,257 +0,0 @@
# Copyright (c) # Copyright (c) 2018-2021 CVC.
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
This file has several useful functions related to the AD Map library
"""
from __future__ import print_function
import os
import numpy as np
import xml.etree.ElementTree as ET
import carla
from ad_map_access import map as ad_map
from ad_physics import physics as ad_physics
def initialize_map(wmap):
"""Initialize the AD map library and, creates the file needed to do so."""
lat_ref = 0.0
lon_ref = 0.0
opendrive_contents = wmap.to_opendrive()
xodr_name = 'RoutePlannerMap.xodr'
txt_name = 'RoutePlannerMap.txt'
# Save the opendrive data into a file
with open(xodr_name, 'w') as f:
f.write(opendrive_contents)
# Get geo reference
xml_tree = ET.parse(xodr_name)
for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '):
if geo_elem.startswith('+lat_0'):
lat_ref = float(geo_elem.split('=')[-1])
elif geo_elem.startswith('+lon_0'):
lon_ref = float(geo_elem.split('=')[-1])
# Save the previous info
with open(txt_name, 'w') as f:
txt_content = "[ADMap]\n" \
"map=" + xodr_name + "\n" \
"[ENUReference]\n" \
"default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0"
f.write(txt_content)
# Intialize the map and remove created files
initialized = ad_map.access.init(txt_name)
if not initialized:
raise ValueError("Couldn't initialize the map")
for fname in [txt_name, xodr_name]:
if os.path.exists(fname):
os.remove(fname)
def carla_loc_to_enu(carla_location):
"""Transform a CARLA location into an ENU point"""
return ad_map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z)
def carla_loc_to_ecef(carla_location):
"""Transform a CARLA location into an ENU point"""
return ad_map.point.toECEF(carla_loc_to_enu(carla_location))
def enu_to_carla_loc(enu_point):
"""Transform an ENU point into a CARLA location"""
return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z))
def is_point_at_driving_lane(para_point, town_map):
"""Checks if a parapoint is part of a CARLA driving lane"""
enu_point = ad_map.lane.getENULanePoint(para_point)
carla_point = enu_to_carla_loc(enu_point)
carla_waypoint = town_map.get_waypoint(carla_point, lane_type=carla.LaneType.Any)
return carla_waypoint.lane_type == carla.LaneType.Driving
def to_ad_paraPoint(location, distance=1, probability=0):
"""
Transforms a carla.Location into an ad_map.point.ParaPoint()
"""
ad_distance = ad_physics.Distance(distance)
ad_probability = ad_physics.Probability(probability)
ad_map_matching = ad_map.match.AdMapMatching()
ad_location = carla_loc_to_enu(location)
match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability)
if not match_results:
print("WARNING: Couldn't find a para point for CARLA location {}".format(location))
return None
# Filter the closest one to the given location
distance = [float(mmap.matchedPointDistance) for mmap in match_results]
return match_results[distance.index(min(distance))].lanePoint.paraPoint
def trace_route(start_waypoint, end_waypoint, town_map, sample_resolution=1, match_dist=1):
"""
Gets the shortest route between a starting and end waypoint. This transforms the given location
to AD map paraPoints, and iterates through all permutations to return the shortest route to ensure
that the correct route is chosen when starting / ending at intersections.
Then, the route is transformed back to a list of [carla.Waypoint, RoadOption]
Due to some bugs at the altitude parsing, the matching distance isn't just one value,
but keeps increasing up to a max number, in order to increasing the probabilities of finding a match.
:param start_waypoint (carla.Waypoint): Starting waypoint of the route
:param end_waypoint (carla.Waypoint): Ending waypoint of the route
:param town_map (carla.Map): CARLA map instance where the route will be computed
:param sample_resolution (float): Distance between the waypoints that form the route
:param match_dist (float): Max distance between the given location and the matched AD map para points.
If this value is too large, the matching might result in waypoints on different lanes.
:param max_match_dist (float): In case of failed match, the previous input keeps increasing up to this number.
"""
wp_route = []
start_location = start_waypoint.transform.location
end_location = end_waypoint.transform.location
# Get starting point matches. Due to errors in altitude parsing, iterate increasing the matching distance
start_matches = _waypoint_matches(start_waypoint, town_map, match_dist)
if not start_matches:
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(start_location))
return wp_route
# Get ending point matches. Due to errors in altitude parsing, iterate increasing the matching distance
end_matches = _waypoint_matches(end_waypoint, town_map, match_dist)
if not end_matches:
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(end_location))
return wp_route
# Get the shortest route
route_segment = _filter_shortest_route(start_matches, end_matches)
if not route_segment:
print("WARNING: Couldn't find a viable route between locations "
"'{}' and '{}'.".format(start_location, end_location))
return wp_route
# Change the route to waypoints
wp_route = _get_route_waypoints(route_segment, sample_resolution, town_map)
return wp_route
def _waypoint_matches(waypoint, town_map, max_distance, probability=0):
"""
Given a waypoint, maps its transform to the AD map, returning a list of possible matches.
All matches are filtered to make sure they represent driving lanes.
"""
# ECEF location of the waypoint
location = waypoint.transform.location
ecef_location= carla_loc_to_ecef(location)
# ECEF location of a point in front of the waypoint
f_vec = waypoint.transform.get_right_vector()
front_location = location + carla.Location(x=f_vec.x, y=f_vec.y)
ecef_front_location= carla_loc_to_ecef(front_location)
# Get the map matching and the heading hint
ecef_heading = ad_map.point.createECEFHeading(ecef_location, ecef_front_location)
ad_map_matching = ad_map.match.AdMapMatching()
ad_map_matching.addHeadingHint(ecef_heading)
# Get the matches and filter the none driving lane ones
matches = ad_map_matching.getMapMatchedPositions(
carla_loc_to_enu(location),
ad_physics.Distance(max_distance),
ad_physics.Probability(probability)
)
matches = [m for m in matches if is_point_at_driving_lane(m.lanePoint.paraPoint, town_map)]
return matches
def _filter_shortest_route(start_matches, end_matches):
"""Given a set of starting and ending matches, computes all possible routes and selects the shortest one"""
route_segment = None
# Get the shortest route
min_length = float('inf')
for start_match in start_matches:
start_point = start_match.lanePoint.paraPoint
for end_match in end_matches:
# Get the route
new_route_segment = ad_map.route.planRoute(
start_point, end_match.lanePoint.paraPoint,
ad_map.route.RouteCreationMode.Undefined
)
if len(new_route_segment.roadSegments) == 0:
continue # The route doesn't exist, ignore it
# Save the shortest route
length = _get_route_length(new_route_segment)
if length < min_length:
min_length = length
route_segment = new_route_segment
return route_segment
def _get_route_length(route):
"""
Gets the length of the route, being the sum of the road segment lengths.
Each road segment length is the mean of the length of its lane segments.
"""
route_length = 0
for road_segment in route.roadSegments:
road_length = 0
for lane_segment in road_segment.drivableLaneSegments:
lane_start = float(lane_segment.laneInterval.start)
lane_end = float(lane_segment.laneInterval.end)
lane_length = float(ad_map.lane.calcLength(lane_segment.laneInterval.laneId))
road_length += lane_length * abs(lane_end - lane_start)
num_lanes = len(road_segment.drivableLaneSegments)
if num_lanes != 0:
route_length += road_length / num_lanes
return route_length
def _get_route_waypoints(route, resolution, town_map):
"""
Given a route, transforms it into a list of [carla.Waypoint, RoadOption].
Take into account that at locations where multiple lanes overlap,
while the waypoints will be correctly placed, they might be part of a different lane.
:param route (ad_map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined.
Other creation modes return mode than one lane, which would need a prefiltering.
:param resolution (float): Distance between the waypoints that form the route.
:param town_map (carla.Map): CARLA map instance where the route will be computed
"""
wp_route = []
for road_segment in route.roadSegments:
for lane_segment in road_segment.drivableLaneSegments:
# Just one segment due to route mode 'Undefined'. If not, some filtering is needed
if float(lane_segment.laneInterval.start) == float(lane_segment.laneInterval.end):
continue # Lengthless interval
para_points = _get_lane_para_points(lane_segment.laneInterval, resolution)
for para_point in para_points:
enu_point = ad_map.lane.getENULanePoint(para_point)
carla_point = enu_to_carla_loc(enu_point)
carla_waypoint = town_map.get_waypoint(carla_point)
wp_route.append(carla_waypoint)
return wp_route
def _get_lane_para_points(lane_interval, distance=1):
"""
Samples a given lane interval in points every 'distance' meters,
returning a list of the values needed to achieve that.
"""
start = float(lane_interval.start)
end = float(lane_interval.end)
length = float(ad_map.lane.calcLength(lane_interval.laneId))
if start == end:
return []
values = np.arange(start, end, np.sign(end - start) * distance / length)
para_points = []
for value in values:
para_point = ad_map.point.createParaPoint(lane_interval.laneId, ad_physics.ParametricValue(value))
para_points.append(para_point)
return para_points