Merge pull request #1539 from carla-simulator/fix/router_turn_decisions

Fix for route plan turn decisions
This commit is contained in:
germanros1987 2019-04-16 08:45:33 -07:00 committed by GitHub
commit 71adfecfda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 86 additions and 33 deletions

View File

@ -31,6 +31,8 @@ class GlobalRoutePlanner(object):
self._graph = None
self._id_map = None
self._road_id_to_edge = None
self._intersection_end_node = -1
self._previous_decision = RoadOption.VOID
def setup(self):
"""
@ -84,17 +86,17 @@ class GlobalRoutePlanner(object):
road_id_to_edge[road_id][section_id] = dict()
road_id_to_edge[road_id][section_id][lane_id] = (n1, n2)
entry_carla_vector = entry_wp.transform.rotation.get_forward_vector()
exit_carla_vector = exit_wp.transform.rotation.get_forward_vector()
# Adding edge with attributes
graph.add_edge(
n1, n2,
length=len(path) + 1, path=path,
entry_waypoint=entry_wp, exit_waypoint=exit_wp,
entry_vector=vector(
entry_wp.transform.location,
path[0].transform.location if len(path) > 0 else exit_wp.transform.location),
exit_vector=vector(
path[-1].transform.location if len(path) > 0 else entry_wp.transform.location,
exit_wp.transform.location),
entry_vector=np.array(
[entry_carla_vector.x, entry_carla_vector.y, entry_carla_vector.z]),
exit_vector=np.array(
[exit_carla_vector.x, exit_carla_vector.y, exit_carla_vector.z]),
net_vector=vector(entry_wp.transform.location, exit_wp.transform.location),
intersection=intersection, type=RoadOption.LANEFOLLOW)
@ -234,6 +236,30 @@ class GlobalRoutePlanner(object):
route.append(end[1])
return route
def _successive_last_intersection_edge(self, index, route):
"""
This method returns the last successive intersection edge
from a starting index on the route.
This helps moving past tiny intersection edges to calculate
proper turn decisions.
"""
last_intersection_edge = None
last_node = None
for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]:
candidate_edge = self._graph.edges[node1, node2]
if node1 == route[index]:
last_intersection_edge = candidate_edge
if candidate_edge['type'] == RoadOption.LANEFOLLOW and \
candidate_edge['intersection']:
last_intersection_edge = candidate_edge
last_node = node2
else:
break
return last_node, last_intersection_edge
def _turn_decision(self, index, route, threshold=math.radians(5)):
"""
This method returns the turn decision (RoadOption) for pair of edges
@ -246,35 +272,52 @@ class GlobalRoutePlanner(object):
next_node = route[index+1]
next_edge = self._graph.edges[current_node, next_node]
if index > 0:
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
if calculate_turn:
cv, nv = current_edge['exit_vector'], next_edge['net_vector']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'].value == RoadOption.LANEFOLLOW.value:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
if self._previous_decision != RoadOption.VOID and \
self._intersection_end_node > 0 and \
self._intersection_end_node != previous_node and \
next_edge['type'] == RoadOption.LANEFOLLOW and \
next_edge['intersection']:
decision = self._previous_decision
else:
decision = next_edge['type']
self._intersection_end_node = -1
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
if calculate_turn:
last_node, tail_edge = self._successive_last_intersection_edge(index, route)
self._intersection_end_node = last_node
if tail_edge is not None:
next_edge = tail_edge
cv, nv = current_edge['exit_vector'], next_edge['net_vector']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'].value == RoadOption.LANEFOLLOW.value:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
elif next_cross < 0:
decision = RoadOption.LEFT
elif next_cross > 0:
decision = RoadOption.RIGHT
else:
decision = next_edge['type']
else:
decision = next_edge['type']
self._previous_decision = decision
return decision
@ -312,12 +355,14 @@ class GlobalRoutePlanner(object):
def trace_route(self, origin, destination):
"""
This method returns list of (carla.Waypoint, RoadOption) from origin to destination
This method returns list of (carla.Waypoint, RoadOption)
from origin (carla.Location) to destination (carla.Location)
"""
route_trace = []
route = self._path_search(origin, destination)
current_waypoint = self._dao.get_waypoint(origin)
destination_waypoint = self._dao.get_waypoint(destination)
resolution = self._dao.get_resolution()
for i in range(len(route) - 1):
@ -327,6 +372,7 @@ class GlobalRoutePlanner(object):
if edge['type'].value != RoadOption.LANEFOLLOW.value and \
edge['type'].value != RoadOption.VOID.value:
route_trace.append((current_waypoint, road_option))
exit_wp = edge['exit_waypoint']
n1, n2 = self._road_id_to_edge[exit_wp.road_id][exit_wp.section_id][exit_wp.lane_id]
next_edge = self._graph.edges[n1, n2]
@ -347,5 +393,12 @@ class GlobalRoutePlanner(object):
if len(route)-i <= 2 and \
waypoint.transform.location.distance(destination) < 2*resolution:
break
elif len(route)-i <= 2 and \
current_waypoint.road_id == destination_waypoint.road_id and \
current_waypoint.section_id == destination_waypoint.section_id and \
current_waypoint.lane_id == destination_waypoint.lane_id:
destination_index = self._find_closest_in_list(destination_waypoint, path)
if closest_index > destination_index:
break
return route_trace