added new version of basic_agent and agent base class

This commit is contained in:
German Ros 2018-12-20 16:36:58 -08:00 committed by nsubiron
parent 466058b2ec
commit 09a74db1fb
5 changed files with 135 additions and 120 deletions

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
# Copyright (c) 2018 Intel Labs.
# authors: German Ros (german.ros@intel.com)
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module implements an agent that roams around a track following random waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """
from enum import Enum
import carla
class AgentState(Enum):
"""
AGENT_STATE represents the possible states of a roaming agent
"""
NAVIGATING = 1
BLOCKED_BY_VEHICLE = 2
BLOCKED_RED_LIGHT = 3
class Agent(object):
"""
Base class to define agents in CARLA
"""
def __init__(self, vehicle):
"""
:param vehicle: actor to apply to local planner logic onto
"""
self._vehicle = vehicle
self._world = self._vehicle.get_world()
self._map = self._vehicle.get_world().get_map()
def run_step(self, debug=False):
"""
Execute one step of navigation.
:return: control
"""
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 0.0
control.hand_brake = False
control.manual_gear_shift = False
return control

View File

@ -9,27 +9,15 @@
""" This module implements an agent that roams around a track following random waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """
from enum import Enum
import carla
from agents.navigation.local_planner import LocalPlanner
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner import NavEnum
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.navigation.agent import *
from agents.navigation.local_planner import LocalPlanner, compute_connection, RoadOption
from agents.tools.misc import is_within_distance_ahead, compute_magnitude_angle
class AgentState(Enum):
"""
AGENT_STATE represents the possible states of a roaming agent
"""
NAVIGATING = 1
BLOCKED_BY_VEHICLE = 2
BLOCKED_RED_LIGHT = 3
class BasicAgent(object):
class BasicAgent(Agent):
"""
RoamingAgent implements a basic agent that navigates scenes making random choices when facing an intersection.
BasicAgent implements a basic agent that navigates scenes to reach a given target destination.
This agent respects traffic lights and other vehicles.
"""
@ -38,29 +26,59 @@ class BasicAgent(object):
:param vehicle: actor to apply to local planner logic onto
"""
self._vehicle = vehicle
self._world = self._vehicle.get_world()
self._map = self._vehicle.get_world().get_map()
super(BasicAgent, self).__init__(vehicle)
self._proximity_threshold = 10.0 # meters
self._state = AgentState.NAVIGATING
self._local_planner = LocalPlanner(self._vehicle, plan=None)
self._local_planner = LocalPlanner(self._vehicle)
# setting up global router
self._current_plan = None
self._integ_dao = GlobalRoutePlannerDAO(self._map)
self._integ_grp = GlobalRoutePlanner(self._integ_dao)
self._integ_grp.setup()
def set_destination(self, location):
vehicle_location = self._vehicle.get_location()
self._current_plan = self._integ_grp.plan_route((vehicle_location.x, vehicle_location.y), location)
self._local_planner.set_global_plan(self._current_plan)
start_waypoint = self._map.get_waypoint(self._vehicle.get_location())
end_waypoint = self._map.get_waypoint(carla.Location(location[0], location[1], location[2]))
current_waypoint = start_waypoint
active_list = [ [(current_waypoint, RoadOption.LANEFOLLOW)] ]
solution = []
while not solution:
for _ in range(len(active_list)):
trajectory = active_list.pop()
if len(trajectory) > 1000:
continue
# expand this trajectory
current_waypoint, _ = trajectory[-1]
next_waypoints = current_waypoint.next(5.0)
while len(next_waypoints) == 1:
next_option = compute_connection(current_waypoint, next_waypoints[0])
current_distance = next_waypoints[0].transform.location.distance(end_waypoint.transform.location)
if current_distance < 5.0:
solution = trajectory + [(end_waypoint, RoadOption.LANEFOLLOW)]
break
# keep adding nodes
trajectory.append((next_waypoints[0], next_option))
current_waypoint, _ = trajectory[-1]
next_waypoints = current_waypoint.next(5.0)
if not solution:
# multiple choices
for waypoint in next_waypoints:
next_option = compute_connection(current_waypoint, waypoint)
active_list.append(trajectory + [(waypoint, next_option)])
assert solution
self._current_plan = solution
self._local_planner.set_global_plan(self._current_plan)
def run_step(self, debug=False):
"""
Execute one step of navigation.
:return:
:return: carla.VehicleControl
"""
# is there an obstacle in front of us?

