Merge branch 'master' into feature/new_opendrive_parser

This commit is contained in:
nsubiron 2019-03-29 21:33:57 +01:00
commit 73b08e4a37
7 changed files with 353 additions and 308 deletions

View File

@ -1,5 +1,8 @@
## Latest Changes
* Basic agent integrated with global router
* Fixed local planner to avoid premature route pruning at path overlaps
* Fixed global router behavior to be consistent with new Waypoint API
* LaneInvasionSensor stabilization
- Fix naming: Use 'LaneInvasionSensor'/'lane_invasion' instead of mixture with 'LaneDetector'/'lane_detector'
- Create server-side LaneInvasionSensor (to be able to access it via ROS bridge)

1
Jenkinsfile vendored
View File

@ -52,6 +52,7 @@ pipeline {
steps {
sh 'make package'
sh 'make export-maps ARGS="--map=/Game/Carla/Maps/Town06 --file=Town06"'
sh 'make export-maps ARGS="--map=/Game/Carla/Maps/Town07 --file=Town07"'
}
post {
always {

View File

@ -132,7 +132,7 @@ class Agent(object):
magnitude, angle = compute_magnitude_angle(loc,
ego_vehicle_location,
self._vehicle.get_transform().rotation.yaw)
if magnitude < 80.0 and angle < min(25.0, min_angle):
if magnitude < 60.0 and angle < min(25.0, min_angle):
sel_magnitude = magnitude
sel_traffic_light = traffic_light
min_angle = angle

View File

@ -10,18 +10,12 @@
waypoints and avoiding other vehicles.
The agent also responds to traffic lights. """
import math
import numpy as np
import carla
from agents.navigation.agent import Agent, AgentState
from agents.navigation.local_planner import LocalPlanner
from agents.navigation.local_planner import RoadOption
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.tools.misc import vector
class BasicAgent(Agent):
"""
@ -38,93 +32,54 @@ class BasicAgent(Agent):
self._proximity_threshold = 10.0 # meters
self._state = AgentState.NAVIGATING
self._local_planner = LocalPlanner(self._vehicle, opt_dict={'target_speed': target_speed})
args_lateral_dict = {
'K_P': 1,
'K_D': 0.02,
'K_I': 0,
'dt': 1.0/20.0}
self._local_planner = LocalPlanner(
self._vehicle, opt_dict={'target_speed' : target_speed,
'lateral_control_dict':args_lateral_dict})
self._hop_resolution = 2.0
# setting up global router
self._current_plan = None
self._path_seperation_hop = 2
self._path_seperation_threshold = 0.5
self._target_speed = target_speed
self._grp = None
def set_destination(self, location):
"""
This method creates a list of waypoints from agent's position to destination location
based on the route returned by the global router
"""
start_waypoint = self._map.get_waypoint(self._vehicle.get_location())
end_waypoint = self._map.get_waypoint(
carla.Location(location[0], location[1], location[2]))
solution = []
route_trace = self._trace_route(start_waypoint, end_waypoint)
assert route_trace
self._local_planner.set_global_plan(route_trace)
def _trace_route(self, start_waypoint, end_waypoint):
"""
This method sets up a global router and returns the optimal route
from start_waypoint to end_waypoint
"""
# Setting up global router
dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map())
grp = GlobalRoutePlanner(dao)
grp.setup()
if self._grp is None:
dao = GlobalRoutePlannerDAO(self._vehicle.get_world().get_map(), self._hop_resolution)
grp = GlobalRoutePlanner(dao)
grp.setup()
self._grp = grp
# Obtain route plan
x1 = start_waypoint.transform.location.x
y1 = start_waypoint.transform.location.y
x2 = end_waypoint.transform.location.x
y2 = end_waypoint.transform.location.y
route = grp.plan_route((x1, y1), (x2, y2))
route = self._grp.trace_route(
start_waypoint.transform.location,
end_waypoint.transform.location)
current_waypoint = start_waypoint
route.append(RoadOption.VOID)
for action in route:
# Generate waypoints to next junction
wp_choice = current_waypoint.next(self._hop_resolution)
while len(wp_choice) == 1:
current_waypoint = wp_choice[0]
solution.append((current_waypoint, RoadOption.LANEFOLLOW))
wp_choice = current_waypoint.next(self._hop_resolution)
# Stop at destination
if current_waypoint.transform.location.distance(
end_waypoint.transform.location) < self._hop_resolution:
break
if action == RoadOption.VOID:
break
# Select appropriate path at the junction
if len(wp_choice) > 1:
# Current heading vector
current_transform = current_waypoint.transform
current_location = current_transform.location
projected_location = current_location + \
carla.Location(
x=math.cos(math.radians(current_transform.rotation.yaw)),
y=math.sin(math.radians(current_transform.rotation.yaw)))
v_current = vector(current_location, projected_location)
direction = 0
if action == RoadOption.LEFT:
direction = 1
elif action == RoadOption.RIGHT:
direction = -1
elif action == RoadOption.STRAIGHT:
direction = 0
select_criteria = float('inf')
# Choose correct path
for wp_select in wp_choice:
v_select = vector(
current_location, wp_select.transform.location)
cross = float('inf')
if direction == 0:
cross = abs(np.cross(v_current, v_select)[-1])
else:
cross = direction * np.cross(v_current, v_select)[-1]
if cross < select_criteria:
select_criteria = cross
current_waypoint = wp_select
# Generate all waypoints within the junction
# along selected path
solution.append((current_waypoint, action))
current_waypoint = current_waypoint.next(self._hop_resolution)[0]
while current_waypoint.is_intersection:
solution.append((current_waypoint, action))
current_waypoint = current_waypoint.next(self._hop_resolution)[0]
assert solution
self._current_plan = solution
self._local_planner.set_global_plan(self._current_plan)
return route
def run_step(self, debug=False):
"""

View File

@ -10,13 +10,15 @@ import math
import numpy as np
import networkx as nx
import carla
from agents.navigation.local_planner import RoadOption
from agents.tools.misc import vector
class GlobalRoutePlanner(object):
"""
This class provides a very high level route plan.
Instantiate the calss by passing a reference to
Instantiate the class by passing a reference to
A GlobalRoutePlannerDAO object.
"""
@ -28,122 +30,23 @@ class GlobalRoutePlanner(object):
self._topology = None
self._graph = None
self._id_map = None
self._road_id_to_edge = None
def setup(self):
"""
Perform initial server data lookup for detailed topology
Performs initial server data lookup for detailed topology
and builds graph representation of the world map.
"""
self._topology = self._dao.get_topology()
# Creating graph of the world map and also a maping from
# node co-ordinates to node id
self._graph, self._id_map = self.build_graph()
self._graph, self._id_map, self._road_id_to_edge = self._build_graph()
self._lane_change_link()
def plan_route(self, origin, destination):
"""
The following function generates the route plan based on
origin : tuple containing x, y of the route's start position
destination : tuple containing x, y of the route's end position
return : list of turn by turn navigation decisions as
agents.navigation.local_planner.RoadOption elements
Possible values (for now) are STRAIGHT, LEFT, RIGHT, LANEFOLLOW, VOID
"""
threshold = math.radians(4.0)
route = self.path_search(origin, destination)
plan = []
# Compare current edge and next edge to decide on action
for i in range(len(route) - 2):
current_edge = self._graph.edges[route[i], route[i + 1]]
next_edge = self._graph.edges[route[i + 1], route[i + 2]]
cv = current_edge['exit_vector']
nv = next_edge['net_vector']
cv, nv = cv + (0,), nv + (0,) # Making vectors 3D
num_edges = 0
cross_list = []
# Accumulating cross products of all other paths
for neighbor in self._graph.neighbors(route[i + 1]):
num_edges += 1
if neighbor != route[i + 2]:
select_edge = self._graph.edges[route[i + 1], neighbor]
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
# Calculating turn decision
if next_edge['intersection'] and num_edges > 1:
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.dot(cv, nv) /
(np.linalg.norm(cv) * np.linalg.norm(nv)))
if deviation < threshold:
action = RoadOption.STRAIGHT
elif next_cross < min(cross_list):
action = RoadOption.LEFT
elif next_cross > max(cross_list):
action = RoadOption.RIGHT
plan.append(action)
return plan
def _distance_heuristic(self, n1, n2):
"""
Distance heuristic calculator for path searching
in self._graph
"""
(x1, y1) = self._graph.nodes[n1]['vertex']
(x2, y2) = self._graph.nodes[n2]['vertex']
return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
def path_search(self, origin, destination):
"""
This function finds the shortest path connecting origin and destination
using A* search with distance heuristic.
origin : tuple containing x, y co-ordinates of start position
desitnation : tuple containing x, y co-ordinates of end position
return : path as list of node ids (as int) of the graph self._graph
connecting origin and destination
"""
xo, yo = origin
xd, yd = destination
start = self.localise(xo, yo)
end = self.localise(xd, yd)
route = nx.astar_path(
self._graph, source=self._id_map[start['entry']],
target=self._id_map[end['exit']],
heuristic=self._distance_heuristic,
weight='length')
return route
def localise(self, x, y):
"""
This function finds the road segment closest to (x, y)
x, y : co-ordinates of the point to be localized
return : pair of points, tuple of tuples containing co-ordinates
of points that represents the road segment closest to x, y
"""
distance = float('inf')
nearest = (distance, dict())
# Measuring distances from segment waypoints and (x, y)
for segment in self._topology:
entryxy = segment['entry']
exitxy = segment['exit']
path = segment['path']
for xp, yp in [entryxy] + path + [exitxy]:
new_distance = self.distance((xp, yp), (x, y))
if new_distance < nearest[0]:
nearest = (new_distance, segment)
segment = nearest[1]
return segment
def build_graph(self):
def _build_graph(self):
"""
This function builds a networkx graph representation of topology.
The topology is read from self._topology.
graph node properties:
vertex - (x,y) of node's position in world map
num_edges - Number of exit edges from the node
vertex - (x,y,z) position in world map
graph edge properties:
entry_vector - unit vector along tangent at entry point
exit_vector - unit vector along tangent at exit point
@ -151,72 +54,242 @@ class GlobalRoutePlanner(object):
intersection - boolean indicating if the edge belongs to an
intersection
return : graph -> networkx graph representing the world map,
id_map-> mapping from (x,y) to node id
id_map-> mapping from (x,y,z) to node id
road_id_to_edge-> map from road id to edge in the graph
"""
graph = nx.DiGraph()
# Map with structure {(x,y): id, ... }
id_map = dict()
id_map = dict() # Map with structure {(x,y,z): id, ... }
road_id_to_edge = dict() # Map with structure {road_id: {lane_id: edge, ... }, ... }
for segment in self._topology:
entryxy = segment['entry']
exitxy = segment['exit']
entry_xyz, exit_xyz = segment['entryxyz'], segment['exitxyz']
path = segment['path']
intersection = segment['intersection']
for vertex in entryxy, exitxy:
entry_wp, exit_wp = segment['entry'], segment['exit']
intersection = entry_wp.is_intersection
road_id, lane_id = entry_wp.road_id, entry_wp.lane_id
for vertex in entry_xyz, exit_xyz:
# Adding unique nodes and populating id_map
if vertex not in id_map:
new_id = len(id_map)
id_map[vertex] = new_id
graph.add_node(new_id, vertex=vertex)
n1 = id_map[entry_xyz]
n2 = id_map[exit_xyz]
if road_id not in road_id_to_edge:
road_id_to_edge[road_id] = dict()
road_id_to_edge[road_id][lane_id] = (n1, n2)
n1, n2 = id_map[entryxy], id_map[exitxy]
# Adding edge with attributes
graph.add_edge(
n1, n2,
length=len(path) + 1, path=path,
entry_vector=self.unit_vector(
entryxy, path[0] if len(path) > 0 else exitxy),
exit_vector=self.unit_vector(
path[-1] if len(path) > 0 else entryxy, exitxy),
net_vector=self.unit_vector(entryxy, exitxy),
intersection=intersection)
entry_waypoint=entry_wp, exit_waypoint=exit_wp,
entry_vector=vector(
entry_wp.transform.location,
path[0].transform.location if len(path) > 0 else exit_wp.transform.location),
exit_vector=vector(
path[-1].transform.location if len(path) > 0 else entry_wp.transform.location,
exit_wp.transform.location),
net_vector=vector(entry_wp.transform.location, exit_wp.transform.location),
intersection=intersection, type=RoadOption.LANEFOLLOW)
return graph, id_map
return graph, id_map, road_id_to_edge
def distance(self, point1, point2):
def _localize(self, location):
"""
returns the distance between point1 and point2
point1 : (x,y) of first point
point2 : (x,y) of second point
return : distance from point1 to point2
This function finds the road segment closest to given location
location : carla.Location to be localized in the graph
return : pair node ids representing an edge in the graph
"""
x1, y1 = point1
x2, y2 = point2
return math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
waypoint = self._dao.get_waypoint(location)
return self._road_id_to_edge[waypoint.road_id][waypoint.lane_id]
def unit_vector(self, point1, point2):
def _lane_change_link(self):
"""
This function returns the unit vector from point1 to point2
point1 : (x,y) of first point
point2 : (x,y) of second point
return : tuple containing x and y components of unit vector
from point1 to point2
This method places zero cost links in the topology graph
representing availability of lane changes.
"""
x1, y1 = point1
x2, y2 = point2
vector = (x2 - x1, y2 - y1)
vector_mag = math.sqrt(vector[0]**2 + vector[1]**2)
vector = (vector[0] / vector_mag, vector[1] / vector_mag)
for segment in self._topology:
left_found, right_found = False, False
return vector
for waypoint in segment['path']:
if not segment['entry'].is_intersection:
next_waypoint, next_road_option, next_segment = None, None, None
def dot(self, vector1, vector2):
if bool(waypoint.lane_change & carla.LaneChange.Right) and not right_found:
next_waypoint = waypoint.get_right_lane()
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and \
waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANERIGHT
try:
next_segment = self._localize(next_waypoint.transform.location)
except KeyError:
print("Failed to localize! : ", next_waypoint.road_id, next_waypoint.lane_id)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=segment['entry'],
exit_waypoint=self._graph.edges[next_segment[0], next_segment[1]]['entry_waypoint'],
path=[], length=0, type=next_road_option, change_waypoint = waypoint)
right_found = True
if bool(waypoint.lane_change & carla.LaneChange.Left) and not left_found:
next_waypoint = waypoint.get_left_lane()
if next_waypoint is not None and next_waypoint.lane_type == carla.LaneType.Driving and \
waypoint.road_id == next_waypoint.road_id:
next_road_option = RoadOption.CHANGELANELEFT
try:
next_segment = self._localize(next_waypoint.transform.location)
except KeyError:
print("Failed to localize! : ", next_waypoint.road_id, next_waypoint.lane_id)
if next_segment is not None:
self._graph.add_edge(
self._id_map[segment['entryxyz']], next_segment[0], entry_waypoint=segment['entry'],
exit_waypoint=self._graph.edges[next_segment[0], next_segment[1]]['entry_waypoint'],
path=[], length=0, type=next_road_option, change_waypoint = waypoint)
left_found = True
if left_found and right_found:
break
def _distance_heuristic(self, n1, n2):
"""
This function returns the dot product of vector1 with vector2
vector1 : x, y components of first vector
vector2 : x, y components of second vector
return : dot porduct scalar between vector1 and vector2
Distance heuristic calculator for path searching
in self._graph
"""
return vector1[0] * vector2[0] + vector1[1] * vector2[1]
l1 = np.array(self._graph.nodes[n1]['vertex'])
l2 = np.array(self._graph.nodes[n2]['vertex'])
return np.linalg.norm(l1-l2)
def _path_search(self, origin, destination):
"""
This function finds the shortest path connecting origin and destination
using A* search with distance heuristic.
origin : carla.Location object of start position
destination : carla.Location object of of end position
return : path as list of node ids (as int) of the graph self._graph
connecting origin and destination
"""
start, end = self._localize(origin), self._localize(destination)
route = nx.astar_path(
self._graph, source=start[0], target=end[0],
heuristic=self._distance_heuristic, weight='length')
route.append(end[1])
return route
def _turn_decision(self, index, route, threshold=math.radians(5)):
"""
This method returns the turn decision (RoadOption) for pair of edges
around current index of route list
"""
decision = None
previous_node = route[index-1]
current_node = route[index]
next_node = route[index+1]
next_edge = self._graph.edges[current_node, next_node]
if index > 0:
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
if calculate_turn:
cv, nv = current_edge['exit_vector'], next_edge['net_vector']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'].value == RoadOption.LANEFOLLOW.value:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.dot(cv, nv) /\
(np.linalg.norm(cv)*np.linalg.norm(nv)))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
else:
decision = next_edge['type']
else:
decision = next_edge['type']
return decision
def abstract_route_plan(self, origin, destination):
"""
The following function generates the route plan based on
origin : carla.Location object of the route's start position
destination : carla.Location object of the route's end position
return : list of turn by turn navigation decisions as
agents.navigation.local_planner.RoadOption elements
Possible values are STRAIGHT, LEFT, RIGHT, LANEFOLLOW, VOID
CHANGELANELEFT, CHANGELANERIGHT
"""
route = self._path_search(origin, destination)
plan = []
for i in range(len(route) - 1):
road_option = self._turn_decision(i, route)
plan.append(road_option)
return plan
def _find_closest_in_list(self, current_waypoint, waypoint_list):
min_distance = float('inf')
closest_index = -1
for i, waypoint in enumerate(waypoint_list):
distance = waypoint.transform.location.distance(
current_waypoint.transform.location)
if distance < min_distance:
min_distance = distance
closest_index = i
return closest_index
def trace_route(self, origin, destination):
"""
This method returns list of (carla.Waypoint, RoadOption) from origin to destination
"""
route_trace = []
route = self._path_search(origin, destination)
current_waypoint = self._dao.get_waypoint(origin)
resolution = self._dao.get_resolution()
for i in range(len(route) - 1):
road_option = self._turn_decision(i, route)
edge = self._graph.edges[route[i], route[i+1]]
path = []
if edge['type'].value != RoadOption.LANEFOLLOW.value and edge['type'].value != RoadOption.VOID.value:
n1, n2 = self._road_id_to_edge[edge['exit_waypoint'].road_id][edge['exit_waypoint'].lane_id]
next_edge = self._graph.edges[n1, n2]
if next_edge['path']:
closest_index = self._find_closest_in_list(current_waypoint, next_edge['path'])
closest_index = min(len(next_edge['path'])-1, closest_index+5)
current_waypoint = next_edge['path'][closest_index]
else:
current_waypoint = next_edge['exit_waypoint']
route_trace.append((current_waypoint, road_option))
else:
path = path + [edge['entry_waypoint']] + edge['path'] + [edge['exit_waypoint']]
closest_index = self._find_closest_in_list(current_waypoint, path)
for waypoint in path[closest_index:]:
current_waypoint = waypoint
route_trace.append((current_waypoint, road_option))
if len(route)-i <= 2 and waypoint.transform.location.distance(destination) < 2*resolution:
break
return route_trace

View File

@ -5,6 +5,8 @@
This module provides implementation for GlobalRoutePlannerDAO
"""
import numpy as np
class GlobalRoutePlannerDAO(object):
"""
@ -12,12 +14,13 @@ class GlobalRoutePlannerDAO(object):
from the carla server instance for GlobalRoutePlanner
"""
def __init__(self, wmap):
"""
def __init__(self, wmap, sampling_resolution=1):
"""get_topology
Constructor
wmap : carl world map object
"""
self._sampling_resolution = sampling_resolution
self._wmap = wmap
def get_topology(self):
@ -28,34 +31,43 @@ class GlobalRoutePlannerDAO(object):
topology into a list of dictionary objects.
return: list of dictionary objects with the following attributes
entry - (x,y) of entry point of road segment
exit - (x,y) of exit point of road segment
entry - waypoint of entry point of road segment
entryxyz- (x,y,z) of entry point of road segment
exit - waypoint of exit point of road segment
exitxyz - (x,y,z) of exit point of road segment
path - list of waypoints separated by 1m from entry
to exit
intersection - Boolean indicating if the road segment
is an intersection
"""
topology = []
# Retrieving waypoints to construct a detailed topology
for segment in self._wmap.get_topology():
x1 = segment[0].transform.location.x
y1 = segment[0].transform.location.y
x2 = segment[1].transform.location.x
y2 = segment[1].transform.location.y
wp1, wp2 = segment[0], segment[1]
l1, l2 = wp1.transform.location, wp2.transform.location
# Rounding off to avoid floating point imprecision
x1, y1, z1, x2, y2, z2 = np.round([l1.x, l1.y, l1.z, l2.x, l2.y, l2.z], 0)
wp1.transform.location, wp2.transform.location = l1, l2
seg_dict = dict()
seg_dict['entry'] = (x1, y1)
seg_dict['exit'] = (x2, y2)
seg_dict['entry'], seg_dict['exit'] = wp1, wp2
seg_dict['entryxyz'], seg_dict['exitxyz'] = (x1, y1, z1), (x2, y2, z2)
seg_dict['path'] = []
wp1 = segment[0]
wp2 = segment[1]
seg_dict['intersection'] = True if wp1.is_intersection else False
endloc = wp2.transform.location
w = wp1.next(1)[0]
while w.transform.location.distance(endloc) > 1:
x = w.transform.location.x
y = w.transform.location.y
seg_dict['path'].append((x, y))
w = w.next(1)[0]
if wp1.transform.location.distance(endloc) > self._sampling_resolution:
w = wp1.next(self._sampling_resolution)[0]
while w.transform.location.distance(endloc) > self._sampling_resolution:
seg_dict['path'].append(w)
w = w.next(self._sampling_resolution)[0]
else:
seg_dict['path'].append(wp1.next(self._sampling_resolution/2.0)[0])
topology.append(seg_dict)
return topology
def get_waypoint(self, location):
"""
The method returns waypoint at given location
"""
waypoint = self._wmap.get_waypoint(location)
return waypoint
def get_resolution(self):
""" Accessor for self._sampling_resolution """
return self._sampling_resolution

View File

@ -26,6 +26,8 @@ class RoadOption(Enum):
RIGHT = 2
STRAIGHT = 3
LANEFOLLOW = 4
CHANGELANELEFT = 5
CHANGELANERIGHT = 6
class LocalPlanner(object):
@ -61,23 +63,23 @@ class LocalPlanner(object):
self._vehicle = vehicle
self._map = self._vehicle.get_world().get_map()
self._buffer_size = 5
self.dt = None
self.target_speed = None
self.sampling_radius = None
self.min_distance = None
self.current_waypoint = None
self.target_road_option = None
self.next_waypoints = None
self._dt = None
self._target_speed = None
self._sampling_radius = None
self._min_distance = None
self._current_waypoint = None
self._target_road_option = None
self._next_waypoints = None
self.target_waypoint = None
self.vehicle_controller = None
self.global_plan = None
self._vehicle_controller = None
self._global_plan = None
# queue with tuples of (waypoint, RoadOption)
self.waypoints_queue = deque(maxlen=600)
self.waypoint_buffer = deque(maxlen=self._buffer_size)
self._waypoints_queue = deque(maxlen=20000)
self._buffer_size = 5
self._waypoint_buffer = deque(maxlen=self._buffer_size)
# initializing controller
self.init_controller(opt_dict)
self._init_controller(opt_dict)
def __del__(self):
if self._vehicle:
@ -88,7 +90,7 @@ class LocalPlanner(object):
self._vehicle = None
print("Resetting ego-vehicle!")
def init_controller(self, opt_dict):
def _init_controller(self, opt_dict):
"""
Controller initialization.
@ -96,46 +98,46 @@ class LocalPlanner(object):
:return:
"""
# default params
self.dt = 1.0 / 20.0
self.target_speed = 20.0 # Km/h
self.sampling_radius = self.target_speed * 0.5 / 3.6 # 0.5 seconds horizon
self.min_distance = self.sampling_radius * self.MIN_DISTANCE_PERCENTAGE
self._dt = 1.0 / 20.0
self._target_speed = 20.0 # Km/h
self._sampling_radius = self._target_speed * 1 / 3.6 # 1 seconds horizon
self._min_distance = self._sampling_radius * self.MIN_DISTANCE_PERCENTAGE
args_lateral_dict = {
'K_P': 1.95,
'K_D': 0.01,
'K_I': 1.4,
'dt': self.dt}
'dt': self._dt}
args_longitudinal_dict = {
'K_P': 1.0,
'K_D': 0,
'K_I': 1,
'dt': self.dt}
'dt': self._dt}
# parameters overload
if opt_dict:
if 'dt' in opt_dict:
self.dt = opt_dict['dt']
self._dt = opt_dict['dt']
if 'target_speed' in opt_dict:
self.target_speed = opt_dict['target_speed']
self._target_speed = opt_dict['target_speed']
if 'sampling_radius' in opt_dict:
self.sampling_radius = self.target_speed * \
self._sampling_radius = self._target_speed * \
opt_dict['sampling_radius'] / 3.6
if 'lateral_control_dict' in opt_dict:
args_lateral_dict = opt_dict['lateral_control_dict']
if 'longitudinal_control_dict' in opt_dict:
args_longitudinal_dict = opt_dict['longitudinal_control_dict']
self.current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
self.vehicle_controller = VehiclePIDController(self._vehicle,
self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
self._vehicle_controller = VehiclePIDController(self._vehicle,
args_lateral=args_lateral_dict,
args_longitudinal=args_longitudinal_dict)
self.global_plan = False
self._global_plan = False
# compute initial waypoints
self.waypoints_queue.append((self.current_waypoint.next(self.sampling_radius)[0], RoadOption.LANEFOLLOW))
self._waypoints_queue.append((self._current_waypoint.next(self._sampling_radius)[0], RoadOption.LANEFOLLOW))
self.target_road_option = RoadOption.LANEFOLLOW
self._target_road_option = RoadOption.LANEFOLLOW
# fill waypoint trajectory queue
self._compute_next_waypoints(k=200)
@ -146,7 +148,7 @@ class LocalPlanner(object):
:param speed: new target speed in Km/h
:return:
"""
self.target_speed = speed
self._target_speed = speed
def _compute_next_waypoints(self, k=1):
"""
@ -156,12 +158,12 @@ class LocalPlanner(object):
:return:
"""
# check we do not overflow the queue
available_entries = self.waypoints_queue.maxlen - len(self.waypoints_queue)
available_entries = self._waypoints_queue.maxlen - len(self._waypoints_queue)
k = min(available_entries, k)
for _ in range(k):
last_waypoint = self.waypoints_queue[-1][0]
next_waypoints = list(last_waypoint.next(self.sampling_radius))
last_waypoint = self._waypoints_queue[-1][0]
next_waypoints = list(last_waypoint.next(self._sampling_radius))
if len(next_waypoints) == 1:
# only one option available ==> lanefollowing
@ -169,20 +171,20 @@ class LocalPlanner(object):
road_option = RoadOption.LANEFOLLOW
else:
# random choice between the possible options
road_options_list = retrieve_options(
road_options_list = _retrieve_options(
next_waypoints, last_waypoint)
road_option = random.choice(road_options_list)
next_waypoint = next_waypoints[road_options_list.index(
road_option)]
self.waypoints_queue.append((next_waypoint, road_option))
self._waypoints_queue.append((next_waypoint, road_option))
def set_global_plan(self, current_plan):
self.waypoints_queue.clear()
self._waypoints_queue.clear()
for elem in current_plan:
self.waypoints_queue.append(elem)
self.target_road_option = RoadOption.LANEFOLLOW
self.global_plan = True
self._waypoints_queue.append(elem)
self._target_road_option = RoadOption.LANEFOLLOW
self._global_plan = True
def run_step(self, debug=True):
"""
@ -194,47 +196,46 @@ class LocalPlanner(object):
"""
# not enough waypoints in the horizon? => add more!
if len(self.waypoints_queue) < int(self.waypoints_queue.maxlen * 0.5):
if not self.global_plan:
self._compute_next_waypoints(k=100)
if not self._global_plan and len(self._waypoints_queue) < int(self._waypoints_queue.maxlen * 0.5):
self._compute_next_waypoints(k=100)
if len(self.waypoints_queue) == 0:
if len(self._waypoints_queue) == 0:
control = carla.VehicleControl()
control.steer = 0.0
control.throttle = 0.0
control.brake = 0.0
control.brake = 1.0
control.hand_brake = False
control.manual_gear_shift = False
return control
# Buffering the waypoints
if not self.waypoint_buffer:
if not self._waypoint_buffer:
for i in range(self._buffer_size):
if self.waypoints_queue:
self.waypoint_buffer.append(
self.waypoints_queue.popleft())
if self._waypoints_queue:
self._waypoint_buffer.append(
self._waypoints_queue.popleft())
else:
break
# current vehicle waypoint
self.current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
self._current_waypoint = self._map.get_waypoint(self._vehicle.get_location())
# target waypoint
self.target_waypoint, self.target_road_option = self.waypoint_buffer[0]
self.target_waypoint, self._target_road_option = self._waypoint_buffer[0]
# move using PID controllers
control = self.vehicle_controller.run_step(self.target_speed, self.target_waypoint)
control = self._vehicle_controller.run_step(self._target_speed, self.target_waypoint)
# purge the queue of obsolete waypoints
vehicle_transform = self._vehicle.get_transform()
max_index = -1
for i, (waypoint, _) in enumerate(self.waypoint_buffer):
for i, (waypoint, _) in enumerate(self._waypoint_buffer):
if distance_vehicle(
waypoint, vehicle_transform) < self.min_distance:
waypoint, vehicle_transform) < self._min_distance:
max_index = i
if max_index >= 0:
for i in range(max_index + 1):
self.waypoint_buffer.popleft()
self._waypoint_buffer.popleft()
if debug:
draw_waypoints(self._vehicle.get_world(), [self.target_waypoint], self._vehicle.get_location().z + 1.0)
@ -242,7 +243,7 @@ class LocalPlanner(object):
return control
def retrieve_options(list_waypoints, current_waypoint):
def _retrieve_options(list_waypoints, current_waypoint):
"""
Compute the type of connection between the current active waypoint and the multiple waypoints present in
list_waypoints. The result is encoded as a list of RoadOption enums.
@ -258,13 +259,13 @@ def retrieve_options(list_waypoints, current_waypoint):
# the beggining of an intersection, therefore the
# variation in angle is small
next_next_waypoint = next_waypoint.next(3.0)[0]
link = compute_connection(current_waypoint, next_next_waypoint)
link = _compute_connection(current_waypoint, next_next_waypoint)
options.append(link)
return options
def compute_connection(current_waypoint, next_waypoint):
def _compute_connection(current_waypoint, next_waypoint):
"""
Compute the type of topological connection between an active waypoint (current_waypoint) and a target waypoint
(next_waypoint).