From 1eada5f5e23a878b33db885f72eda2995a68eaab Mon Sep 17 00:00:00 2001 From: German Ros Date: Thu, 20 Dec 2018 13:06:02 -0800 Subject: [PATCH] added basic_agent class --- PythonAPI/agents/navigation/basic_agent.py | 228 +++++++++++++++++++ PythonAPI/agents/navigation/local_planner.py | 55 ++++- 2 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 PythonAPI/agents/navigation/basic_agent.py diff --git a/PythonAPI/agents/navigation/basic_agent.py b/PythonAPI/agents/navigation/basic_agent.py new file mode 100644 index 000000000..70740473c --- /dev/null +++ b/PythonAPI/agents/navigation/basic_agent.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Intel Labs. +# authors: German Ros (german.ros@intel.com) +# +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" This module implements an agent that roams around a track following random waypoints and avoiding other vehicles. +The agent also responds to traffic lights. """ + +from enum import Enum + +import carla +from agents.navigation.local_planner import LocalPlanner +from agents.navigation.global_route_planner import GlobalRoutePlanner +from agents.navigation.global_route_planner import NavEnum +from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO +from agents.tools.misc import is_within_distance_ahead, compute_magnitude_angle + +class AgentState(Enum): + """ + AGENT_STATE represents the possible states of a roaming agent + """ + NAVIGATING = 1 + BLOCKED_BY_VEHICLE = 2 + BLOCKED_RED_LIGHT = 3 + + +class BasicAgent(object): + """ + RoamingAgent implements a basic agent that navigates scenes making random choices when facing an intersection. + This agent respects traffic lights and other vehicles. + """ + + def __init__(self, vehicle): + """ + + :param vehicle: actor to apply to local planner logic onto + """ + self._vehicle = vehicle + self._world = self._vehicle.get_world() + self._map = self._vehicle.get_world().get_map() + self._proximity_threshold = 10.0 # meters + self._state = AgentState.NAVIGATING + self._local_planner = LocalPlanner(self._vehicle) + + # setting up global router + self._current_plan = None + self._integ_dao = GlobalRoutePlannerDAO(self._map) + self._integ_grp = GlobalRoutePlanner(self._integ_dao) + self._integ_grp.setup() + + def set_destination(self, location): + vehicle_location = self._vehicle.get_location() + self._current_plan = self._integ_grp.plan_route((vehicle_location.x, vehicle_location.y), location) + self._local_planner.set_global_plan(self._current_plan) + + + def run_step(self, debug=False): + """ + Execute one step of navigation. + :return: + """ + + # is there an obstacle in front of us? + hazard_detected = False + + # retrieve relevant elements for safe navigation, i.e.: traffic lights + # and other vehicles + actor_list = self._world.get_actors() + vehicle_list = actor_list.filter("*vehicle*") + lights_list = actor_list.filter("*traffic_light*") + + # check possible obstacles + vehicle_state, vehicle = self._is_vehicle_hazard(vehicle_list) + if vehicle_state: + if debug: + print('!!! VEHICLE BLOCKING AHEAD [{}])'.format(vehicle.id)) + + self._state = AgentState.BLOCKED_BY_VEHICLE + hazard_detected = True + + # check for the state of the traffic lights + light_state, traffic_light = self._is_light_red(lights_list) + if light_state: + if debug: + print('=== RED LIGHT AHEAD [{}])'.format(traffic_light.id)) + + self._state = AgentState.BLOCKED_RED_LIGHT + hazard_detected = True + + if hazard_detected: + control = self.emergency_stop() + else: + self._state = AgentState.NAVIGATING + # standard local planner behavior + control = self._local_planner.run_step() + + return control + + def _is_light_red(self, lights_list): + """ + Method to check if there is a red light affecting us. This version of the method is compatible + with both European and US style traffic lights. + + :param lights_list: list containing TrafficLight objects + :return: a tuple given by (bool_flag, traffic_light), where + - bool_flag is True if there is a traffic light in RED affecting us and False otherwise + - traffic_light is the object itself or None if there is no red traffic light affecting us + """ + if self._world.map_name == 'Town01' or self._world.map_name == 'Town02': + return self._is_light_red_europe_style(lights_list) + else: + return self._is_light_red_us_style(lights_list) + + def _is_light_red_europe_style(self, lights_list): + """ + This method is specialized to check European style traffic lights. + + :param lights_list: list containing TrafficLight objects + :return: a tuple given by (bool_flag, traffic_light), where + - bool_flag is True if there is a traffic light in RED affecting us and False otherwise + - traffic_light is the object itself or None if there is no red traffic light affecting us + """ + ego_vehicle_location = self._vehicle.get_location() + ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location) + + for traffic_light in lights_list: + object_waypoint = self._map.get_waypoint(traffic_light.get_location()) + if object_waypoint.road_id != ego_vehicle_waypoint.road_id or object_waypoint.lane_id != ego_vehicle_waypoint.lane_id: + continue + + loc = traffic_light.get_location() + if is_within_distance_ahead(loc, ego_vehicle_location, self._vehicle.get_transform().rotation.yaw, + self._proximity_threshold): + if traffic_light.state == carla.libcarla.TrafficLightState.Red: + return (True, traffic_light) + + return (False, None) + + def _is_light_red_us_style(self, lights_list, debug=False): + """ + This method is specialized to check US style traffic lights. + + :param lights_list: list containing TrafficLight objects + :return: a tuple given by (bool_flag, traffic_light), where + - bool_flag is True if there is a traffic light in RED affecting us and False otherwise + - traffic_light is the object itself or None if there is no red traffic light affecting us + """ + ego_vehicle_location = self._vehicle.get_location() + ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location) + + if ego_vehicle_waypoint.is_intersection: + # It is too late. Do not block the intersection! Keep going! + return (False, None) + + if self._local_planner._target_waypoint is not None: + if self._local_planner._target_waypoint.is_intersection: + potential_lights = [] + min_angle = 180.0 + sel_magnitude = 0.0 + sel_traffic_light = None + for traffic_light in lights_list: + loc = traffic_light.get_location() + magnitude, angle = compute_magnitude_angle(loc, ego_vehicle_location, self._vehicle.get_transform().rotation.yaw) + if magnitude < 80.0 and angle < min(25.0, min_angle): + sel_magnitude = magnitude + sel_traffic_light = traffic_light + min_angle = angle + + if sel_traffic_light is not None: + if debug: + print('=== Magnitude = {} | Angle = {} | ID = {}'.format(sel_magnitude, min_angle, sel_traffic_light.id)) + if sel_traffic_light.state == carla.libcarla.TrafficLightState.Red: + return (True, sel_traffic_light) + + return (False, None) + + def _is_vehicle_hazard(self, vehicle_list): + """ + Check if a given vehicle is an obstacle in our way. To this end we take into account the road and lane + the target vehicle is on and run a geometry test to check if the target vehicle is under a certain distance + in front of our ego vehicle. + + WARNING: This method is an approximation that could fail for very large vehicles, which center is actually + on a different lane but their extension falls within the ego vehicle lane. + + :param vehicle_list: list of potential obstacle to check + :return: a tuple given by (bool_flag, vehicle), where + - bool_flag is True if there is a vehicle ahead blocking us and False otherwise + - vehicle is the blocker object itself + """ + + ego_vehicle_location = self._vehicle.get_location() + ego_vehicle_waypoint = self._map.get_waypoint(ego_vehicle_location) + + for target_vehicle in vehicle_list: + # do not account for the ego vehicle + if target_vehicle.id == self._vehicle.id: + continue + + # if the object is not in our lane it's not an obstacle + target_vehicle_waypoint = self._map.get_waypoint(target_vehicle.get_location()) + if target_vehicle_waypoint.road_id != ego_vehicle_waypoint.road_id or \ + target_vehicle_waypoint.lane_id != ego_vehicle_waypoint.lane_id: + continue + + loc = target_vehicle.get_location() + if is_within_distance_ahead(loc, ego_vehicle_location, self._vehicle.get_transform().rotation.yaw, + self._proximity_threshold): + return (True, target_vehicle) + + return (False, None) + + + def emergency_stop(self): + """ + Send an emergency stop command to the vehicle + :return: + """ + control = carla.VehicleControl() + control.steer = 0.0 + control.throttle = 0.0 + control.brake = 1.0 + control.hand_brake = False + + return control \ No newline at end of file diff --git a/PythonAPI/agents/navigation/local_planner.py b/PythonAPI/agents/navigation/local_planner.py index 732f50697..11225c39e 100644 --- a/PythonAPI/agents/navigation/local_planner.py +++ b/PythonAPI/agents/navigation/local_planner.py @@ -68,6 +68,7 @@ class LocalPlanner(object): self._next_waypoints = None self._target_waypoint = None self._vehicle_controller = None + self._global_plan = None # queue with tuples of (waypoint, RoadOption) self._waypoints_queue = deque(maxlen=200) @@ -120,6 +121,8 @@ class LocalPlanner(object): args_lateral=args_lateral_dict, args_longitudinal=args_longitudinal_dict) + self._global_plan = [] + # compute initial waypoints self._waypoints_queue.append( (self._current_waypoint.next( @@ -128,7 +131,7 @@ class LocalPlanner(object): self._target_road_option = RoadOption.LANEFOLLOW # fill waypoint trajectory queue - self._compute_next_waypoints(k=200) + self._compute_next_waypoints_global_plan(k=200) def set_speed(self, speed): """ @@ -169,6 +172,54 @@ class LocalPlanner(object): self._waypoints_queue.append((next_waypoint, road_option)) + + def _compute_next_waypoints_global_plan(self, k=1): + """ + Add new waypoints to the trajectory queue. + + :param k: how many waypoints to compute + :return: + """ + # check we do not overflow the queue + available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue) + k = min(available_entries, k) + + for _ in range(k): + last_waypoint = self._waypoints_queue[-1][0] + next_waypoints = list(last_waypoint.next(self._sampling_radius)) + + if len(next_waypoints) == 1: + # only one option available ==> lanefollowing + next_waypoint = next_waypoints[0] + road_option = RoadOption.LANEFOLLOW + else: + # random choice between the possible options + road_options_list = retrieve_options(next_waypoints, last_waypoint) + planned_option = self._global_plan.pop(0) + + if planned_option == NavEnum.FOLLOW_LANE: + planned_option = RoadOption.LANEFOLLOW + if planned_option == NavEnum.LEFT: + planned_option = RedOption.LEFT + if planned_option == NavEnum.RIGHT: + planned_option = ReadOption.RIGHT + if planned_option == NavEnum.GO_STRAIGHT: + planned_option = RoadOption.STRAIGHT + else: + planned_option = RoadOption.VOID + + if planned_option in road_options_list: + next_waypoint = next_waypoints[road_options_list.index(planned_option)] + else: + print('Option not found!!') + import pdb; pdb.set_trace() + + self._waypoints_queue.append((next_waypoint, road_option)) + + def set_global_plan(self, current_plan): + self._global_plan = current_plan + self._global_plan.pop(0) + def run_step(self, debug=True): """ Execute one step of local planning which involves running the longitudinal and lateral PID controllers to @@ -180,7 +231,7 @@ class LocalPlanner(object): # not enough waypoints in the horizon? => add more! if len(self._waypoints_queue) < int(self._waypoints_queue.maxlen * 0.5): - self._compute_next_waypoints(k=100) + self._compute_next_waypoints_global_plan(k=100) # current vehicle waypoint self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())