More improvements

This commit is contained in:
Guillermo 2021-07-01 14:59:01 +02:00 committed by bernat
parent 97bd0e58f0
commit 4f5ec0dae2
4 changed files with 125 additions and 89 deletions

View File

@ -58,19 +58,19 @@ class Agent(object):
return control return control
def ignore_traffic_lights(self, active): def ignore_traffic_lights(self, active=True):
""" """
(De)activates the checks for traffic lights (De)activates the checks for traffic lights
""" """
self._ignore_traffic_lights = active self._ignore_traffic_lights = active
def ignore_stop_signs(self, active): def ignore_stop_signs(self, active=True):
""" """
(De)activates the checks for stop signs (De)activates the checks for stop signs
""" """
self._ignore_stop_signs = active self._ignore_stop_signs = active
def ignore_vehicles(self, active): def ignore_vehicles(self, active=True):
""" """
(De)activates the checks for stop signs (De)activates the checks for stop signs
""" """

View File

@ -8,15 +8,11 @@
This module provides GlobalRoutePlanner implementation. This module provides GlobalRoutePlanner implementation.
""" """
import math import math
import os
import numpy as np import numpy as np
import xml.etree.ElementTree as ET
import carla import carla
from agents.navigation.local_planner import RoadOption from agents.navigation.local_planner import RoadOption
from agents.tools.global_route_planner_helper import trace_route from agents.tools.global_route_planner_helper import trace_route, initialize_map
from agents.tools.misc import vector
import ad_map_access as ad import ad_map_access as ad
class GlobalRoutePlanner(object): class GlobalRoutePlanner(object):
@ -32,58 +28,28 @@ class GlobalRoutePlanner(object):
self._sampling_resolution = sampling_resolution self._sampling_resolution = sampling_resolution
self._wmap = wmap self._wmap = wmap
self._initialize_map() initialize_map(self._wmap)
def _initialize_map(self): def trace_route(self, origin, destination, with_options=True):
"""Initialize the AD map library and, creates the file needed to do so."""
lat_ref = 0.0
lon_ref = 0.0
opendrive_contents = self._wmap.to_opendrive()
xodr_name = 'RoutePlannerMap.xodr'
txt_name = 'RoutePlannerMap.txt'
# Save the opendrive data into a file
with open(xodr_name, 'w') as f:
f.write(opendrive_contents)
# Get geo reference
xml_tree = ET.parse(xodr_name)
for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '):
if geo_elem.startswith('+lat_0'):
lat_ref = float(geo_elem.split('=')[-1])
elif geo_elem.startswith('+lon_0'):
lon_ref = float(geo_elem.split('=')[-1])
# Save the previous info
with open(txt_name, 'w') as f:
txt_content = "[ADMap]\n" \
"map=" + xodr_name + "\n" \
"[ENUReference]\n" \
"default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0"
f.write(txt_content)
# Intialize the map and remove created files
initialized = ad.map.access.init(txt_name)
if not initialized:
raise ValueError("Couldn't initialize the map")
for fname in [txt_name, xodr_name]:
if os.path.exists(fname):
os.remove(fname)
def trace_route(self, origin, destination):
""" """
This method returns list of (carla.Waypoint, RoadOption) This method traces a route between a starting and ending point.
from origin to destination IF 'with_options' is True (deafult behavior), returns a list of [carla.Waypoint, RoadOption],
being RoadOption a high level represnetation of what the direction of the route.
If 'with_options' is False, the road options are computed, returning a list of carla.Waypoint
:param origin (carla.Waypoint): starting point of the route
:param destination (carla.Waypoint): ending point of the route
:param with_options (bool): whether or not the road options will be added to the route info
""" """
route = trace_route(origin, destination, self._wmap, self._sampling_resolution) route = trace_route(origin, destination, self._wmap, self._sampling_resolution)
if not route: if not route:
return [] return []
# Add options # Add options, or not
route_with_options = self.add_options_to_route(route) if with_options:
return route_with_options return self.add_options_to_route(route)
else:
return route
def add_options_to_route(self, route): def add_options_to_route(self, route):
""" """
@ -127,43 +93,54 @@ class GlobalRoutePlanner(object):
# Route ended at a junction # Route ended at a junction
if self._prev_at_junction: if self._prev_at_junction:
road_option = self._compute_options(route[-1], waypoint) road_option = self._compute_options(route[entry_index], route[-1])
for j in range(entry_index, len(route)): for j in range(entry_index, len(route)):
route_with_options.append([route[j], road_option]) route_with_options.append([route[j], road_option])
entry_index = None entry_index = None
# Part 2: Add lane changes # Part 2: Add lane changes
prev_lane_change = None lane_change_type = None
for i in range(0, len(route_with_options) - 1): for i in range(0, len(route_with_options)):
waypoint, option = route_with_options[i] waypoint, option = route_with_options[i]
next_waypoint, _ = route_with_options[i+1]
# Lane changes are set to both lanes # Start and end cases
if prev_lane_change: if i == len(route_with_options) - 1:
route_with_lane_changes.append([waypoint, prev_lane_change]) route_with_lane_changes.append([waypoint, lane_change_type if lane_change_type else option])
prev_lane_change = None
continue continue
if i == 0:
prev_direction = waypoint.transform.get_forward_vector()
np_prev_direction = np.array([prev_direction.x, prev_direction.y, prev_direction.z])
# Check the dot product between the two consecutive waypoint # Check the dot product between the two consecutive waypoint
direction = waypoint.transform.get_forward_vector() route_direction = route_with_options[i+1][0].transform.location - waypoint.transform.location
np_direction = np.array([direction.x, direction.y, direction.z])
route_direction = next_waypoint.transform.location - waypoint.transform.location
np_route_direction = np.array([route_direction.x, route_direction.y, route_direction.z]) np_route_direction = np.array([route_direction.x, route_direction.y, route_direction.z])
if np.linalg.norm(np_route_direction): if np.linalg.norm(np_route_direction) and np.linalg.norm(np_prev_direction):
dot = np.dot(np_direction, np_route_direction) dot = np.dot(np_prev_direction, np_route_direction)
dot /= np.linalg.norm(np_direction) * np.linalg.norm(np_route_direction) dot /= np.linalg.norm(np_prev_direction) * np.linalg.norm(np_route_direction)
else: else:
dot = 1 dot = 1
if 0 < dot < math.cos(math.radians(45)): if 0 < dot < math.cos(math.radians(45)):
cross = np.cross(np_direction, np_route_direction) if lane_change_type:
prev_lane_change = RoadOption.CHANGELANERIGHT if cross[2] > 0 else RoadOption.CHANGELANELEFT # Last lane change waypoint
route_with_lane_changes.append([waypoint, prev_lane_change]) new_option = lane_change_type
lane_change_type = None
else: else:
route_with_lane_changes.append([waypoint, option]) # First lane change waypoint
cross = np.cross(np_prev_direction, np_route_direction)
lane_change_type = RoadOption.CHANGELANERIGHT if cross[2] > 0 else RoadOption.CHANGELANELEFT
new_option = lane_change_type
else:
if lane_change_type:
new_option = lane_change_type
else:
new_option = option
route_with_lane_changes.append([waypoint, new_option])
np_prev_direction = np_route_direction
return route_with_lane_changes return route_with_lane_changes
@ -174,10 +151,8 @@ class GlobalRoutePlanner(object):
diff = (exit_waypoint.transform.rotation.yaw - entry_waypoint.transform.rotation.yaw) % 360 diff = (exit_waypoint.transform.rotation.yaw - entry_waypoint.transform.rotation.yaw) % 360
if diff > 315.0: if diff > 315.0:
option = RoadOption.STRAIGHT option = RoadOption.STRAIGHT
elif diff > 225.0: elif diff > 180.0:
option = RoadOption.LEFT option = RoadOption.LEFT
elif diff > 135.0:
option = RoadOption.HALFTURN
elif diff > 45.0: elif diff > 45.0:
option = RoadOption.RIGHT option = RoadOption.RIGHT
else: else:

