Changed the global planner to use the AD map
This commit is contained in:
parent
4efe0dc2e6
commit
8e16e696b1
|
@ -3,23 +3,20 @@
|
||||||
# This work is licensed under the terms of the MIT license.
|
# This work is licensed under the terms of the MIT license.
|
||||||
# For a copy, see <https://opensource.org/licenses/MIT>.
|
# For a copy, see <https://opensource.org/licenses/MIT>.
|
||||||
|
|
||||||
""" This module implements an agent that roams around a track following random
|
"""
|
||||||
waypoints and avoiding other vehicles.
|
This module implements an agent that roams around a track following random
|
||||||
The agent also responds to traffic lights. """
|
waypoints and avoiding other vehicles. The agent also responds to traffic lights.
|
||||||
|
"""
|
||||||
|
|
||||||
import carla
|
import carla
|
||||||
from agents.navigation.agent import Agent
|
|
||||||
from agents.navigation.local_planner import LocalPlanner, RoadOption
|
|
||||||
from agents.navigation.global_route_planner import GlobalRoutePlanner
|
|
||||||
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
|
|
||||||
from agents.tools.misc import get_speed
|
|
||||||
|
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
import os
|
from agents.navigation.agent import Agent
|
||||||
import xml.etree.ElementTree as ET
|
from agents.navigation.local_planner import LocalPlanner
|
||||||
from agents.tools.map_helper import get_shortest_route
|
from agents.navigation.global_route_planner import GlobalRoutePlanner
|
||||||
import ad_map_access as ad
|
from agents.tools.misc import get_speed
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AgentState(Enum):
|
class AgentState(Enum):
|
||||||
"""
|
"""
|
||||||
|
@ -51,55 +48,16 @@ class BasicAgent(Agent):
|
||||||
self._base_tlight_threshold = 5.0 # meters
|
self._base_tlight_threshold = 5.0 # meters
|
||||||
self._base_vehicle_threshold = 5.0 # meters
|
self._base_vehicle_threshold = 5.0 # meters
|
||||||
self._max_brake = 0.5
|
self._max_brake = 0.5
|
||||||
|
self._max_steering = 0.3
|
||||||
self._local_planner = LocalPlanner(
|
self._local_planner = LocalPlanner(
|
||||||
self._vehicle,
|
self._vehicle,
|
||||||
opt_dict={
|
opt_dict={
|
||||||
'target_speed' : target_speed,
|
'target_speed' : target_speed,
|
||||||
'max_brake': self._max_brake
|
'max_brake': self._max_brake,
|
||||||
|
'max_steering': self._max_steering
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
self._initialize_map()
|
|
||||||
|
|
||||||
def _initialize_map(self):
|
|
||||||
"""Initialize the AD map library and, creates the file needed to do so."""
|
|
||||||
lat_ref = 0.0
|
|
||||||
lon_ref = 0.0
|
|
||||||
|
|
||||||
opendrive_contents = self._map.to_opendrive()
|
|
||||||
xodr_name = 'BasicAgentMap.xodr'
|
|
||||||
txt_name = 'BasicAgentMap.txt'
|
|
||||||
|
|
||||||
# Save the opendrive data into a file
|
|
||||||
with open(xodr_name, 'w') as f:
|
|
||||||
f.write(opendrive_contents)
|
|
||||||
|
|
||||||
# Get geo reference
|
|
||||||
xml_tree = ET.parse(xodr_name)
|
|
||||||
for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '):
|
|
||||||
if geo_elem.startswith('+lat_0'):
|
|
||||||
lat_ref = float(geo_elem.split('=')[-1])
|
|
||||||
elif geo_elem.startswith('+lon_0'):
|
|
||||||
lon_ref = float(geo_elem.split('=')[-1])
|
|
||||||
|
|
||||||
# Save the previous info
|
|
||||||
with open(txt_name, 'w') as f:
|
|
||||||
txt_content = "[ADMap]\n" \
|
|
||||||
"map=" + xodr_name + "\n" \
|
|
||||||
"[ENUReference]\n" \
|
|
||||||
"default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0"
|
|
||||||
f.write(txt_content)
|
|
||||||
|
|
||||||
# Intialize the map and remove created files
|
|
||||||
initialized = ad.map.access.init(txt_name)
|
|
||||||
if not initialized:
|
|
||||||
raise ValueError("Couldn't initialize the map")
|
|
||||||
|
|
||||||
for fname in [txt_name, xodr_name]:
|
|
||||||
if os.path.exists(fname):
|
|
||||||
os.remove(fname)
|
|
||||||
|
|
||||||
|
|
||||||
def emergency_stop(self):
|
def emergency_stop(self):
|
||||||
"""
|
"""
|
||||||
Send an emergency stop command to the vehicle
|
Send an emergency stop command to the vehicle
|
||||||
|
@ -134,12 +92,13 @@ class BasicAgent(Agent):
|
||||||
:param start_location: starting location of the route
|
:param start_location: starting location of the route
|
||||||
"""
|
"""
|
||||||
if not start_location:
|
if not start_location:
|
||||||
start_location = self._vehicle.get_location()
|
start_location = self._local_planner.target_waypoint.transform.location
|
||||||
clean_queue = True
|
clean_queue = True
|
||||||
else:
|
else:
|
||||||
|
start_location = self._vehicle.get_location()
|
||||||
clean_queue = False
|
clean_queue = False
|
||||||
|
|
||||||
start_waypoint = self._map.get_waypoint(self._vehicle.get_location())
|
start_waypoint = self._map.get_waypoint(start_location)
|
||||||
end_waypoint = self._map.get_waypoint(end_location)
|
end_waypoint = self._map.get_waypoint(end_location)
|
||||||
|
|
||||||
route_trace = self.trace_route(start_waypoint, end_waypoint)
|
route_trace = self.trace_route(start_waypoint, end_waypoint)
|
||||||
|
@ -159,7 +118,6 @@ class BasicAgent(Agent):
|
||||||
clean_queue=clean_queue
|
clean_queue=clean_queue
|
||||||
)
|
)
|
||||||
|
|
||||||
# AD Map version
|
|
||||||
def trace_route(self, start_waypoint, end_waypoint):
|
def trace_route(self, start_waypoint, end_waypoint):
|
||||||
"""
|
"""
|
||||||
This method sets up a global router and returns the
|
This method sets up a global router and returns the
|
||||||
|
@ -168,45 +126,10 @@ class BasicAgent(Agent):
|
||||||
:param start_waypoint: initial position
|
:param start_waypoint: initial position
|
||||||
:param end_waypoint: final position
|
:param end_waypoint: final position
|
||||||
"""
|
"""
|
||||||
|
if self._grp is None:
|
||||||
route = get_shortest_route(
|
self._grp = GlobalRoutePlanner(self._map, self._sampling_resolution, self._world)
|
||||||
start_waypoint.transform.location,
|
route = self._grp.trace_route(start_waypoint, end_waypoint)
|
||||||
end_waypoint.transform.location,
|
return route
|
||||||
self._map,
|
|
||||||
sample_resolution=self._sampling_resolution,
|
|
||||||
distance=1,
|
|
||||||
world=self._vehicle.get_world()
|
|
||||||
)
|
|
||||||
|
|
||||||
route_with_options = []
|
|
||||||
for waypoint in route:
|
|
||||||
route_with_options.append([waypoint, RoadOption.LANEFOLLOW])
|
|
||||||
|
|
||||||
return route_with_options
|
|
||||||
|
|
||||||
# global route planner version
|
|
||||||
# def trace_route(self, start_waypoint, end_waypoint):
|
|
||||||
# """
|
|
||||||
# This method sets up a global router and returns the optimal route
|
|
||||||
# from start_waypoint to end_waypoint
|
|
||||||
|
|
||||||
# :param start_waypoint: initial position
|
|
||||||
# :param end_waypoint: final position
|
|
||||||
# """
|
|
||||||
|
|
||||||
# # Setting up global router
|
|
||||||
# if self._grp is None:
|
|
||||||
# dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._sampling_resolution)
|
|
||||||
# grp = GlobalRoutePlanner(dao)
|
|
||||||
# grp.setup()
|
|
||||||
# self._grp = grp
|
|
||||||
|
|
||||||
# # Obtain route plan
|
|
||||||
# route = self._grp.trace_route(
|
|
||||||
# start_waypoint.transform.location,
|
|
||||||
# end_waypoint.transform.location)
|
|
||||||
|
|
||||||
# return route
|
|
||||||
|
|
||||||
def run_step(self, debug=False):
|
def run_step(self, debug=False):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -7,381 +7,194 @@
|
||||||
"""
|
"""
|
||||||
This module provides GlobalRoutePlanner implementation.
|
This module provides GlobalRoutePlanner implementation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import math
|
import math
|
||||||
|
import os
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import networkx as nx
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
import carla
|
import carla
|
||||||
|
|
||||||
from agents.navigation.local_planner import RoadOption
|
from agents.navigation.local_planner import RoadOption
|
||||||
|
from agents.tools.global_route_planner_helper import trace_route
|
||||||
from agents.tools.misc import vector
|
from agents.tools.misc import vector
|
||||||
|
|
||||||
|
import ad_map_access as ad
|
||||||
|
|
||||||
class GlobalRoutePlanner(object):
|
class GlobalRoutePlanner(object):
|
||||||
"""
|
"""
|
||||||
This class provides a very high level route plan.
|
This class provides a high level route planner.
|
||||||
Instantiate the class by passing a reference to
|
|
||||||
A GlobalRoutePlannerDAO object.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, dao):
|
def __init__(self, wmap, sampling_resolution, world=None):
|
||||||
"""
|
"""
|
||||||
Constructor
|
Constructor
|
||||||
"""
|
"""
|
||||||
self._dao = dao
|
|
||||||
self._topology = None
|
|
||||||
self._graph = None
|
|
||||||
self._id_map = None
|
|
||||||
self._road_id_to_edge = None
|
|
||||||
self._intersection_end_node = -1
|
|
||||||
self._previous_decision = RoadOption.VOID
|
self._previous_decision = RoadOption.VOID
|
||||||
|
|
||||||
def setup(self):
|
self._sampling_resolution = sampling_resolution
|
||||||
"""
|
self._world = world # TODO: remove it after debugging
|
||||||
Performs initial server data lookup for detailed topology
|
self._wmap = wmap
|
||||||
and builds graph representation of the world map.
|
self._initialize_map()
|
||||||
"""
|
|
||||||
self._topology = self._dao.get_topology()
|
|
||||||
self._graph, self._id_map, self._road_id_to_edge = self._build_graph()
|
|
||||||
self._find_loose_ends()
|
|
||||||
self._lane_change_link()
|
|
||||||
|
|
||||||
def _build_graph(self):
|
def _initialize_map(self):
|
||||||
"""
|
"""Initialize the AD map library and, creates the file needed to do so."""
|
||||||
This function builds a networkx graph representation of topology.
|
lat_ref = 0.0
|
||||||
The topology is read from self._topology.
|
lon_ref = 0.0
|
||||||
graph node properties:
|
|
||||||
vertex - (x,y,z) position in world map
|
|
||||||
graph edge properties:
|
|
||||||
entry_vector - unit vector along tangent at entry point
|
|
||||||
exit_vector - unit vector along tangent at exit point
|
|
||||||
net_vector - unit vector of the chord from entry to exit
|
|
||||||
intersection - boolean indicating if the edge belongs to an
|
|
||||||
intersection
|
|
||||||
return : graph -> networkx graph representing the world map,
|
|
||||||
id_map-> mapping from (x,y,z) to node id
|
|
||||||
road_id_to_edge-> map from road id to edge in the graph
|
|
||||||
"""
|
|
||||||
graph = nx.DiGraph()
|
|
||||||
id_map = dict() # Map with structure {(x,y,z): id, ... }
|
|
||||||
road_id_to_edge = dict() # Map with structure {road_id: {lane_id: edge, ... }, ... }
|
|
||||||
|
|
||||||
for segment in self._topology:
|
opendrive_contents = self._wmap.to_opendrive()
|
||||||
|
xodr_name = 'RoutePlannerMap.xodr'
|
||||||
|
txt_name = 'RoutePlannerMap.txt'
|
||||||
|
|
||||||
entry_xyz, exit_xyz = segment['entryxyz'], segment['exitxyz']
|
# Save the opendrive data into a file
|
||||||
path = segment['path']
|
with open(xodr_name, 'w') as f:
|
||||||
entry_wp, exit_wp = segment['entry'], segment['exit']
|
f.write(opendrive_contents)
|
||||||
intersection = entry_wp.is_junction
|
|
||||||
road_id, section_id, lane_id = entry_wp.road_id, entry_wp.section_id, entry_wp.lane_id
|
|
||||||
|
|
||||||
for vertex in entry_xyz, exit_xyz:
|
# Get geo reference
|
||||||
# Adding unique nodes and populating id_map
|
xml_tree = ET.parse(xodr_name)
|
||||||
if vertex not in id_map:
|
for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '):
|
||||||
new_id = len(id_map)
|
if geo_elem.startswith('+lat_0'):
|
||||||
id_map[vertex] = new_id
|
lat_ref = float(geo_elem.split('=')[-1])
|
||||||
graph.add_node(new_id, vertex=vertex)
|
elif geo_elem.startswith('+lon_0'):
|
||||||
n1 = id_map[entry_xyz]
|
lon_ref = float(geo_elem.split('=')[-1])
|
||||||
n2 = id_map[exit_xyz]
|
|
||||||
if road_id not in road_id_to_edge:
|
|
||||||
road_id_to_edge[road_id] = dict()
|
|
||||||
if section_id not in road_id_to_edge[road_id]:
|
|
||||||
road_id_to_edge[road_id][section_id] = dict()
|
|
||||||
road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
|
|
||||||
|
|
||||||
entry_carla_vector = entry_wp.transform.rotation.get_forward_vector()
|
# Save the previous info
|
||||||
exit_carla_vector = exit_wp.transform.rotation.get_forward_vector()
|
with open(txt_name, 'w') as f:
|
||||||
|
txt_content = "[ADMap]\n" \
|
||||||
|
"map=" + xodr_name + "\n" \
|
||||||
|
"[ENUReference]\n" \
|
||||||
|
"default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0"
|
||||||
|
f.write(txt_content)
|
||||||
|
|
||||||
# Adding edge with attributes
|
# Intialize the map and remove created files
|
||||||
graph.add_edge(
|
initialized = ad.map.access.init(txt_name)
|
||||||
n1, n2,
|
if not initialized:
|
||||||
length=len(path) + 1, path=path,
|
raise ValueError("Couldn't initialize the map")
|
||||||
entry_waypoint=entry_wp, exit_waypoint=exit_wp,
|
|
||||||
entry_vector=np.array(
|
|
||||||
[entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]),
|
|
||||||
exit_vector=np.array(
|
|
||||||
[exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]),
|
|
||||||
net_vector=vector(entry_wp.transform.location, exit_wp.transform.location),
|
|
||||||
intersection=intersection, type=RoadOption.LANEFOLLOW)
|
|
||||||
|
|
||||||
return graph, id_map, road_id_to_edge
|
for fname in [txt_name, xodr_name]:
|
||||||
|
if os.path.exists(fname):
|
||||||
def _find_loose_ends(self):
|
os.remove(fname)
|
||||||
"""
|
|
||||||
This method finds road segments that have an unconnected end, and
|
|
||||||
adds them to the internal graph representation
|
|
||||||
"""
|
|
||||||
count_loose_ends = 0
|
|
||||||
hop_resolution = self._dao.get_resolution()
|
|
||||||
for segment in self._topology:
|
|
||||||
end_wp = segment['exit']
|
|
||||||
exit_xyz = segment['exitxyz']
|
|
||||||
road_id, section_id, lane_id = end_wp.road_id, end_wp.section_id, end_wp.lane_id
|
|
||||||
if road_id in self._road_id_to_edge and section_id in self._road_id_to_edge[road_id] and lane_id in self._road_id_to_edge[road_id][section_id]:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
count_loose_ends += 1
|
|
||||||
if road_id not in self._road_id_to_edge:
|
|
||||||
self._road_id_to_edge[road_id] = dict()
|
|
||||||
if section_id not in self._road_id_to_edge[road_id]:
|
|
||||||
self._road_id_to_edge[road_id][section_id] = dict()
|
|
||||||
n1 = self._id_map[exit_xyz]
|
|
||||||
n2 = -1*count_loose_ends
|
|
||||||
self._road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
|
|
||||||
next_wp = end_wp.next(hop_resolution)
|
|
||||||
path = []
|
|
||||||
while next_wp is not None and next_wp and next_wp[0].road_id == road_id and next_wp[0].section_id == section_id and next_wp[0].lane_id == lane_id:
|
|
||||||
path.append(next_wp[0])
|
|
||||||
next_wp = next_wp[0].next(hop_resolution)
|
|
||||||
if path:
|
|
||||||
n2_xyz = (path[-1].transform.location.x,
|
|
||||||
path[-1].transform.location.y,
|
|
||||||
path[-1].transform.location.z)
|
|
||||||
self._graph.add_node(n2, vertex=n2_xyz)
|
|
||||||
self._graph.add_edge(
|
|
||||||
n1, n2,
|
|
||||||
length=len(path) + 1, path=path,
|
|
||||||
entry_waypoint=end_wp, exit_waypoint=path[-1],
|
|
||||||
entry_vector=None, exit_vector=None, net_vector=None,
|
|
||||||
intersection=end_wp.is_junction, type=RoadOption.LANEFOLLOW)
|
|
||||||
|
|
||||||
def _localize(self, location):
|
|
||||||
"""
|
|
||||||
This function finds the road segment closest to given location
|
|
||||||
location : carla.Location to be localized in the graph
|
|
||||||
return : pair node ids representing an edge in the graph
|
|
||||||
"""
|
|
||||||
waypoint = self._dao.get_waypoint(location)
|
|
||||||
edge = None
|
|
||||||
try:
|
|
||||||
edge = self._road_id_to_edge[waypoint.road_id][waypoint.section_id][waypoint.lane_id]
|
|
||||||
except KeyError:
|
|
||||||
print(
|
|
||||||
"Failed to localize! : ",
|
|
||||||
"Road id : ", waypoint.road_id,
|
|
||||||
"Section id : ", waypoint.section_id,
|
|
||||||
"Lane id : ", waypoint.lane_id,
|
|
||||||
"Location : ", waypoint.transform.location.x,
|
|
||||||
waypoint.transform.location.y)
|
|
||||||
return edge
|
|
||||||
|
|
||||||
def _lane_change_link(self):
|
|
||||||
"""
|
|
||||||
This method places zero cost links in the topology graph
|
|
||||||
representing availability of lane changes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
for segment in self._topology:
|
|
||||||
left_found, right_found = False, False
|
|
||||||
|
|
||||||
for waypoint in segment['path']:
|
|
||||||
if not segment['entry'].is_junction:
|
|
||||||
next_waypoint, next_road_option, next_segment = None, None, None
|
|
||||||
|
|
||||||
if waypoint.right_lane_marking.lane_change & carla.LaneChange.Right and not right_found:
|
|
||||||
next_waypoint = waypoint.get_right_lane()
|
|
||||||
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id:
|
|
||||||
next_road_option = RoadOption.CHANGELANERIGHT
|
|
||||||
next_segment = self._localize(next_waypoint.transform.location)
|
|
||||||
if next_segment is not None:
|
|
||||||
self._graph.add_edge(
|
|
||||||
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
|
|
||||||
exit_waypoint=next_waypoint, intersection=False, exit_vector=None,
|
|
||||||
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
|
|
||||||
right_found = True
|
|
||||||
if waypoint.left_lane_marking.lane_change & carla.LaneChange.Left and not left_found:
|
|
||||||
next_waypoint = waypoint.get_left_lane()
|
|
||||||
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and waypoint.road_id == next_waypoint.road_id:
|
|
||||||
next_road_option = RoadOption.CHANGELANELEFT
|
|
||||||
next_segment = self._localize(next_waypoint.transform.location)
|
|
||||||
if next_segment is not None:
|
|
||||||
self._graph.add_edge(
|
|
||||||
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=waypoint,
|
|
||||||
exit_waypoint=next_waypoint, intersection=False, exit_vector=None,
|
|
||||||
path=[], length=0, type=next_road_option, change_waypoint=next_waypoint)
|
|
||||||
left_found = True
|
|
||||||
if left_found and right_found:
|
|
||||||
break
|
|
||||||
|
|
||||||
def _distance_heuristic(self, n1, n2):
|
|
||||||
"""
|
|
||||||
Distance heuristic calculator for path searching
|
|
||||||
in self._graph
|
|
||||||
"""
|
|
||||||
l1 = np.array(self._graph.nodes[n1]['vertex'])
|
|
||||||
l2 = np.array(self._graph.nodes[n2]['vertex'])
|
|
||||||
return np.linalg.norm(l1-l2)
|
|
||||||
|
|
||||||
def _path_search(self, origin, destination):
|
|
||||||
"""
|
|
||||||
This function finds the shortest path connecting origin and destination
|
|
||||||
using A* search with distance heuristic.
|
|
||||||
origin : carla.Location object of start position
|
|
||||||
destination : carla.Location object of of end position
|
|
||||||
return : path as list of node ids (as int) of the graph self._graph
|
|
||||||
connecting origin and destination
|
|
||||||
"""
|
|
||||||
|
|
||||||
start, end = self._localize(origin), self._localize(destination)
|
|
||||||
|
|
||||||
route = nx.astar_path(
|
|
||||||
self._graph, source=start[0], target=end[0],
|
|
||||||
heuristic=self._distance_heuristic, weight='length')
|
|
||||||
route.append(end[1])
|
|
||||||
return route
|
|
||||||
|
|
||||||
def _successive_last_intersection_edge(self, index, route):
|
|
||||||
"""
|
|
||||||
This method returns the last successive intersection edge
|
|
||||||
from a starting index on the route.
|
|
||||||
This helps moving past tiny intersection edges to calculate
|
|
||||||
proper turn decisions.
|
|
||||||
"""
|
|
||||||
|
|
||||||
last_intersection_edge = None
|
|
||||||
last_node = None
|
|
||||||
for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]:
|
|
||||||
candidate_edge = self._graph.edges[node1, node2]
|
|
||||||
if node1 == route[index]:
|
|
||||||
last_intersection_edge = candidate_edge
|
|
||||||
if candidate_edge['type'] == RoadOption.LANEFOLLOW and candidate_edge['intersection']:
|
|
||||||
last_intersection_edge = candidate_edge
|
|
||||||
last_node = node2
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
return last_node, last_intersection_edge
|
|
||||||
|
|
||||||
def _turn_decision(self, index, route, threshold=math.radians(35)):
|
|
||||||
"""
|
|
||||||
This method returns the turn decision (RoadOption) for pair of edges
|
|
||||||
around current index of route list
|
|
||||||
"""
|
|
||||||
|
|
||||||
decision = None
|
|
||||||
previous_node = route[index-1]
|
|
||||||
current_node = route[index]
|
|
||||||
next_node = route[index+1]
|
|
||||||
next_edge = self._graph.edges[current_node, next_node]
|
|
||||||
if index > 0:
|
|
||||||
if self._previous_decision != RoadOption.VOID and self._intersection_end_node > 0 and self._intersection_end_node != previous_node and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']:
|
|
||||||
decision = self._previous_decision
|
|
||||||
else:
|
|
||||||
self._intersection_end_node = -1
|
|
||||||
current_edge = self._graph.edges[previous_node, current_node]
|
|
||||||
calculate_turn = current_edge['type'] == RoadOption.LANEFOLLOW and not current_edge[
|
|
||||||
'intersection'] and next_edge['type'] == RoadOption.LANEFOLLOW and next_edge['intersection']
|
|
||||||
if calculate_turn:
|
|
||||||
last_node, tail_edge = self._successive_last_intersection_edge(index, route)
|
|
||||||
self._intersection_end_node = last_node
|
|
||||||
if tail_edge is not None:
|
|
||||||
next_edge = tail_edge
|
|
||||||
cv, nv = current_edge['exit_vector'], next_edge['exit_vector']
|
|
||||||
if cv is None or nv is None:
|
|
||||||
return next_edge['type']
|
|
||||||
cross_list = []
|
|
||||||
for neighbor in self._graph.successors(current_node):
|
|
||||||
select_edge = self._graph.edges[current_node, neighbor]
|
|
||||||
if select_edge['type'] == RoadOption.LANEFOLLOW:
|
|
||||||
if neighbor != route[index+1]:
|
|
||||||
sv = select_edge['net_vector']
|
|
||||||
cross_list.append(np.cross(cv, sv)[2])
|
|
||||||
next_cross = np.cross(cv, nv)[2]
|
|
||||||
deviation = math.acos(np.clip(
|
|
||||||
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
|
|
||||||
if not cross_list:
|
|
||||||
cross_list.append(0)
|
|
||||||
if deviation < threshold:
|
|
||||||
decision = RoadOption.STRAIGHT
|
|
||||||
elif cross_list and next_cross < min(cross_list):
|
|
||||||
decision = RoadOption.LEFT
|
|
||||||
elif cross_list and next_cross > max(cross_list):
|
|
||||||
decision = RoadOption.RIGHT
|
|
||||||
elif next_cross < 0:
|
|
||||||
decision = RoadOption.LEFT
|
|
||||||
elif next_cross > 0:
|
|
||||||
decision = RoadOption.RIGHT
|
|
||||||
else:
|
|
||||||
decision = next_edge['type']
|
|
||||||
|
|
||||||
else:
|
|
||||||
decision = next_edge['type']
|
|
||||||
|
|
||||||
self._previous_decision = decision
|
|
||||||
return decision
|
|
||||||
|
|
||||||
def abstract_route_plan(self, origin, destination):
|
|
||||||
"""
|
|
||||||
The following function generates the route plan based on
|
|
||||||
origin : carla.Location object of the route's start position
|
|
||||||
destination : carla.Location object of the route's end position
|
|
||||||
return : list of turn by turn navigation decisions as
|
|
||||||
agents.navigation.local_planner.RoadOption elements
|
|
||||||
Possible values are STRAIGHT, LEFT, RIGHT, LANEFOLLOW, VOID
|
|
||||||
CHANGELANELEFT, CHANGELANERIGHT
|
|
||||||
"""
|
|
||||||
|
|
||||||
route = self._path_search(origin, destination)
|
|
||||||
plan = []
|
|
||||||
|
|
||||||
for i in range(len(route) - 1):
|
|
||||||
road_option = self._turn_decision(i, route)
|
|
||||||
plan.append(road_option)
|
|
||||||
|
|
||||||
return plan
|
|
||||||
|
|
||||||
def _find_closest_in_list(self, current_waypoint, waypoint_list):
|
|
||||||
min_distance = float('inf')
|
|
||||||
closest_index = -1
|
|
||||||
for i, waypoint in enumerate(waypoint_list):
|
|
||||||
distance = waypoint.transform.location.distance(
|
|
||||||
current_waypoint.transform.location)
|
|
||||||
if distance < min_distance:
|
|
||||||
min_distance = distance
|
|
||||||
closest_index = i
|
|
||||||
|
|
||||||
return closest_index
|
|
||||||
|
|
||||||
def trace_route(self, origin, destination):
|
def trace_route(self, origin, destination):
|
||||||
"""
|
"""
|
||||||
This method returns list of (carla.Waypoint, RoadOption)
|
This method returns list of (carla.Waypoint, RoadOption)
|
||||||
from origin to destination
|
from origin to destination
|
||||||
"""
|
"""
|
||||||
|
route = trace_route(origin, destination, self._wmap, self._sampling_resolution)
|
||||||
|
|
||||||
route_trace = []
|
# Add options
|
||||||
route = self._path_search(origin, destination)
|
route_with_options = self.add_options_to_route(route)
|
||||||
current_waypoint = self._dao.get_waypoint(origin)
|
|
||||||
destination_waypoint = self._dao.get_waypoint(destination)
|
|
||||||
resolution = self._dao.get_resolution()
|
|
||||||
|
|
||||||
for i in range(len(route) - 1):
|
# TODO: remove it
|
||||||
road_option = self._turn_decision(i, route)
|
for w in route_with_options:
|
||||||
edge = self._graph.edges[route[i], route[i+1]]
|
wp = w[0].transform.location + carla.Location(z=0.1)
|
||||||
path = []
|
if w[1] == RoadOption.LEFT: # Yellow
|
||||||
|
color = carla.Color(255, 255, 0)
|
||||||
|
elif w[1] == RoadOption.RIGHT: # Cyan
|
||||||
|
color = carla.Color(0, 255, 255)
|
||||||
|
elif w[1] == RoadOption.CHANGELANELEFT: # Orange
|
||||||
|
color = carla.Color(255, 64, 0)
|
||||||
|
elif w[1] == RoadOption.CHANGELANERIGHT: # Dark Cyan
|
||||||
|
color = carla.Color(0, 64, 255)
|
||||||
|
elif w[1] == RoadOption.STRAIGHT: # Gray
|
||||||
|
color = carla.Color(128, 128, 128)
|
||||||
|
else: # LANEFOLLOW
|
||||||
|
color = carla.Color(0, 255, 0) # Green
|
||||||
|
self._world.debug.draw_point(wp, size=0.08, color=color, life_time=100)
|
||||||
|
|
||||||
if edge['type'] != RoadOption.LANEFOLLOW and edge['type'] != RoadOption.VOID:
|
return route_with_options
|
||||||
route_trace.append((current_waypoint, road_option))
|
|
||||||
exit_wp = edge['exit_waypoint']
|
|
||||||
n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id]
|
|
||||||
next_edge = self._graph.edges[n1, n2]
|
|
||||||
if next_edge['path']:
|
|
||||||
closest_index = self._find_closest_in_list(current_waypoint, next_edge['path'])
|
|
||||||
closest_index = min(len(next_edge['path'])-1, closest_index+5)
|
|
||||||
current_waypoint = next_edge['path'][closest_index]
|
|
||||||
else:
|
|
||||||
current_waypoint = next_edge['exit_waypoint']
|
|
||||||
route_trace.append((current_waypoint, road_option))
|
|
||||||
|
|
||||||
|
def add_options_to_route(self, route):
|
||||||
|
"""
|
||||||
|
This method adds the road options to a route, returning a list of [carla.Waypoint, RoadOption].
|
||||||
|
CHANGELANELEFT and CHANGELANERIGHT are used to signalize a lane change. For the other options,
|
||||||
|
LEFT, RIGHT and STRAIGHT indicate the specific lane chosen at an intersection and outside those,
|
||||||
|
LANEFOLLOW is always used.
|
||||||
|
|
||||||
|
This has been tested for sampling resolutions up to ~7 meters, and might fail for higher values.
|
||||||
|
|
||||||
|
:param route (list of carla.Waypoint): list of waypoints representing the route
|
||||||
|
"""
|
||||||
|
route_with_options = []
|
||||||
|
route_with_lane_changes = []
|
||||||
|
|
||||||
|
# Part 1: Add road options, excluding lane changes
|
||||||
|
if route[0].is_junction:
|
||||||
|
self._prev_at_junction = True
|
||||||
|
entry_index = 0
|
||||||
|
else:
|
||||||
|
self._prev_at_junction = False
|
||||||
|
entry_index = None
|
||||||
|
|
||||||
|
for i, waypoint in enumerate(route):
|
||||||
|
at_junction = waypoint.is_junction
|
||||||
|
if not at_junction and self._prev_at_junction:
|
||||||
|
# Just exited a junction, get all of its data
|
||||||
|
road_option = self._compute_options(route[entry_index], waypoint)
|
||||||
|
for j in range(entry_index, i):
|
||||||
|
route_with_options.append([route[j], road_option])
|
||||||
|
entry_index = None
|
||||||
|
route_with_options.append([waypoint, RoadOption.LANEFOLLOW])
|
||||||
|
elif not at_junction and not self._prev_at_junction:
|
||||||
|
# Outside a junction, always LANEFOLLOW
|
||||||
|
route_with_options.append([waypoint, RoadOption.LANEFOLLOW])
|
||||||
|
elif not self._prev_at_junction:
|
||||||
|
# Just entered a junction, save its entrypoint and wait for the exit
|
||||||
|
entry_index = i
|
||||||
|
|
||||||
|
self._prev_at_junction = at_junction
|
||||||
|
|
||||||
|
# Route ended at a junction
|
||||||
|
if self._prev_at_junction:
|
||||||
|
road_option = self._compute_options(route[-1], waypoint)
|
||||||
|
for j in range(entry_index, len(route)):
|
||||||
|
route_with_options.append([route[j], road_option])
|
||||||
|
entry_index = None
|
||||||
|
|
||||||
|
# Part 2: Add lane changes
|
||||||
|
prev_lane_change = None
|
||||||
|
|
||||||
|
for i in range(0, len(route_with_options) - 1):
|
||||||
|
waypoint, option = route_with_options[i]
|
||||||
|
next_waypoint, _ = route_with_options[i+1]
|
||||||
|
|
||||||
|
# Lane changes are set to both lanes
|
||||||
|
if prev_lane_change:
|
||||||
|
route_with_lane_changes.append([waypoint, prev_lane_change])
|
||||||
|
prev_lane_change = None
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check the dot product between the two consecutive waypoint
|
||||||
|
direction = waypoint.transform.get_forward_vector()
|
||||||
|
np_direction = np.array([direction.x, direction.y, direction.z])
|
||||||
|
|
||||||
|
route_direction = next_waypoint.transform.location - waypoint.transform.location
|
||||||
|
np_route_direction = np.array([route_direction.x, route_direction.y, route_direction.z])
|
||||||
|
|
||||||
|
dot = np.dot(np_direction, np_route_direction)
|
||||||
|
dot /= np.linalg.norm(np_direction) * np.linalg.norm(np_route_direction)
|
||||||
|
|
||||||
|
if 0 < dot < math.cos(math.radians(45)):
|
||||||
|
cross = np.cross(np_direction, np_route_direction)
|
||||||
|
prev_lane_change = RoadOption.CHANGELANERIGHT if cross[2] > 0 else RoadOption.CHANGELANELEFT
|
||||||
|
route_with_lane_changes.append([waypoint, prev_lane_change])
|
||||||
else:
|
else:
|
||||||
path = path + [edge['entry_waypoint']] + edge['path'] + [edge['exit_waypoint']]
|
route_with_lane_changes.append([waypoint, option])
|
||||||
closest_index = self._find_closest_in_list(current_waypoint, path)
|
|
||||||
for waypoint in path[closest_index:]:
|
|
||||||
current_waypoint = waypoint
|
|
||||||
route_trace.append((current_waypoint, road_option))
|
|
||||||
if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*resolution:
|
|
||||||
break
|
|
||||||
elif len(route)-i <= 2 and current_waypoint.road_id == destination_waypoint.road_id and current_waypoint.section_id == destination_waypoint.section_id and current_waypoint.lane_id == destination_waypoint.lane_id:
|
|
||||||
destination_index = self._find_closest_in_list(destination_waypoint, path)
|
|
||||||
if closest_index > destination_index:
|
|
||||||
break
|
|
||||||
|
|
||||||
return route_trace
|
return route_with_lane_changes
|
||||||
|
|
||||||
|
def _compute_options(self, entry_waypoint, exit_waypoint):
|
||||||
|
"""
|
||||||
|
Computes the road option of a
|
||||||
|
"""
|
||||||
|
diff = (exit_waypoint.transform.rotation.yaw - entry_waypoint.transform.rotation.yaw) % 360
|
||||||
|
if diff > 315.0:
|
||||||
|
option = RoadOption.STRAIGHT
|
||||||
|
elif diff > 225.0:
|
||||||
|
option = RoadOption.LEFT
|
||||||
|
elif diff > 135.0:
|
||||||
|
option = RoadOption.HALFTURN
|
||||||
|
elif diff > 45.0:
|
||||||
|
option = RoadOption.RIGHT
|
||||||
|
else:
|
||||||
|
option = RoadOption.STRAIGHT
|
||||||
|
|
||||||
|
return option
|
||||||
|
|
|
@ -11,20 +11,22 @@ import random
|
||||||
|
|
||||||
import carla
|
import carla
|
||||||
from agents.navigation.controller import VehiclePIDController
|
from agents.navigation.controller import VehiclePIDController
|
||||||
from agents.tools.misc import draw_waypoints
|
from agents.tools.misc import draw_waypoints, get_speed
|
||||||
|
|
||||||
|
|
||||||
class RoadOption(Enum):
|
class RoadOption(Enum):
|
||||||
"""
|
"""
|
||||||
RoadOption represents the possible topological configurations when moving from a segment of lane to other.
|
RoadOption represents the possible topological configurations when moving from a segment of lane to other.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
VOID = -1
|
VOID = -1
|
||||||
LEFT = 1
|
LEFT = 1
|
||||||
RIGHT = 2
|
RIGHT = 2
|
||||||
STRAIGHT = 3
|
STRAIGHT = 3
|
||||||
LANEFOLLOW = 4
|
HALFTURN = 4
|
||||||
CHANGELANELEFT = 5
|
LANEFOLLOW = 5
|
||||||
CHANGELANERIGHT = 6
|
CHANGELANELEFT = 6
|
||||||
|
CHANGELANERIGHT = 7
|
||||||
|
|
||||||
|
|
||||||
class LocalPlanner(object):
|
class LocalPlanner(object):
|
||||||
|
@ -58,8 +60,8 @@ class LocalPlanner(object):
|
||||||
self._map = self._world.get_map()
|
self._map = self._world.get_map()
|
||||||
|
|
||||||
self._target_speed = 20.0 # Km/h
|
self._target_speed = 20.0 # Km/h
|
||||||
self._sampling_radius = 1.0
|
self._sampling_radius = 2.0
|
||||||
self._min_distance = 8.0
|
self._base_min_distance = 3.0
|
||||||
self._dt = 1.0 / 20.0
|
self._dt = 1.0 / 20.0
|
||||||
self._max_brake = 0.3
|
self._max_brake = 0.3
|
||||||
self._max_throt = 0.75
|
self._max_throt = 0.75
|
||||||
|
@ -125,10 +127,7 @@ class LocalPlanner(object):
|
||||||
# Compute the current vehicle waypoint
|
# Compute the current vehicle waypoint
|
||||||
current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
|
current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
|
||||||
self.target_waypoint, self.target_road_option = (current_waypoint, RoadOption.LANEFOLLOW)
|
self.target_waypoint, self.target_road_option = (current_waypoint, RoadOption.LANEFOLLOW)
|
||||||
|
|
||||||
# Fill the waypoint queue
|
|
||||||
self._waypoints_queue.append((self.target_waypoint, self.target_road_option))
|
self._waypoints_queue.append((self.target_waypoint, self.target_road_option))
|
||||||
self._compute_next_waypoints(k=self._min_waypoint_queue_length)
|
|
||||||
|
|
||||||
|
|
||||||
def set_speed(self, speed):
|
def set_speed(self, speed):
|
||||||
|
@ -213,7 +212,6 @@ class LocalPlanner(object):
|
||||||
|
|
||||||
self._stop_waypoint_creation = stop_waypoint_creation
|
self._stop_waypoint_creation = stop_waypoint_creation
|
||||||
|
|
||||||
|
|
||||||
def run_step(self, debug=False):
|
def run_step(self, debug=False):
|
||||||
"""
|
"""
|
||||||
Execute one step of local planning which involves running the longitudinal and lateral PID controllers to
|
Execute one step of local planning which involves running the longitudinal and lateral PID controllers to
|
||||||
|
@ -229,24 +227,20 @@ class LocalPlanner(object):
|
||||||
if not self._stop_waypoint_creation and len(self._waypoints_queue) < self._min_waypoint_queue_length:
|
if not self._stop_waypoint_creation and len(self._waypoints_queue) < self._min_waypoint_queue_length:
|
||||||
self._compute_next_waypoints(k=self._min_waypoint_queue_length)
|
self._compute_next_waypoints(k=self._min_waypoint_queue_length)
|
||||||
|
|
||||||
if len(self._waypoints_queue) == 0:
|
# Purge the queue of obsolete waypoints
|
||||||
control = carla.VehicleControl()
|
|
||||||
control.steer = 0.0
|
|
||||||
control.throttle = 0.0
|
|
||||||
control.brake = 1.0
|
|
||||||
control.hand_brake = False
|
|
||||||
control.manual_gear_shift = False
|
|
||||||
return control
|
|
||||||
|
|
||||||
# Get the target waypoint and move using the PID controllers
|
|
||||||
self.target_waypoint, self.target_road_option = self._waypoints_queue[0]
|
|
||||||
control = self._vehicle_controller.run_step(self._target_speed, self.target_waypoint)
|
|
||||||
|
|
||||||
# # Purge the queue of obsolete waypoints
|
|
||||||
veh_location = self._vehicle.get_location()
|
veh_location = self._vehicle.get_location()
|
||||||
|
vehicle_speed = get_speed(self._vehicle) / 3.6
|
||||||
|
self._min_distance = self._base_min_distance + 0.5 *vehicle_speed
|
||||||
|
|
||||||
num_waypoint_removed = 0
|
num_waypoint_removed = 0
|
||||||
for waypoint, _ in self._waypoints_queue:
|
for waypoint, _ in self._waypoints_queue:
|
||||||
if veh_location.distance(waypoint.transform.location) < self._min_distance:
|
|
||||||
|
if len(self._waypoints_queue) - num_waypoint_removed == 1:
|
||||||
|
min_distance = 1 # Don't remove the last waypoint until very close by
|
||||||
|
else:
|
||||||
|
min_distance = self._min_distance
|
||||||
|
|
||||||
|
if veh_location.distance(waypoint.transform.location) < min_distance:
|
||||||
num_waypoint_removed += 1
|
num_waypoint_removed += 1
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
@ -255,6 +249,18 @@ class LocalPlanner(object):
|
||||||
for _ in range(num_waypoint_removed):
|
for _ in range(num_waypoint_removed):
|
||||||
self._waypoints_queue.popleft()
|
self._waypoints_queue.popleft()
|
||||||
|
|
||||||
|
# Get the target waypoint and move using the PID controllers. Stop if no target waypoint
|
||||||
|
if len(self._waypoints_queue) == 0:
|
||||||
|
control = carla.VehicleControl()
|
||||||
|
control.steer = 0.0
|
||||||
|
control.throttle = 0.0
|
||||||
|
control.brake = 1.0
|
||||||
|
control.hand_brake = False
|
||||||
|
control.manual_gear_shift = False
|
||||||
|
else:
|
||||||
|
self.target_waypoint, self.target_road_option = self._waypoints_queue[0]
|
||||||
|
control = self._vehicle_controller.run_step(self._target_speed, self.target_waypoint)
|
||||||
|
|
||||||
if debug:
|
if debug:
|
||||||
draw_waypoints(self._vehicle.get_world(), [self.target_waypoint], 1.0)
|
draw_waypoints(self._vehicle.get_world(), [self.target_waypoint], 1.0)
|
||||||
|
|
||||||
|
@ -303,13 +309,13 @@ def _retrieve_options(list_waypoints, current_waypoint):
|
||||||
# the beggining of an intersection, therefore the
|
# the beggining of an intersection, therefore the
|
||||||
# variation in angle is small
|
# variation in angle is small
|
||||||
next_next_waypoint = next_waypoint.next(3.0)[0]
|
next_next_waypoint = next_waypoint.next(3.0)[0]
|
||||||
link = _compute_connection(current_waypoint, next_next_waypoint)
|
link = compute_connection(current_waypoint, next_next_waypoint)
|
||||||
options.append(link)
|
options.append(link)
|
||||||
|
|
||||||
return options
|
return options
|
||||||
|
|
||||||
|
|
||||||
def _compute_connection(current_waypoint, next_waypoint, threshold=35):
|
def compute_connection(current_waypoint, next_waypoint, threshold=35):
|
||||||
"""
|
"""
|
||||||
Compute the type of topological connection between an active waypoint (current_waypoint) and a target waypoint
|
Compute the type of topological connection between an active waypoint (current_waypoint) and a target waypoint
|
||||||
(next_waypoint).
|
(next_waypoint).
|
||||||
|
|
|
@ -0,0 +1,208 @@
|
||||||
|
# Copyright (c) # Copyright (c) 2018-2021 CVC.
|
||||||
|
#
|
||||||
|
# This work is licensed under the terms of the MIT license.
|
||||||
|
# For a copy, see <https://opensource.org/licenses/MIT>.
|
||||||
|
|
||||||
|
"""
|
||||||
|
This file has several useful functions related to the AD Map library
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import carla
|
||||||
|
import numpy as np
|
||||||
|
import ad_map_access as ad
|
||||||
|
|
||||||
|
def carla_loc_to_enu(carla_location):
|
||||||
|
"""Transform a CARLA location into an ENU point"""
|
||||||
|
return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z)
|
||||||
|
|
||||||
|
def carla_loc_to_ecef(carla_location):
|
||||||
|
"""Transform a CARLA location into an ENU point"""
|
||||||
|
return ad.map.point.toECEF(carla_loc_to_enu(carla_location))
|
||||||
|
|
||||||
|
def enu_to_carla_loc(enu_point):
|
||||||
|
"""Transform an ENU point into a CARLA location"""
|
||||||
|
return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z))
|
||||||
|
|
||||||
|
def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving):
|
||||||
|
"""Transform a para point into a CARLA waypoint"""
|
||||||
|
enu_point = ad.map.lane.getENULanePoint(para_point)
|
||||||
|
carla_point = enu_to_carla_loc(enu_point)
|
||||||
|
carla_waypoint = town_map.get_waypoint(carla_point, lane_type=lane_type)
|
||||||
|
return carla_waypoint
|
||||||
|
|
||||||
|
def is_point_at_driving_lane(para_point, town_map):
|
||||||
|
"""Checks if a parapoint is part of a CARLA driving lane"""
|
||||||
|
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, carla.LaneType.Any)
|
||||||
|
return carla_waypoint.lane_type == carla.LaneType.Driving
|
||||||
|
|
||||||
|
def to_ad_paraPoint(location, distance=1, probability=0):
|
||||||
|
"""
|
||||||
|
Transforms a carla.Location into an ad.map.point.ParaPoint()
|
||||||
|
"""
|
||||||
|
ad_distance = ad.physics.Distance(distance)
|
||||||
|
ad_probability = ad.physics.Probability(probability)
|
||||||
|
ad_map_matching = ad.map.match.AdMapMatching()
|
||||||
|
ad_location = carla_loc_to_enu(location)
|
||||||
|
|
||||||
|
match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability)
|
||||||
|
|
||||||
|
if not match_results:
|
||||||
|
print("WARNING: Couldn't find a para point for CARLA location {}".format(location))
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Filter the closest one to the given location
|
||||||
|
distance = [float(mmap.matchedPointDistance) for mmap in match_results]
|
||||||
|
return match_results[distance.index(min(distance))].lanePoint.paraPoint
|
||||||
|
|
||||||
|
def trace_route(start_waypoint, end_waypoint, town_map, sample_resolution=1, max_distance=0.5, probability=0):
|
||||||
|
"""
|
||||||
|
Gets the shortest route between a starting and end waypoint. This transforms the given location
|
||||||
|
to AD map paraPoints, and iterates through all permutations to return the shortest route.
|
||||||
|
This is useful to ensure that the correct route is chosen when starting / ending at intersections.
|
||||||
|
|
||||||
|
Then, the route is transformed back to a list of [carla.Waypoint, RoadOption]
|
||||||
|
|
||||||
|
:param start_waypoint (carla.Waypoint): Starting waypoint of the route
|
||||||
|
:param end_waypoint (carla.Waypoint): Ending waypoint of the route
|
||||||
|
:param town_map (carla.Map): CARLA map instance where the route will be computed
|
||||||
|
:param sample_resolution (float): Distance between the waypoints that form the route
|
||||||
|
:param max_distance (float): Max distance between the given location and the matched AD map para points.
|
||||||
|
If this value is too large, the matching might result in waypoints on different lanes.
|
||||||
|
"""
|
||||||
|
wp_route = []
|
||||||
|
start_location = start_waypoint.transform.location
|
||||||
|
end_location = end_waypoint.transform.location
|
||||||
|
|
||||||
|
# Get starting point matches
|
||||||
|
start_matches = _waypoint_matches(start_waypoint, town_map, max_distance, probability)
|
||||||
|
if not start_matches:
|
||||||
|
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(start_location))
|
||||||
|
return wp_route
|
||||||
|
|
||||||
|
# Get ending point matches
|
||||||
|
end_matches = _waypoint_matches(end_waypoint, town_map, max_distance, probability)
|
||||||
|
if not end_matches:
|
||||||
|
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(end_location))
|
||||||
|
return wp_route
|
||||||
|
|
||||||
|
# Get the shortest route
|
||||||
|
route_segment = _filter_shortest_route(start_matches, end_matches)
|
||||||
|
if not route_segment:
|
||||||
|
print("WARNING: Couldn't find a viable route between locations "
|
||||||
|
"'{}' and '{}'.".format(start_location, end_location))
|
||||||
|
return wp_route
|
||||||
|
|
||||||
|
# Change the route to waypoints
|
||||||
|
wp_route = _get_route_waypoints(route_segment, sample_resolution, town_map)
|
||||||
|
return wp_route
|
||||||
|
|
||||||
|
def _waypoint_matches(waypoint, town_map, max_distance, probability=0):
|
||||||
|
"""
|
||||||
|
Given a waypoint, maps its transform to the AD map, returning a list of possible matches.
|
||||||
|
All matches are filtered to make sure they represent driving lanes.
|
||||||
|
"""
|
||||||
|
# ECEF location of the waypoint
|
||||||
|
location = waypoint.transform.location
|
||||||
|
ecef_location= carla_loc_to_ecef(location)
|
||||||
|
|
||||||
|
# ECEF location of a point in front of the waypoint
|
||||||
|
f_vec = waypoint.transform.get_right_vector()
|
||||||
|
front_location = location + carla.Location(x=f_vec.x, y=f_vec.y)
|
||||||
|
ecef_front_location= carla_loc_to_ecef(front_location)
|
||||||
|
|
||||||
|
# Get the map matching and the heading hint
|
||||||
|
ecef_heading = ad.map.point.createECEFHeading(ecef_location, ecef_front_location)
|
||||||
|
ad_map_matching = ad.map.match.AdMapMatching()
|
||||||
|
ad_map_matching.addHeadingHint(ecef_heading)
|
||||||
|
|
||||||
|
# Get the matches and filter the none driving lane ones
|
||||||
|
matches = ad_map_matching.getMapMatchedPositions(
|
||||||
|
carla_loc_to_enu(location),
|
||||||
|
ad.physics.Distance(max_distance),
|
||||||
|
ad.physics.Probability(probability)
|
||||||
|
)
|
||||||
|
matches = [m for m in matches if is_point_at_driving_lane(m.lanePoint.paraPoint, town_map)]
|
||||||
|
|
||||||
|
return matches
|
||||||
|
|
||||||
|
def _filter_shortest_route(start_matches, end_matches):
|
||||||
|
"""Given a set of starting and ending matches, computes all possible routes and selects the shortest one"""
|
||||||
|
route_segment = None
|
||||||
|
# Get the shortest route
|
||||||
|
min_length = float('inf')
|
||||||
|
for start_match in start_matches:
|
||||||
|
start_point = start_match.lanePoint.paraPoint
|
||||||
|
for end_match in end_matches:
|
||||||
|
# Get the route
|
||||||
|
new_route_segment = ad.map.route.planRoute(
|
||||||
|
start_point, end_match.lanePoint.paraPoint, ad.map.route.RouteCreationMode.Undefined)
|
||||||
|
if len(new_route_segment.roadSegments) == 0:
|
||||||
|
continue # The route doesn't exist, ignore it
|
||||||
|
|
||||||
|
# Save the shortest route
|
||||||
|
length = _get_route_length(new_route_segment)
|
||||||
|
if length < min_length:
|
||||||
|
min_length = length
|
||||||
|
route_segment = new_route_segment
|
||||||
|
|
||||||
|
return route_segment
|
||||||
|
|
||||||
|
def _get_route_length(route):
|
||||||
|
"""
|
||||||
|
Gets the length of the route, being the sum of the road segment lengths.
|
||||||
|
Each road segment length is the mean of the length of its lane segments.
|
||||||
|
"""
|
||||||
|
route_length = 0
|
||||||
|
for road_segment in route.roadSegments:
|
||||||
|
road_length = 0
|
||||||
|
for lane_segment in road_segment.drivableLaneSegments:
|
||||||
|
lane_start = float(lane_segment.laneInterval.start)
|
||||||
|
lane_end = float(lane_segment.laneInterval.end)
|
||||||
|
lane_length = float(ad.map.lane.calcLength(lane_segment.laneInterval.laneId))
|
||||||
|
|
||||||
|
road_length += lane_length * abs(lane_end - lane_start)
|
||||||
|
|
||||||
|
num_lanes = len(road_segment.drivableLaneSegments)
|
||||||
|
if num_lanes != 0:
|
||||||
|
route_length += road_length / num_lanes
|
||||||
|
|
||||||
|
return route_length
|
||||||
|
|
||||||
|
def _get_route_waypoints(route, resolution, town_map):
|
||||||
|
"""
|
||||||
|
Given a route, transforms it into a list of [carla.Waypoint, RoadOption]
|
||||||
|
|
||||||
|
:param route (ad.map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined.
|
||||||
|
Other creation modes return mode than one lane, which would need a prefiltering.
|
||||||
|
:param resolution (float): Distance between the waypoints that form the route.
|
||||||
|
:param town_map (carla.Map): CARLA map instance where the route will be computed
|
||||||
|
"""
|
||||||
|
wp_route = []
|
||||||
|
for road_segment in route.roadSegments:
|
||||||
|
for lane_segment in road_segment.drivableLaneSegments:
|
||||||
|
lane_id = lane_segment.laneInterval.laneId
|
||||||
|
param_list = _get_lane_interval_list(lane_segment.laneInterval, resolution)
|
||||||
|
for i in range(len(param_list)):
|
||||||
|
para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param_list[i]))
|
||||||
|
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map)
|
||||||
|
wp_route.append(carla_waypoint)
|
||||||
|
# if i == 0:
|
||||||
|
# world.debug.draw_point(
|
||||||
|
# carla_waypoint.transform.location, size=0.2, life_time=10000, color=carla.Color(255,0,0))
|
||||||
|
# world.debug.draw_point(
|
||||||
|
# carla_waypoint.transform.location, size=0.1, life_time=10000, color=carla.Color(255,255,0))
|
||||||
|
|
||||||
|
return wp_route
|
||||||
|
|
||||||
|
def _get_lane_interval_list(lane_interval, distance=1):
|
||||||
|
"""
|
||||||
|
Separates a given lane interval into smaller intervals of length equal to 'distance'
|
||||||
|
"""
|
||||||
|
start = float(lane_interval.start)
|
||||||
|
end = float(lane_interval.end)
|
||||||
|
length = float(ad.map.lane.calcLength(lane_interval.laneId))
|
||||||
|
if start == end:
|
||||||
|
return []
|
||||||
|
return np.arange(start, end, np.sign(end - start) * distance / length)
|
|
@ -1,203 +0,0 @@
|
||||||
# Copyright (c) # Copyright (c) 2018-2021 CVC.
|
|
||||||
#
|
|
||||||
# This work is licensed under the terms of the MIT license.
|
|
||||||
# For a copy, see <https://opensource.org/licenses/MIT>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
This file has several useful functions related to the AD Map library
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import print_function
|
|
||||||
|
|
||||||
import carla
|
|
||||||
import numpy as np
|
|
||||||
import ad_map_access as ad
|
|
||||||
|
|
||||||
def carla_loc_to_enu(carla_location):
|
|
||||||
"""Transform a CARLA location into an ENU point"""
|
|
||||||
return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z)
|
|
||||||
|
|
||||||
def enu_to_carla_loc(enu_point):
|
|
||||||
"""Transform an ENU point into a CARLA location"""
|
|
||||||
return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z))
|
|
||||||
|
|
||||||
def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving):
|
|
||||||
"""Transform a para point into a CARLA waypoint"""
|
|
||||||
enu_point = ad.map.lane.getENULanePoint(para_point)
|
|
||||||
carla_point = enu_to_carla_loc(enu_point)
|
|
||||||
carla_waypoint = town_map.get_waypoint(carla_point, lane_type=lane_type)
|
|
||||||
return carla_waypoint
|
|
||||||
|
|
||||||
def is_point_at_driving_lane(para_point, town_map):
|
|
||||||
"""Checks if a parapoint is part of a CARLA driving lane"""
|
|
||||||
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, carla.LaneType.Any)
|
|
||||||
return carla_waypoint.lane_type == carla.LaneType.Driving
|
|
||||||
|
|
||||||
def get_shortest_route(start_location, end_location, town_map, sample_resolution=1, distance=1, probability=0, world=None):
|
|
||||||
"""
|
|
||||||
Gets the shortest route between a starting and end location.
|
|
||||||
"""
|
|
||||||
route = []
|
|
||||||
|
|
||||||
ad_distance = ad.physics.Distance(distance)
|
|
||||||
ad_probability = ad.physics.Probability(probability)
|
|
||||||
ad_map_matching = ad.map.match.AdMapMatching()
|
|
||||||
route_segment = None
|
|
||||||
|
|
||||||
start_matches = ad_map_matching.getMapMatchedPositions(
|
|
||||||
carla_loc_to_enu(start_location), ad_distance, ad_probability)
|
|
||||||
end_matches = ad_map_matching.getMapMatchedPositions(
|
|
||||||
carla_loc_to_enu(end_location), ad_distance, ad_probability)
|
|
||||||
|
|
||||||
if not start_matches:
|
|
||||||
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(start_location))
|
|
||||||
return route
|
|
||||||
if not end_matches:
|
|
||||||
print("WARNING: Couldn't find a paraPoint for location '{}'.".format(end_location))
|
|
||||||
return route
|
|
||||||
|
|
||||||
# Filter the paraPoints that aren't part of driving lanes
|
|
||||||
start_matches = [match for match in start_matches if is_point_at_driving_lane(match.lanePoint.paraPoint, town_map)]
|
|
||||||
end_matches = [match for match in end_matches if is_point_at_driving_lane(match.lanePoint.paraPoint, town_map)]
|
|
||||||
|
|
||||||
# Get the shortest route
|
|
||||||
min_length = float('inf')
|
|
||||||
for start_match in start_matches:
|
|
||||||
start_point = start_match.lanePoint.paraPoint
|
|
||||||
for end_match in end_matches:
|
|
||||||
# Get the route
|
|
||||||
new_route_segment = ad.map.route.planRoute(start_point, end_match.lanePoint.paraPoint)
|
|
||||||
if len(new_route_segment.roadSegments) == 0:
|
|
||||||
continue # The route doesn't exist, ignore it
|
|
||||||
|
|
||||||
# Calculate route length, as a sum of the mean of the road's lanes length
|
|
||||||
length = 0
|
|
||||||
for road_segment in new_route_segment.roadSegments:
|
|
||||||
road_length = 0
|
|
||||||
number_lanes = 0
|
|
||||||
for lane_segment in road_segment.drivableLaneSegments:
|
|
||||||
seg_start = float(lane_segment.laneInterval.start)
|
|
||||||
seg_end = float(lane_segment.laneInterval.end)
|
|
||||||
seg_length = float(ad.map.lane.calcLength(lane_segment.laneInterval.laneId))
|
|
||||||
|
|
||||||
road_length += seg_length * abs(seg_end - seg_start)
|
|
||||||
number_lanes += 1
|
|
||||||
|
|
||||||
if number_lanes != 0:
|
|
||||||
length += road_length / number_lanes
|
|
||||||
|
|
||||||
# Save the shortest route
|
|
||||||
if length < min_length:
|
|
||||||
min_length = length
|
|
||||||
route_segment = new_route_segment
|
|
||||||
start_lane_id = start_point.laneId
|
|
||||||
|
|
||||||
if not route_segment:
|
|
||||||
print("WARNING: Couldn't find a viable route between locations "
|
|
||||||
"'{}' and '{}'.".format(start_location, end_location))
|
|
||||||
return route
|
|
||||||
|
|
||||||
# print(route_segment)
|
|
||||||
|
|
||||||
# Transform the AD map route representation into waypoints (If not needed to parse the altitude)
|
|
||||||
for segment in get_route_lane_list(route_segment, start_lane_id):
|
|
||||||
lane_id = segment.laneInterval.laneId
|
|
||||||
param_list = get_lane_interval_list(segment.laneInterval, sample_resolution)
|
|
||||||
for i in range(len(param_list)):
|
|
||||||
para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param_list[i]))
|
|
||||||
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map)
|
|
||||||
route.append(carla_waypoint)
|
|
||||||
if i == 0:
|
|
||||||
world.debug.draw_point(carla_waypoint.transform.location, size=0.2, life_time=10000, color=carla.Color(255,0,0))
|
|
||||||
world.debug.draw_point(carla_waypoint.transform.location, size=0.1, life_time=10000, color=carla.Color(255,255,0))
|
|
||||||
|
|
||||||
return route
|
|
||||||
|
|
||||||
def get_lane_altitude_list(start_z, end_z, length):
|
|
||||||
"""
|
|
||||||
Gets the z values of a lane. This is a simple linear interpolation
|
|
||||||
and it won't be necessary whenever the AD map parses the altitude
|
|
||||||
"""
|
|
||||||
if length == 0:
|
|
||||||
return []
|
|
||||||
if start_z == end_z:
|
|
||||||
return start_z*np.ones(length)
|
|
||||||
return np.arange(start_z, end_z, (end_z - start_z) / length)
|
|
||||||
|
|
||||||
def get_route_lane_list(route, start_lane_id, prev_lane_id=None, prev_successors=[]):
|
|
||||||
"""
|
|
||||||
Given a route returns the lane segments corresponding to the route.
|
|
||||||
'start_lane_id' is the lane id of the first point in the route.
|
|
||||||
In case this function is called more than once, use 'prev_lane_id'
|
|
||||||
and 'prev_successors' to keep computing the lanes without any information loss.
|
|
||||||
"""
|
|
||||||
segments = []
|
|
||||||
|
|
||||||
for road_segment in route.roadSegments:
|
|
||||||
current_segment = None
|
|
||||||
for lane_segment in road_segment.drivableLaneSegments:
|
|
||||||
successors = lane_segment.successors
|
|
||||||
predecessors = lane_segment.predecessors
|
|
||||||
lane_id = lane_segment.laneInterval.laneId
|
|
||||||
|
|
||||||
if not prev_lane_id:
|
|
||||||
# First lane, match the lane with the starting lane id
|
|
||||||
if lane_segment.laneInterval.laneId != start_lane_id:
|
|
||||||
continue # match the lane to the starting point
|
|
||||||
|
|
||||||
if prev_lane_id and prev_lane_id not in predecessors:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if prev_successors and lane_id not in prev_successors:
|
|
||||||
continue
|
|
||||||
|
|
||||||
current_segment = lane_segment
|
|
||||||
break
|
|
||||||
|
|
||||||
# Basically, the next lane won't be chosen if the current one has no successors, as it means
|
|
||||||
# that the next lane diverges from the route. Watch out with false empty successors though
|
|
||||||
|
|
||||||
if not current_segment:
|
|
||||||
# print(len(prev_successors))
|
|
||||||
# print("The previous lane diverged from the route, lane changing")
|
|
||||||
# TODO: This shouldn't really be random
|
|
||||||
current_segment = np.random.choice(road_segment.drivableLaneSegments)
|
|
||||||
# else:
|
|
||||||
# print(len(prev_successors))
|
|
||||||
# print("Found a connected route lane")
|
|
||||||
|
|
||||||
segments.append(current_segment)
|
|
||||||
prev_lane_id = current_segment.laneInterval.laneId
|
|
||||||
prev_successors = current_segment.successors
|
|
||||||
|
|
||||||
return segments
|
|
||||||
|
|
||||||
def to_ad_paraPoint(location, distance=1, probability=0):
|
|
||||||
"""
|
|
||||||
Transforms a carla.Location into an ad.map.point.ParaPoint()
|
|
||||||
"""
|
|
||||||
ad_distance = ad.physics.Distance(distance)
|
|
||||||
ad_probability = ad.physics.Probability(probability)
|
|
||||||
ad_map_matching = ad.map.match.AdMapMatching()
|
|
||||||
ad_location = carla_loc_to_enu(location)
|
|
||||||
|
|
||||||
match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability)
|
|
||||||
|
|
||||||
if not match_results:
|
|
||||||
print("WARNING: Couldn't find a para point for CARLA location {}".format(location))
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Filter the closest one to the given location
|
|
||||||
distance = [float(mmap.matchedPointDistance) for mmap in match_results]
|
|
||||||
return match_results[distance.index(min(distance))].lanePoint.paraPoint
|
|
||||||
|
|
||||||
def get_lane_interval_list(lane_interval, distance=1):
|
|
||||||
"""
|
|
||||||
Separates a given lane interval into smaller intervals of length equal to 'distance'
|
|
||||||
"""
|
|
||||||
start = float(lane_interval.start)
|
|
||||||
end = float(lane_interval.end)
|
|
||||||
length = float(ad.map.lane.calcLength(lane_interval.laneId))
|
|
||||||
if start == end:
|
|
||||||
return []
|
|
||||||
return np.arange(start, end, np.sign(end - start) * distance / length)
|
|
Loading…
Reference in New Issue