diff --git a/PythonAPI/carla/agents/navigation/basic_agent.py b/PythonAPI/carla/agents/navigation/basic_agent.py index d1d521b07..6da74334c 100644 --- a/PythonAPI/carla/agents/navigation/basic_agent.py +++ b/PythonAPI/carla/agents/navigation/basic_agent.py @@ -9,13 +9,18 @@ The agent also responds to traffic lights. """ import carla from agents.navigation.agent import Agent -from agents.navigation.local_planner import LocalPlanner +from agents.navigation.local_planner import LocalPlanner, RoadOption from agents.navigation.global_route_planner import GlobalRoutePlanner from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO from agents.tools.misc import get_speed from enum import Enum +import os +import xml.etree.ElementTree as ET +from agents.tools.map_helper import get_shortest_route +import ad_map_access as ad + class AgentState(Enum): """ AGENT_STATE represents the possible states of a roaming agent @@ -54,6 +59,47 @@ class BasicAgent(Agent): } ) + self._initialize_map() + + def _initialize_map(self): + """Initialize the AD map library and, creates the file needed to do so.""" + lat_ref = 0.0 + lon_ref = 0.0 + + opendrive_contents = self._map.to_opendrive() + xodr_name = 'BasicAgentMap.xodr' + txt_name = 'BasicAgentMap.txt' + + # Save the opendrive data into a file + with open(xodr_name, 'w') as f: + f.write(opendrive_contents) + + # Get geo reference + xml_tree = ET.parse(xodr_name) + for geo_elem in xml_tree.find('header').find('geoReference').text.split(' '): + if geo_elem.startswith('+lat_0'): + lat_ref = float(geo_elem.split('=')[-1]) + elif geo_elem.startswith('+lon_0'): + lon_ref = float(geo_elem.split('=')[-1]) + + # Save the previous info + with open(txt_name, 'w') as f: + txt_content = "[ADMap]\n" \ + "map=" + xodr_name + "\n" \ + "[ENUReference]\n" \ + "default=" + str(lat_ref) + " " + str(lon_ref) + " 0.0" + f.write(txt_content) + + # Intialize the map and remove created files + initialized = ad.map.access.init(txt_name) + if not initialized: + raise ValueError("Couldn't initialize the map") + + for fname in [txt_name, xodr_name]: + if os.path.exists(fname): + os.remove(fname) + + def emergency_stop(self): """ Send an emergency stop command to the vehicle @@ -80,8 +126,9 @@ class BasicAgent(Agent): def set_destination(self, end_location, start_location=None): """ - This method creates a list of waypoints from agent's position to destination location - based on the route returned by the global router + This method creates a list of waypoints between a starting and ending location, + based on the route returned by the global router, and adds it to the local planner. + If no starting location is passed, the vehicle location is chosen. :param end_location: final location of the route :param start_location: starting location of the route @@ -95,13 +142,14 @@ class BasicAgent(Agent): start_waypoint = self._map.get_waypoint(self._vehicle.get_location()) end_waypoint = self._map.get_waypoint(end_location) - route_trace = self._trace_route(start_waypoint, end_waypoint) + route_trace = self.trace_route(start_waypoint, end_waypoint) self._local_planner.set_global_plan(route_trace, clean_queue=clean_queue) def set_global_plan(self, plan, stop_waypoint_creation=True, clean_queue=True): """ Adds a specific plan to the agent. + :param plan: list of [carla.Waypoint, RoadOption] representing the route to be followed :param stop_waypoint_creation: stops the automatic creation of waypoints :param clean_queue: resets the current agent's plan """ @@ -111,29 +159,54 @@ class BasicAgent(Agent): clean_queue=clean_queue ) - - def _trace_route(self, start_waypoint, end_waypoint): + # AD Map version + def trace_route(self, start_waypoint, end_waypoint): """ - This method sets up a global router and returns the optimal route - from start_waypoint to end_waypoint + This method sets up a global router and returns the + optimal route from start_waypoint to end_waypoint. :param start_waypoint: initial position :param end_waypoint: final position """ - # Setting up global router - if self._grp is None: - dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._sampling_resolution) - grp = GlobalRoutePlanner(dao) - grp.setup() - self._grp = grp - - # Obtain route plan - route = self._grp.trace_route( + route = get_shortest_route( start_waypoint.transform.location, - end_waypoint.transform.location) + end_waypoint.transform.location, + self._map, + sample_resolution=self._sampling_resolution, + distance=1, + world=self._vehicle.get_world() + ) - return route + route_with_options = [] + for waypoint in route: + route_with_options.append([waypoint, RoadOption.LANEFOLLOW]) + + return route_with_options + + # global route planner version + # def trace_route(self, start_waypoint, end_waypoint): + # """ + # This method sets up a global router and returns the optimal route + # from start_waypoint to end_waypoint + + # :param start_waypoint: initial position + # :param end_waypoint: final position + # """ + + # # Setting up global router + # if self._grp is None: + # dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._sampling_resolution) + # grp = GlobalRoutePlanner(dao) + # grp.setup() + # self._grp = grp + + # # Obtain route plan + # route = self._grp.trace_route( + # start_waypoint.transform.location, + # end_waypoint.transform.location) + + # return route def run_step(self, debug=False): """ diff --git a/PythonAPI/carla/agents/navigation/local_planner.py b/PythonAPI/carla/agents/navigation/local_planner.py index f92adc072..66d9a0deb 100644 --- a/PythonAPI/carla/agents/navigation/local_planner.py +++ b/PythonAPI/carla/agents/navigation/local_planner.py @@ -188,8 +188,8 @@ class LocalPlanner(object): def set_global_plan(self, current_plan, stop_waypoint_creation=True, clean_queue=True): """ - Adds a new plan to the local planner. - If 'clean_queue`, erases the previous plan, and if note, it is added to the old one + Adds a new plan to the local planner. A plan must be a list of [carla.Waypoint, RoadOption] pairs + If 'clean_queue`, erases the previous plan, and if not, it is added to the old one The 'stop_waypoint_creation' flag avoids creating more random waypoints :param current_plan: list of (carla.Waypoint, RoadOption) @@ -200,6 +200,14 @@ class LocalPlanner(object): if clean_queue: self._waypoints_queue.clear() + # Remake the waypoints queue if the new plan has a higher length than the queue + new_plan_length = len(current_plan) + len(self._waypoints_queue) + if new_plan_length > self._waypoints_queue.maxlen: + new_waypoint_queue = deque(maxlen=new_plan_length) + for wp in self._waypoints_queue: + new_waypoint_queue.append(wp) + self._waypoints_queue = new_waypoint_queue + for elem in current_plan: self._waypoints_queue.append(elem) diff --git a/PythonAPI/carla/agents/tools/map_helper.py b/PythonAPI/carla/agents/tools/map_helper.py new file mode 100644 index 000000000..c690f4bb2 --- /dev/null +++ b/PythonAPI/carla/agents/tools/map_helper.py @@ -0,0 +1,203 @@ +# Copyright (c) # Copyright (c) 2018-2021 CVC. +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" +This file has several useful functions related to the AD Map library +""" + +from __future__ import print_function + +import carla +import numpy as np +import ad_map_access as ad + +def carla_loc_to_enu(carla_location): + """Transform a CARLA location into an ENU point""" + return ad.map.point.createENUPoint(carla_location.x, -carla_location.y, carla_location.z) + +def enu_to_carla_loc(enu_point): + """Transform an ENU point into a CARLA location""" + return carla.Location(float(enu_point.x), float(-enu_point.y), float(enu_point.z)) + +def para_point_to_carla_waypoint(para_point, town_map, lane_type=carla.LaneType.Driving): + """Transform a para point into a CARLA waypoint""" + enu_point = ad.map.lane.getENULanePoint(para_point) + carla_point = enu_to_carla_loc(enu_point) + carla_waypoint = town_map.get_waypoint(carla_point, lane_type=lane_type) + return carla_waypoint + +def is_point_at_driving_lane(para_point, town_map): + """Checks if a parapoint is part of a CARLA driving lane""" + carla_waypoint = para_point_to_carla_waypoint(para_point, town_map, carla.LaneType.Any) + return carla_waypoint.lane_type == carla.LaneType.Driving + +def get_shortest_route(start_location, end_location, town_map, sample_resolution=1, distance=1, probability=0, world=None): + """ + Gets the shortest route between a starting and end location. + """ + route = [] + + ad_distance = ad.physics.Distance(distance) + ad_probability = ad.physics.Probability(probability) + ad_map_matching = ad.map.match.AdMapMatching() + route_segment = None + + start_matches = ad_map_matching.getMapMatchedPositions( + carla_loc_to_enu(start_location), ad_distance, ad_probability) + end_matches = ad_map_matching.getMapMatchedPositions( + carla_loc_to_enu(end_location), ad_distance, ad_probability) + + if not start_matches: + print("WARNING: Couldn't find a paraPoint for location '{}'.".format(start_location)) + return route + if not end_matches: + print("WARNING: Couldn't find a paraPoint for location '{}'.".format(end_location)) + return route + + # Filter the paraPoints that aren't part of driving lanes + start_matches = [match for match in start_matches if is_point_at_driving_lane(match.lanePoint.paraPoint, town_map)] + end_matches = [match for match in end_matches if is_point_at_driving_lane(match.lanePoint.paraPoint, town_map)] + + # Get the shortest route + min_length = float('inf') + for start_match in start_matches: + start_point = start_match.lanePoint.paraPoint + for end_match in end_matches: + # Get the route + new_route_segment = ad.map.route.planRoute(start_point, end_match.lanePoint.paraPoint) + if len(new_route_segment.roadSegments) == 0: + continue # The route doesn't exist, ignore it + + # Calculate route length, as a sum of the mean of the road's lanes length + length = 0 + for road_segment in new_route_segment.roadSegments: + road_length = 0 + number_lanes = 0 + for lane_segment in road_segment.drivableLaneSegments: + seg_start = float(lane_segment.laneInterval.start) + seg_end = float(lane_segment.laneInterval.end) + seg_length = float(ad.map.lane.calcLength(lane_segment.laneInterval.laneId)) + + road_length += seg_length * abs(seg_end - seg_start) + number_lanes += 1 + + if number_lanes != 0: + length += road_length / number_lanes + + # Save the shortest route + if length < min_length: + min_length = length + route_segment = new_route_segment + start_lane_id = start_point.laneId + + if not route_segment: + print("WARNING: Couldn't find a viable route between locations " + "'{}' and '{}'.".format(start_location, end_location)) + return route + + # print(route_segment) + + # Transform the AD map route representation into waypoints (If not needed to parse the altitude) + for segment in get_route_lane_list(route_segment, start_lane_id): + lane_id = segment.laneInterval.laneId + param_list = get_lane_interval_list(segment.laneInterval, sample_resolution) + for i in range(len(param_list)): + para_point = ad.map.point.createParaPoint(lane_id, ad.physics.ParametricValue(param_list[i])) + carla_waypoint = para_point_to_carla_waypoint(para_point, town_map) + route.append(carla_waypoint) + if i == 0: + world.debug.draw_point(carla_waypoint.transform.location, size=0.2, life_time=10000, color=carla.Color(255,0,0)) + world.debug.draw_point(carla_waypoint.transform.location, size=0.1, life_time=10000, color=carla.Color(255,255,0)) + + return route + +def get_lane_altitude_list(start_z, end_z, length): + """ + Gets the z values of a lane. This is a simple linear interpolation + and it won't be necessary whenever the AD map parses the altitude + """ + if length == 0: + return [] + if start_z == end_z: + return start_z*np.ones(length) + return np.arange(start_z, end_z, (end_z - start_z) / length) + +def get_route_lane_list(route, start_lane_id, prev_lane_id=None, prev_successors=[]): + """ + Given a route returns the lane segments corresponding to the route. + 'start_lane_id' is the lane id of the first point in the route. + In case this function is called more than once, use 'prev_lane_id' + and 'prev_successors' to keep computing the lanes without any information loss. + """ + segments = [] + + for road_segment in route.roadSegments: + current_segment = None + for lane_segment in road_segment.drivableLaneSegments: + successors = lane_segment.successors + predecessors = lane_segment.predecessors + lane_id = lane_segment.laneInterval.laneId + + if not prev_lane_id: + # First lane, match the lane with the starting lane id + if lane_segment.laneInterval.laneId != start_lane_id: + continue # match the lane to the starting point + + if prev_lane_id and prev_lane_id not in predecessors: + continue + + if prev_successors and lane_id not in prev_successors: + continue + + current_segment = lane_segment + break + + # Basically, the next lane won't be chosen if the current one has no successors, as it means + # that the next lane diverges from the route. Watch out with false empty successors though + + if not current_segment: + # print(len(prev_successors)) + # print("The previous lane diverged from the route, lane changing") + # TODO: This shouldn't really be random + current_segment = np.random.choice(road_segment.drivableLaneSegments) + # else: + # print(len(prev_successors)) + # print("Found a connected route lane") + + segments.append(current_segment) + prev_lane_id = current_segment.laneInterval.laneId + prev_successors = current_segment.successors + + return segments + +def to_ad_paraPoint(location, distance=1, probability=0): + """ + Transforms a carla.Location into an ad.map.point.ParaPoint() + """ + ad_distance = ad.physics.Distance(distance) + ad_probability = ad.physics.Probability(probability) + ad_map_matching = ad.map.match.AdMapMatching() + ad_location = carla_loc_to_enu(location) + + match_results = ad_map_matching.getMapMatchedPositions(ad_location, ad_distance, ad_probability) + + if not match_results: + print("WARNING: Couldn't find a para point for CARLA location {}".format(location)) + return None + + # Filter the closest one to the given location + distance = [float(mmap.matchedPointDistance) for mmap in match_results] + return match_results[distance.index(min(distance))].lanePoint.paraPoint + +def get_lane_interval_list(lane_interval, distance=1): + """ + Separates a given lane interval into smaller intervals of length equal to 'distance' + """ + start = float(lane_interval.start) + end = float(lane_interval.end) + length = float(ad.map.lane.calcLength(lane_interval.laneId)) + if start == end: + return [] + return np.arange(start, end, np.sign(end - start) * distance / length) \ No newline at end of file