WIP: trying to transform parapoint to waypoint

This commit is contained in:
Guillermo 2021-07-08 16:55:49 +02:00 committed by bernat
parent d61afc74f4
commit 4aae9ebd88
2 changed files with 155 additions and 69 deletions

View File

@ -12,7 +12,6 @@ import numpy as np
from agents.navigation.local_planner import RoadOption from agents.navigation.local_planner import RoadOption
from agents.tools.route_helper import trace_route, initialize_map from agents.tools.route_helper import trace_route, initialize_map
import ad_map_access as ad
class GlobalRoutePlanner(object): class GlobalRoutePlanner(object):
""" """

View File

@ -9,13 +9,15 @@ This file has several useful functions related to the AD Map library
from __future__ import print_function from __future__ import print_function
from math import floor from math import floor, sqrt, pow
import os import os
from re import L
import numpy as np import numpy as np
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import carla import carla
import ad_map_access as ad from ad_map_access import map as ad_map
from ad_physics import physics as ad_physics
def initialize_map(wmap): def initialize_map(wmap):
"""Initialize the AD map library and, creates the file needed to do so.""" """Initialize the AD map library and, creates the file needed to do so."""
@ -47,7 +49,7 @@ def initialize_map(wmap):
f.write(txt_content) f.write(txt_content)
# Intialize the map and remove created files # Intialize the map and remove created files
initialized = ad.map.access.init(txt_name) initialized = ad_map.access.init(txt_name)
if not initialized: if not initialized:
raise ValueError("Couldn't initialize the map") raise ValueError("Couldn't initialize the map")
@ -57,55 +59,44 @@ def initialize_map(wmap):
def carla_loc_to_enu(carla_location): def carla_loc_to_enu(carla_location):
"""Transform a CARLA location into an ENU point""" """Transform a CARLA location into an ENU point"""
return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z) return ad_map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z)
def carla_loc_to_ecef(carla_location): def carla_loc_to_ecef(carla_location):
"""Transform a CARLA location into an ENU point""" """Transform a CARLA location into an ENU point"""
return ad.map.point.toECEF(carla_loc_to_enu(carla_location)) return ad_map.point.toECEF(carla_loc_to_enu(carla_location))
def enu_to_carla_loc(enu_point): def enu_to_carla_loc(enu_point):
"""Transform an ENU point into a CARLA location""" """Transform an ENU point into a CARLA location"""
return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z)) return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z))
def para_point_to_carla_waypoint_2(para_point, town_map, lane_type=carla.LaneType.Driving): def para_point_to_carla_waypoint(para_point, town_map, lane_length):
"""Transform a para point into a CARLA waypoint""" """Transform a para point into a CARLA waypoint"""
ad_lane_id = para_point.laneId ad_lane_id = para_point.laneId
num_lane_id = float(str(para_point.laneId)) road_id, lane_id, segment_id = get_opendrive_ids(ad_lane_id)
road_id = floor(num_lane_id / 10000) is_positive_lane = ad_map.lane.isLaneDirectionPositive(ad_lane_id)
remnant = num_lane_id - road_id * 10000
lane_segment_id = floor(remnant / 100)
remnant = remnant - lane_segment_id * 100
lane_id = floor(remnant - 50)
length = float(ad.map.lane.calcLength(ad_lane_id))
is_positive_lane = ad.map.lane.isLaneDirectionPositive(ad_lane_id)
if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0: if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0:
s = length * (1 - float(para_point.parametricOffset)) s = lane_length * (1 - float(para_point.parametricOffset))
else: else:
s = length * float(para_point.parametricOffset) s = lane_length * float(para_point.parametricOffset)
# print("Road: {} - Lane: {} - Lane segment: {} - s: {} ".format(road_id, lane_id, segment_id, s))
return town_map.get_waypoint_xodr(road_id, lane_id, s) return town_map.get_waypoint_xodr(road_id, lane_id, s)
def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving):
"""Transform a para point into a CARLA waypoint"""
enu_point = ad.map.lane.getENULanePoint(para_point)
carla_point = enu_to_carla_loc(enu_point)
carla_waypoint = town_map.get_waypoint(carla_point, lane_type=lane_type)
return carla_waypoint
def is_point_at_driving_lane(para_point, town_map): def is_point_at_driving_lane(para_point, town_map):
"""Checks if a parapoint is part of a CARLA driving lane""" """Checks if a parapoint is part of a CARLA driving lane"""
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, carla.LaneType.Any) enu_point = ad_map.lane.getENULanePoint(para_point)
carla_point = enu_to_carla_loc(enu_point)
carla_waypoint = town_map.get_waypoint(carla_point, lane_type=carla.LaneType.Any)
return carla_waypoint.lane_type == carla.LaneType.Driving return carla_waypoint.lane_type == carla.LaneType.Driving
def to_ad_paraPoint(location, distance=1, probability=0): def to_ad_paraPoint(location, distance=1, probability=0):
""" """
Transforms a carla.Location into an ad.map.point.ParaPoint() Transforms a carla.Location into an ad_map.point.ParaPoint()
""" """
ad_distance = ad.physics.Distance(distance) ad_distance = ad_physics.Distance(distance)
ad_probability = ad.physics.Probability(probability) ad_probability = ad_physics.Probability(probability)
ad_map_matching = ad.map.match.AdMapMatching() ad_map_matching = ad_map.match.AdMapMatching()
ad_location = carla_loc_to_enu(location) ad_location = carla_loc_to_enu(location)
match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability) match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability)
@ -188,15 +179,15 @@ def _waypoint_matches(waypoint, town_map, max_distance, probability=0):
ecef_front_location= carla_loc_to_ecef(front_location) ecef_front_location= carla_loc_to_ecef(front_location)
# Get the map matching and the heading hint # Get the map matching and the heading hint
ecef_heading = ad.map.point.createECEFHeading(ecef_location, ecef_front_location) ecef_heading = ad_map.point.createECEFHeading(ecef_location, ecef_front_location)
ad_map_matching = ad.map.match.AdMapMatching() ad_map_matching = ad_map.match.AdMapMatching()
ad_map_matching.addHeadingHint(ecef_heading) ad_map_matching.addHeadingHint(ecef_heading)
# Get the matches and filter the none driving lane ones # Get the matches and filter the none driving lane ones
matches = ad_map_matching.getMapMatchedPositions( matches = ad_map_matching.getMapMatchedPositions(
carla_loc_to_enu(location), carla_loc_to_enu(location),
ad.physics.Distance(max_distance), ad_physics.Distance(max_distance),
ad.physics.Probability(probability) ad_physics.Probability(probability)
) )
matches = [m for m in matches if is_point_at_driving_lane(m.lanePoint.paraPoint, town_map)] matches = [m for m in matches if is_point_at_driving_lane(m.lanePoint.paraPoint, town_map)]
@ -211,8 +202,10 @@ def _filter_shortest_route(start_matches, end_matches):
start_point = start_match.lanePoint.paraPoint start_point = start_match.lanePoint.paraPoint
for end_match in end_matches: for end_match in end_matches:
# Get the route # Get the route
new_route_segment = ad.map.route.planRoute( new_route_segment = ad_map.route.planRoute(
start_point, end_match.lanePoint.paraPoint, ad.map.route.RouteCreationMode.Undefined) start_point, end_match.lanePoint.paraPoint,
ad_map.route.RouteCreationMode.Undefined
)
if len(new_route_segment.roadSegments) == 0: if len(new_route_segment.roadSegments) == 0:
continue # The route doesn't exist, ignore it continue # The route doesn't exist, ignore it
@ -235,7 +228,7 @@ def _get_route_length(route):
for lane_segment in road_segment.drivableLaneSegments: for lane_segment in road_segment.drivableLaneSegments:
lane_start = float(lane_segment.laneInterval.start) lane_start = float(lane_segment.laneInterval.start)
lane_end = float(lane_segment.laneInterval.end) lane_end = float(lane_segment.laneInterval.end)
lane_length = float(ad.map.lane.calcLength(lane_segment.laneInterval.laneId)) lane_length = float(ad_map.lane.calcLength(lane_segment.laneInterval.laneId))
road_length += lane_length * abs(lane_end - lane_start) road_length += lane_length * abs(lane_end - lane_start)
@ -245,58 +238,152 @@ def _get_route_length(route):
return route_length return route_length
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
def _get_route_waypoints(route, resolution, town_map): def _get_route_waypoints(route, resolution, town_map):
""" """
Given a route, transforms it into a list of [carla.Waypoint, RoadOption]. Given a route, transforms it into a list of [carla.Waypoint, RoadOption].
Take into account that at locations where multiple lanes overlap, Take into account that at locations where multiple lanes overlap,
while the waypoints will be correctly placed, they might be part of a different lane. while the waypoints will be correctly placed, they might be part of a different lane.
:param route (ad.map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined. :param route (ad_map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined.
Other creation modes return mode than one lane, which would need a prefiltering. Other creation modes return mode than one lane, which would need a prefiltering.
:param resolution (float): Distance between the waypoints that form the route. :param resolution (float): Distance between the waypoints that form the route.
:param town_map (carla.Map): CARLA map instance where the route will be computed :param town_map (carla.Map): CARLA map instance where the route will be computed
""" """
# TODO: use para_point_to_carla_waypoint_2, which directly transforms from paraPoint to waypoint # world = CarlaDataProvider.get_world()
# to ensure all waypoints correspond to the correct lane
wp_route = [] wp_route = []
# i = 0
for road_segment in route.roadSegments: for road_segment in route.roadSegments:
for lane_segment in road_segment.drivableLaneSegments: for lane_segment in road_segment.drivableLaneSegments:
# print(" ----------------- ") # Just one segment due to route mode 'Undefined'. If not, some filtering is needed
# border = ad.map.route.getENUBorder(lane_segment.laneInterval) if float(lane_segment.laneInterval.start) == float(lane_segment.laneInterval.end):
# for point in border.left: continue # Lengthless interval
# world.debug.draw_point(enu_to_carla_loc(point), life_time=100, size=0.2) para_points = _get_lane_para_points(lane_segment.laneInterval, resolution)
# for point in border.right: # lane_length = _get_opendrive_lane_length(lane_segment.laneInterval)
# world.debug.draw_point(enu_to_carla_loc(point), life_time=100, size=0.2) for para_point in para_points:
# border_ = ad.map.lane.getLateralAlignmentEdge(border, ad.physics.ParametricValue(0.5)) # carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, lane_length)
# length = 0 # if not carla_waypoint:
# for i in range(0, len(border_)-1): # print("Failed the previous wp")
# p1 = border_[i] # # Use its location instead
# p2 = border_[i+1] # enu_point = ad_map.lane.getENULanePoint(para_point)
# dist = sqrt(pow(p2.x - p1.x, 2) + pow(p2.y - p1.y, 2) + pow(p2.z - p1.z, 2)) # carla_point = enu_to_carla_loc(enu_point)
# print("Adding: {}".format(dist)) # carla_waypoint = town_map.get_waypoint(carla_point)
# length += dist enu_point = ad_map.lane.getENULanePoint(para_point)
# world.debug.draw_point(enu_to_carla_loc(p1), life_time=100, size=0.2, color=carla.Color(0,0,0)) carla_point = enu_to_carla_loc(enu_point)
# world.debug.draw_arrow(enu_to_carla_loc(p1), enu_to_carla_loc(p2), life_time=100, color=carla.Color(0,0,0)) carla_waypoint = town_map.get_waypoint(carla_point)
# print("Summ Length: {}".format(length))
# print("Calc Length: {}".format(ad.map.lane.calcLength(lane_segment.laneInterval.laneId)))
lane_id = lane_segment.laneInterval.laneId # world.debug.draw_string(carla_waypoint.transform.location, str(i), life_time=100, color=carla.Color(0,0,0))
param_list = _get_lane_interval_list(lane_segment.laneInterval, resolution)
for param in param_list:
para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param))
carla_waypoint = para_point_to_carla_waypoint(para_point, town_map)
wp_route.append(carla_waypoint) wp_route.append(carla_waypoint)
# i += 1
return wp_route return wp_route
def _get_lane_interval_list(lane_interval, distance=1): def _get_opendrive_lane_length(lane_interval):
"""Given a lane interval, gets the length of the lane"""
def create_interval(lane_id, start, end):
"""Creates and returns laneInterval instance (as ad_map.route.LaneInterval() doesn't accept inputs)"""
interval = ad_map.route.LaneInterval()
interval.laneId = lane_id
interval.start = ad_physics.ParametricValue(start)
interval.end = ad_physics.ParametricValue(end)
return interval
# As the opendrive length is the center of the road, get the lane with id 1 / -1
_, lane_id, _ = get_opendrive_ids(lane_interval.laneId)
ad_lane_id = int(str(lane_interval.laneId))
if lane_id > 0:
lane_range = range(lane_id - 1, -1, -1)
else:
lane_range = range(lane_id + 1, 1, 1)
for lane_diff in lane_range:
center_lane_id = ad_lane_id - lane_diff
# print("OpenDrive key: {}*{} - AD lane ID: {} - Center Lane ID: {}".format(
# road_id, lane_id, ad_lane_id, center_lane_id))
if ad_map.lane.isValid(center_lane_id):
break
start = float(lane_interval.start)
end = float(lane_interval.end)
if end > start:
lane_interval = create_interval(center_lane_id, 0, 1)
elif start > end:
lane_interval = create_interval(center_lane_id, 1, 0)
is_positive_lane = ad_map.lane.isLaneDirectionPositive(lane_interval.laneId)
enu_border = ad_map.route.getENUBorder(lane_interval)
if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0:
border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(0))
else:
border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(1))
# world = CarlaDataProvider.get_world()
# for point in border:
# world.debug.draw_point(enu_to_carla_loc(point)+carla.Location(z=1), life_time=1000, color=carla.Color(0,0,0))
# And its length
lane_length = 0
for i in range(1, len(border)):
prev_point = border[i-1]
point = border[i]
dist_sq = pow(prev_point.x - point.x, 2) + pow(prev_point.y - point.y, 2) + pow(prev_point.y - point.y, 2)
lane_length += sqrt(dist_sq)
# border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(0))
# lane_length = 0
# prev_p = None
# for p in border:
# if prev_p:
# lane_length += sqrt(pow(prev_p.x - p.x, 2) + pow(prev_p.y - p.y, 2) + pow(prev_p.z - p.z, 2))
# prev_p = p
# border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(1))
# lane_length_ = 0
# prev_p = None
# for p in border:
# if prev_p:
# lane_length_ += sqrt(pow(prev_p.x - p.x, 2) + pow(prev_p.y - p.y, 2) + pow(prev_p.z - p.z, 2))
# prev_p = p
# print("Permutation: {}*{} - R Length: {} - L Length: {}".format(
# is_positive_lane, lane_id>0, lane_length, lane_length_))
return lane_length
def get_opendrive_ids(ad_lane_id):
"""Given an AD map lane ID, returns the corresponding opendrive road and lane ids"""
num_lane_id = float(str(ad_lane_id))
road_id = floor(num_lane_id / 10000)
remnant = num_lane_id - road_id * 10000
lane_segment_id = floor(remnant / 100)
remnant = remnant - lane_segment_id * 100
lane_id = floor(remnant - 50)
return road_id, lane_id, lane_segment_id
def _get_lane_para_points(lane_interval, distance=1):
""" """
Separates a given lane interval into smaller intervals of length equal to 'distance' Samples a given lane interval in points every 'distance' meters,
returning a list of the values needed to achieve that.
""" """
start = float(lane_interval.start) start = float(lane_interval.start)
end = float(lane_interval.end) end = float(lane_interval.end)
length = float(ad.map.lane.calcLength(lane_interval.laneId)) length = float(ad_map.lane.calcLength(lane_interval.laneId))
if start == end: if start == end:
return [] return []
return np.arange(start, end, np.sign(end - start) * distance / length) values = np.arange(start, end, np.sign(end - start) * distance / length)
para_points = []
for value in values:
para_point = ad_map.point.createParaPoint(lane_interval.laneId, ad_physics.ParametricValue(value))
para_points.append(para_point)
return para_points
"""
laneInterval:LaneInterval(laneId:10310148, start:0.218181, end:1, wrongWay:0))
"""