View File

@ -12,6 +12,7 @@ from enum import Enum
from collections import deque
import random
import carla
from agents.navigation.controller import VehiclePIDController
from agents.navigation.global_route_planner import NavEnum
from agents.tools.misc import distance_vehicle, draw_waypoints
@ -40,7 +41,7 @@ class LocalPlanner(object):
# total distance)
MIN_DISTANCE_PERCENTAGE = 0.9
def __init__(self, vehicle, opt_dict={}, plan=None):
def __init__(self, vehicle, opt_dict={}):
"""
:param vehicle: actor to apply to local planner logic onto
:param opt_dict: dictionary of arguments with the following semantics:
@ -74,13 +75,13 @@ class LocalPlanner(object):
self._waypoints_queue = deque(maxlen=200)
#
self.init_controller(opt_dict, plan)
self.init_controller(opt_dict)
def __del__(self):
self._vehicle.destroy()
print("Destroying ego-vehicle!")
def init_controller(self, opt_dict, plan):
def init_controller(self, opt_dict):
"""
Controller initialization.
@ -122,20 +123,14 @@ class LocalPlanner(object):
args_lateral=args_lateral_dict,
args_longitudinal=args_longitudinal_dict)
self._global_plan = []
self._global_plan = False
# compute initial waypoints
self._waypoints_queue.append(
(self._current_waypoint.next(
self._sampling_radius)[0],
RoadOption.LANEFOLLOW))
self._waypoints_queue.append( (self._current_waypoint.next(self._sampling_radius)[0], RoadOption.LANEFOLLOW))
self._target_road_option = RoadOption.LANEFOLLOW
# fill waypoint trajectory queue
if plan is None:
self._compute_next_waypoints(k=200)
else:
self._compute_next_waypoints_global_plan(k=200)
self._compute_next_waypoints(k=200)
def set_speed(self, speed):
"""
@ -154,8 +149,7 @@ class LocalPlanner(object):
:return:
"""
# check we do not overflow the queue
available_entries = self._waypoints_queue.maxlen - \
len(self._waypoints_queue)
available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue)
k = min(available_entries, k)
for _ in range(k):
@ -177,67 +171,12 @@ class LocalPlanner(object):
self._waypoints_queue.append((next_waypoint, road_option))
def _compute_next_waypoints_global_plan(self, k=1):
"""
Add new waypoints to the trajectory queue.
:param k: how many waypoints to compute
:return:
"""
# check we do not overflow the queue
print(self._global_plan)
available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue)
k = min(available_entries, k)
for i in range(k):
last_waypoint = self._waypoints_queue[-1][0]
next_waypoints = list(last_waypoint.next(self._sampling_radius))
print(i)
if len(next_waypoints) == 1:
# only one option available ==> lanefollowing
next_waypoint = next_waypoints[0]
road_option = RoadOption.LANEFOLLOW
else:
# random choice between the possible options
road_options_list = retrieve_options(next_waypoints, last_waypoint)
planned_option = self._global_plan.pop(0)
print("--- PLanned option = {}".format(planned_option))
print("+++ List options = {}".format(road_options_list))
if planned_option == NavEnum.FOLLOW_LANE:
planned_option = RoadOption.LANEFOLLOW
elif planned_option == NavEnum.LEFT:
planned_option = RoadOption.LEFT
elif planned_option == NavEnum.RIGHT:
planned_option = RoadOption.RIGHT
elif planned_option == NavEnum.GO_STRAIGHT:
planned_option = RoadOption.STRAIGHT
else:
planned_option = RoadOption.VOID
if planned_option in road_options_list:
next_waypoint = next_waypoints[road_options_list.index(planned_option)]
else:
print('Option not found!!')
import pdb; pdb.set_trace()
road_option = planned_option
self._waypoints_queue.append((next_waypoint, road_option))
def set_global_plan(self, current_plan):
self._waypoints_queue.clear()
self._global_plan = current_plan
self._global_plan.pop(0)
# compute initial waypoints
self._waypoints_queue.append((self._current_waypoint.next(self._sampling_radius)[0], RoadOption.LANEFOLLOW))
for elem in current_plan:
self._waypoints_queue.append(elem)
self._target_road_option = RoadOption.LANEFOLLOW
self._compute_next_waypoints_global_plan(k=100)
self._global_plan = True
def run_step(self, debug=True):
"""
@ -252,8 +191,16 @@ class LocalPlanner(object):
if len(self._waypoints_queue) < int(self._waypoints_queue.maxlen * 0.5):
if not self._global_plan:
self._compute_next_waypoints(k=100)
else:
self._compute_next_waypoints_global_plan(k=100)
if len(self._waypoints_queue) == 0:
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 0.0
control.hand_brake = False
control.manual_gear_shift = False
return control
# current vehicle waypoint
self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())

View File

@ -12,21 +12,12 @@ The agent also responds to traffic lights. """
from enum import Enum
import carla
from agents.navigation.agent import *
from agents.navigation.local_planner import LocalPlanner
from agents.tools.misc import is_within_distance_ahead, compute_magnitude_angle
class AgentState(Enum):
"""
AGENT_STATE represents the possible states of a roaming agent
"""
NAVIGATING = 1
BLOCKED_BY_VEHICLE = 2
BLOCKED_RED_LIGHT = 3
class RoamingAgent(object):
class RoamingAgent(Agent):
"""
RoamingAgent implements a basic agent that navigates scenes making random choices when facing an intersection.
This agent respects traffic lights and other vehicles.
@ -37,9 +28,7 @@ class RoamingAgent(object):
:param vehicle: actor to apply to local planner logic onto
"""
self._vehicle = vehicle
self._world = self._vehicle.get_world()
self._map = self._vehicle.get_world().get_map()
super(RoamingAgent, self).__init__(vehicle)
self._proximity_threshold = 10.0 # meters
self._state = AgentState.NAVIGATING
self._local_planner = LocalPlanner(self._vehicle)
@ -47,7 +36,7 @@ class RoamingAgent(object):
def run_step(self, debug=False):
"""
Execute one step of navigation.
:return:
:return: carla.VehicleControl
"""
# is there an obstacle in front of us?

View File

@ -75,7 +75,7 @@ except IndexError:
import carla
from carla import ColorConverter as cc
#from agents.navigation.roaming_agent import *
from agents.navigation.roaming_agent import *
from agents.navigation.basic_agent import *
@ -616,8 +616,12 @@ def game_loop(args):
world = World(client.get_world(), hud)
controller = KeyboardControl(world, False)
agent = BasicAgent(world.vehicle)
agent.set_destination((0.8, 48.4))
if args.agent == "Roaming":
agent = RoamingAgent(world.vehicle)
else:
agent = BasicAgent(world.vehicle)
spawn_point = world.world.get_map().get_spawn_points()[0]
agent.set_destination((spawn_point.location.x, spawn_point.location.y, spawn_point.location.z))
clock = pygame.time.Clock()
while True:
@ -670,6 +674,9 @@ def main():
metavar='WIDTHxHEIGHT',
default='1280x720',
help='window resolution (default: 1280x720)')
argparser.add_argument("-a", "--agent", type=str, choices=["Roaming", "Basic"],
help="select which agent to run", default="Basic")
args = argparser.parse_args()
args.width, args.height = [int(x) for x in args.res.split('x')]