View File

@ -23,10 +23,9 @@ class RoadOption(Enum):
LEFT = 1 LEFT = 1
RIGHT = 2 RIGHT = 2
STRAIGHT = 3 STRAIGHT = 3
HALFTURN = 4 LANEFOLLOW = 4
LANEFOLLOW = 5 CHANGELANELEFT = 5
CHANGELANELEFT = 6 CHANGELANERIGHT = 6
CHANGELANERIGHT = 7
class LocalPlanner(object): class LocalPlanner(object):

View File

@ -9,10 +9,52 @@ This file has several useful functions related to the AD Map library
from __future__ import print_function from __future__ import print_function
import carla from math import floor
import os
import numpy as np import numpy as np
import xml.etree.ElementTree as ET
import carla
import ad_map_access as ad import ad_map_access as ad
def initialize_map(wmap):
"""Initialize the AD map library and, creates the file needed to do so."""
lat_ref = 0.0
lon_ref = 0.0
opendrive_contents = wmap.to_opendrive()
xodr_name = 'RoutePlannerMap.xodr'
txt_name = 'RoutePlannerMap.txt'
# Save the opendrive data into a file
with open(xodr_name, 'w') as f:
f.write(opendrive_contents)
# Get geo reference
xml_tree = ET.parse(xodr_name)
for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '):
if geo_elem.startswith('+lat_0'):
lat_ref = float(geo_elem.split('=')[-1])
elif geo_elem.startswith('+lon_0'):
lon_ref = float(geo_elem.split('=')[-1])
# Save the previous info
with open(txt_name, 'w') as f:
txt_content = "[ADMap]\n" \
"map=" + xodr_name + "\n" \
"[ENUReference]\n" \
"default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0"
f.write(txt_content)
# Intialize the map and remove created files
initialized = ad.map.access.init(txt_name)
if not initialized:
raise ValueError("Couldn't initialize the map")
for fname in [txt_name, xodr_name]:
if os.path.exists(fname):
os.remove(fname)
def carla_loc_to_enu(carla_location): def carla_loc_to_enu(carla_location):
"""Transform a CARLA location into an ENU point""" """Transform a CARLA location into an ENU point"""
return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z) return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z)
@ -25,6 +67,26 @@ def enu_to_carla_loc(enu_point):
"""Transform an ENU point into a CARLA location""" """Transform an ENU point into a CARLA location"""
return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z)) return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z))
def para_point_to_carla_waypoint_2(para_point, town_map, lane_type=carla.LaneType.Driving):
"""Transform a para point into a CARLA waypoint"""
ad_lane_id = para_point.laneId
num_lane_id = float(str(para_point.laneId))
road_id = floor(num_lane_id / 10000)
remnant = num_lane_id - road_id * 10000
lane_segment_id = floor(remnant / 100)
remnant = remnant - lane_segment_id * 100
lane_id = floor(remnant - 50)
length = float(ad.map.lane.calcLength(ad_lane_id))
is_positive_lane = ad.map.lane.isLaneDirectionPositive(ad_lane_id)
if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0:
s = length * (1 - float(para_point.parametricOffset))
else:
s = length * float(para_point.parametricOffset)
return town_map.get_waypoint_xodr(road_id, lane_id, s)
def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving): def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving):
"""Transform a para point into a CARLA waypoint""" """Transform a para point into a CARLA waypoint"""
enu_point = ad.map.lane.getENULanePoint(para_point) enu_point = ad.map.lane.getENULanePoint(para_point)
@ -56,7 +118,7 @@ def to_ad_paraPoint(location, distance=1, probability=0):
distance = [float(mmap.matchedPointDistance) for mmap in match_results] distance = [float(mmap.matchedPointDistance) for mmap in match_results]
return match_results[distance.index(min(distance))].lanePoint.paraPoint return match_results[distance.index(min(distance))].lanePoint.paraPoint
def trace_route(start_waypoint, end_waypoint, town_map, sample_resolution=1, max_distance=0.5, probability=0): def trace_route(start_waypoint, end_waypoint, town_map, sample_resolution=1, max_distance=1.6, probability=0):
""" """
Gets the shortest route between a starting and end waypoint. This transforms the given location Gets the shortest route between a starting and end waypoint. This transforms the given location
to AD map paraPoints, and iterates through all permutations to return the shortest route. to AD map paraPoints, and iterates through all permutations to return the shortest route.
@ -172,27 +234,27 @@ def _get_route_length(route):
def _get_route_waypoints(route, resolution, town_map): def _get_route_waypoints(route, resolution, town_map):
""" """
Given a route, transforms it into a list of [carla.Waypoint, RoadOption] Given a route, transforms it into a list of [carla.Waypoint, RoadOption].
Take into account that at locations where multiple lanes overlap,
while the waypoints will be correctly placed, they might be part of a different lane.
:param route (ad.map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined. :param route (ad.map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined.
Other creation modes return mode than one lane, which would need a prefiltering. Other creation modes return mode than one lane, which would need a prefiltering.
:param resolution (float): Distance between the waypoints that form the route. :param resolution (float): Distance between the waypoints that form the route.
:param town_map (carla.Map): CARLA map instance where the route will be computed :param town_map (carla.Map): CARLA map instance where the route will be computed
""" """
# TODO: use para_point_to_carla_waypoint_2, which directly transforms from paraPoint to waypoint
# to ensure all waypoints correspond to the correct lane
wp_route = [] wp_route = []
for road_segment in route.roadSegments: for road_segment in route.roadSegments:
for lane_segment in road_segment.drivableLaneSegments: for lane_segment in road_segment.drivableLaneSegments:
lane_id = lane_segment.laneInterval.laneId lane_id = lane_segment.laneInterval.laneId
param_list = _get_lane_interval_list(lane_segment.laneInterval, resolution) param_list = _get_lane_interval_list(lane_segment.laneInterval, resolution)
for i in range(len(param_list)): for param in param_list:
para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param_list[i])) para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param))
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map) carla_waypoint = para_point_to_carla_waypoint(para_point, town_map)
wp_route.append(carla_waypoint) wp_route.append(carla_waypoint)
# if i == 0:
# world.debug.draw_point(
# carla_waypoint.transform.location, size=0.2, life_time=10000, color=carla.Color(255,0,0))
# world.debug.draw_point(
# carla_waypoint.transform.location, size=0.1, life_time=10000, color=carla.Color(255,255,0))
return wp_route return wp_route