diff --git a/PythonAPI/carla/agents/navigation/global_route_planner.py b/PythonAPI/carla/agents/navigation/global_route_planner.py index e30dfd00c..2f9854678 100644 --- a/PythonAPI/carla/agents/navigation/global_route_planner.py +++ b/PythonAPI/carla/agents/navigation/global_route_planner.py @@ -12,7 +12,6 @@ import numpy as np from agents.navigation.local_planner import RoadOption from agents.tools.route_helper import trace_route, initialize_map -import ad_map_access as ad class GlobalRoutePlanner(object): """ diff --git a/PythonAPI/carla/agents/tools/route_helper.py b/PythonAPI/carla/agents/tools/route_helper.py index 7b9285533..eb7238e95 100644 --- a/PythonAPI/carla/agents/tools/route_helper.py +++ b/PythonAPI/carla/agents/tools/route_helper.py @@ -9,13 +9,15 @@ This file has several useful functions related to the AD Map library from __future__ import print_function -from math import floor +from math import floor, sqrt, pow import os +from re import L import numpy as np import xml.etree.ElementTree as ET import carla -import ad_map_access as ad +from ad_map_access import map as ad_map +from ad_physics import physics as ad_physics def initialize_map(wmap): """Initialize the AD map library and, creates the file needed to do so.""" @@ -47,7 +49,7 @@ def initialize_map(wmap): f.write(txt_content) # Intialize the map and remove created files - initialized = ad.map.access.init(txt_name) + initialized = ad_map.access.init(txt_name) if not initialized: raise ValueError("Couldn't initialize the map") @@ -57,55 +59,44 @@ def initialize_map(wmap): def carla_loc_to_enu(carla_location): """Transform a CARLA location into an ENU point""" - return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z) + return ad_map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z) def carla_loc_to_ecef(carla_location): """Transform a CARLA location into an ENU point""" - return ad.map.point.toECEF(carla_loc_to_enu(carla_location)) + return ad_map.point.toECEF(carla_loc_to_enu(carla_location)) def enu_to_carla_loc(enu_point): """Transform an ENU point into a CARLA location""" return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z)) -def para_point_to_carla_waypoint_2(para_point, town_map, lane_type=carla.LaneType.Driving): +def para_point_to_carla_waypoint(para_point, town_map, lane_length): """Transform a para point into a CARLA waypoint""" ad_lane_id = para_point.laneId - num_lane_id = float(str(para_point.laneId)) + road_id, lane_id, segment_id = get_opendrive_ids(ad_lane_id) - road_id = floor(num_lane_id / 10000) - remnant = num_lane_id - road_id * 10000 - lane_segment_id = floor(remnant / 100) - remnant = remnant - lane_segment_id * 100 - lane_id = floor(remnant - 50) - - length = float(ad.map.lane.calcLength(ad_lane_id)) - is_positive_lane = ad.map.lane.isLaneDirectionPositive(ad_lane_id) + is_positive_lane = ad_map.lane.isLaneDirectionPositive(ad_lane_id) if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0: - s = length * (1 - float(para_point.parametricOffset)) + s = lane_length * (1 - float(para_point.parametricOffset)) else: - s = length * float(para_point.parametricOffset) + s = lane_length * float(para_point.parametricOffset) - return town_map.get_waypoint_xodr(road_id, lane_id, s) - -def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving): - """Transform a para point into a CARLA waypoint""" - enu_point = ad.map.lane.getENULanePoint(para_point) - carla_point = enu_to_carla_loc(enu_point) - carla_waypoint = town_map.get_waypoint(carla_point, lane_type=lane_type) - return carla_waypoint + # print("Road: {} - Lane: {} - Lane segment: {} - s: {} ".format(road_id, lane_id, segment_id, s)) + return town_map.get_waypoint_xodr(road_id, lane_id, s) def is_point_at_driving_lane(para_point, town_map): """Checks if a parapoint is part of a CARLA driving lane""" - carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, carla.LaneType.Any) + enu_point = ad_map.lane.getENULanePoint(para_point) + carla_point = enu_to_carla_loc(enu_point) + carla_waypoint = town_map.get_waypoint(carla_point, lane_type=carla.LaneType.Any) return carla_waypoint.lane_type == carla.LaneType.Driving def to_ad_paraPoint(location, distance=1, probability=0): """ - Transforms a carla.Location into an ad.map.point.ParaPoint() + Transforms a carla.Location into an ad_map.point.ParaPoint() """ - ad_distance = ad.physics.Distance(distance) - ad_probability = ad.physics.Probability(probability) - ad_map_matching = ad.map.match.AdMapMatching() + ad_distance = ad_physics.Distance(distance) + ad_probability = ad_physics.Probability(probability) + ad_map_matching = ad_map.match.AdMapMatching() ad_location = carla_loc_to_enu(location) match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability) @@ -188,15 +179,15 @@ def _waypoint_matches(waypoint, town_map, max_distance, probability=0): ecef_front_location= carla_loc_to_ecef(front_location) # Get the map matching and the heading hint - ecef_heading = ad.map.point.createECEFHeading(ecef_location, ecef_front_location) - ad_map_matching = ad.map.match.AdMapMatching() + ecef_heading = ad_map.point.createECEFHeading(ecef_location, ecef_front_location) + ad_map_matching = ad_map.match.AdMapMatching() ad_map_matching.addHeadingHint(ecef_heading) # Get the matches and filter the none driving lane ones matches = ad_map_matching.getMapMatchedPositions( carla_loc_to_enu(location), - ad.physics.Distance(max_distance), - ad.physics.Probability(probability) + ad_physics.Distance(max_distance), + ad_physics.Probability(probability) ) matches = [m for m in matches if is_point_at_driving_lane(m.lanePoint.paraPoint, town_map)] @@ -211,8 +202,10 @@ def _filter_shortest_route(start_matches, end_matches): start_point = start_match.lanePoint.paraPoint for end_match in end_matches: # Get the route - new_route_segment = ad.map.route.planRoute( - start_point, end_match.lanePoint.paraPoint, ad.map.route.RouteCreationMode.Undefined) + new_route_segment = ad_map.route.planRoute( + start_point, end_match.lanePoint.paraPoint, + ad_map.route.RouteCreationMode.Undefined + ) if len(new_route_segment.roadSegments) == 0: continue # The route doesn't exist, ignore it @@ -235,7 +228,7 @@ def _get_route_length(route): for lane_segment in road_segment.drivableLaneSegments: lane_start = float(lane_segment.laneInterval.start) lane_end = float(lane_segment.laneInterval.end) - lane_length = float(ad.map.lane.calcLength(lane_segment.laneInterval.laneId)) + lane_length = float(ad_map.lane.calcLength(lane_segment.laneInterval.laneId)) road_length += lane_length * abs(lane_end - lane_start) @@ -245,58 +238,152 @@ def _get_route_length(route): return route_length +from srunner.scenariomanager.carla_data_provider import CarlaDataProvider + def _get_route_waypoints(route, resolution, town_map): """ Given a route, transforms it into a list of [carla.Waypoint, RoadOption]. Take into account that at locations where multiple lanes overlap, while the waypoints will be correctly placed, they might be part of a different lane. - :param route (ad.map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined. + :param route (ad_map.route.FullRoute): AD map route instance created with RouteCreationMode Undefined. Other creation modes return mode than one lane, which would need a prefiltering. :param resolution (float): Distance between the waypoints that form the route. :param town_map (carla.Map): CARLA map instance where the route will be computed """ - # TODO: use para_point_to_carla_waypoint_2, which directly transforms from paraPoint to waypoint - # to ensure all waypoints correspond to the correct lane - + # world = CarlaDataProvider.get_world() wp_route = [] + # i = 0 for road_segment in route.roadSegments: for lane_segment in road_segment.drivableLaneSegments: - # print(" ----------------- ") - # border = ad.map.route.getENUBorder(lane_segment.laneInterval) - # for point in border.left: - # world.debug.draw_point(enu_to_carla_loc(point), life_time=100, size=0.2) - # for point in border.right: - # world.debug.draw_point(enu_to_carla_loc(point), life_time=100, size=0.2) - # border_ = ad.map.lane.getLateralAlignmentEdge(border, ad.physics.ParametricValue(0.5)) - # length = 0 - # for i in range(0, len(border_)-1): - # p1 = border_[i] - # p2 = border_[i+1] - # dist = sqrt(pow(p2.x - p1.x, 2) + pow(p2.y - p1.y, 2) + pow(p2.z - p1.z, 2)) - # print("Adding: {}".format(dist)) - # length += dist - # world.debug.draw_point(enu_to_carla_loc(p1), life_time=100, size=0.2, color=carla.Color(0,0,0)) - # world.debug.draw_arrow(enu_to_carla_loc(p1), enu_to_carla_loc(p2), life_time=100, color=carla.Color(0,0,0)) - # print("Summ Length: {}".format(length)) - # print("Calc Length: {}".format(ad.map.lane.calcLength(lane_segment.laneInterval.laneId))) + # Just one segment due to route mode 'Undefined'. If not, some filtering is needed + if float(lane_segment.laneInterval.start) == float(lane_segment.laneInterval.end): + continue # Lengthless interval + para_points = _get_lane_para_points(lane_segment.laneInterval, resolution) + # lane_length = _get_opendrive_lane_length(lane_segment.laneInterval) + for para_point in para_points: + # carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, lane_length) + # if not carla_waypoint: + # print("Failed the previous wp") + # # Use its location instead + # enu_point = ad_map.lane.getENULanePoint(para_point) + # carla_point = enu_to_carla_loc(enu_point) + # carla_waypoint = town_map.get_waypoint(carla_point) + enu_point = ad_map.lane.getENULanePoint(para_point) + carla_point = enu_to_carla_loc(enu_point) + carla_waypoint = town_map.get_waypoint(carla_point) - lane_id = lane_segment.laneInterval.laneId - param_list = _get_lane_interval_list(lane_segment.laneInterval, resolution) - for param in param_list: - para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param)) - carla_waypoint = para_point_to_carla_waypoint(para_point, town_map) + # world.debug.draw_string(carla_waypoint.transform.location, str(i), life_time=100, color=carla.Color(0,0,0)) wp_route.append(carla_waypoint) + # i += 1 return wp_route -def _get_lane_interval_list(lane_interval, distance=1): +def _get_opendrive_lane_length(lane_interval): + """Given a lane interval, gets the length of the lane""" + def create_interval(lane_id, start, end): + """Creates and returns laneInterval instance (as ad_map.route.LaneInterval() doesn't accept inputs)""" + interval = ad_map.route.LaneInterval() + interval.laneId = lane_id + interval.start = ad_physics.ParametricValue(start) + interval.end = ad_physics.ParametricValue(end) + return interval + + # As the opendrive length is the center of the road, get the lane with id 1 / -1 + _, lane_id, _ = get_opendrive_ids(lane_interval.laneId) + ad_lane_id = int(str(lane_interval.laneId)) + + if lane_id > 0: + lane_range = range(lane_id - 1, -1, -1) + else: + lane_range = range(lane_id + 1, 1, 1) + + for lane_diff in lane_range: + center_lane_id = ad_lane_id - lane_diff + # print("OpenDrive key: {}*{} - AD lane ID: {} - Center Lane ID: {}".format( + # road_id, lane_id, ad_lane_id, center_lane_id)) + if ad_map.lane.isValid(center_lane_id): + break + + start = float(lane_interval.start) + end = float(lane_interval.end) + if end > start: + lane_interval = create_interval(center_lane_id, 0, 1) + elif start > end: + lane_interval = create_interval(center_lane_id, 1, 0) + + is_positive_lane = ad_map.lane.isLaneDirectionPositive(lane_interval.laneId) + enu_border = ad_map.route.getENUBorder(lane_interval) + + if is_positive_lane and not lane_id < 0 or not is_positive_lane and lane_id < 0: + border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(0)) + else: + border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(1)) + + # world = CarlaDataProvider.get_world() + # for point in border: + # world.debug.draw_point(enu_to_carla_loc(point)+carla.Location(z=1), life_time=1000, color=carla.Color(0,0,0)) + + # And its length + lane_length = 0 + for i in range(1, len(border)): + prev_point = border[i-1] + point = border[i] + dist_sq = pow(prev_point.x - point.x, 2) + pow(prev_point.y - point.y, 2) + pow(prev_point.y - point.y, 2) + lane_length += sqrt(dist_sq) + + # border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(0)) + # lane_length = 0 + # prev_p = None + # for p in border: + # if prev_p: + # lane_length += sqrt(pow(prev_p.x - p.x, 2) + pow(prev_p.y - p.y, 2) + pow(prev_p.z - p.z, 2)) + # prev_p = p + + # border = ad_map.lane.getLateralAlignmentEdge(enu_border, ad_physics.ParametricValue(1)) + # lane_length_ = 0 + # prev_p = None + # for p in border: + # if prev_p: + # lane_length_ += sqrt(pow(prev_p.x - p.x, 2) + pow(prev_p.y - p.y, 2) + pow(prev_p.z - p.z, 2)) + # prev_p = p + + # print("Permutation: {}*{} - R Length: {} - L Length: {}".format( + # is_positive_lane, lane_id>0, lane_length, lane_length_)) + + return lane_length + +def get_opendrive_ids(ad_lane_id): + """Given an AD map lane ID, returns the corresponding opendrive road and lane ids""" + num_lane_id = float(str(ad_lane_id)) + road_id = floor(num_lane_id / 10000) + + remnant = num_lane_id - road_id * 10000 + lane_segment_id = floor(remnant / 100) + + remnant = remnant - lane_segment_id * 100 + lane_id = floor(remnant - 50) + return road_id, lane_id, lane_segment_id + +def _get_lane_para_points(lane_interval, distance=1): """ - Separates a given lane interval into smaller intervals of length equal to 'distance' + Samples a given lane interval in points every 'distance' meters, + returning a list of the values needed to achieve that. """ start = float(lane_interval.start) end = float(lane_interval.end) - length = float(ad.map.lane.calcLength(lane_interval.laneId)) + length = float(ad_map.lane.calcLength(lane_interval.laneId)) if start == end: return [] - return np.arange(start, end, np.sign(end - start) * distance / length) + values = np.arange(start, end, np.sign(end - start) * distance / length) + para_points = [] + for value in values: + para_point = ad_map.point.createParaPoint(lane_interval.laneId, ad_physics.ParametricValue(value)) + para_points.append(para_point) + + return para_points + + +""" +laneInterval:LaneInterval(laneId:10310148, start:0.218181, end:1, wrongWay:0)) +""" \ No newline at end of file