draft implementation of fix:

* Improved decision logic after intersection
* Fixed key error on lane change segments
This commit is contained in:
Praveen Kumar 2019-04-15 17:58:42 +05:30
parent 6eafccaa08
commit 6591cb3e1a
1 changed files with 67 additions and 27 deletions

View File

@ -31,6 +31,8 @@ class GlobalRoutePlanner(object):
self._graph = None
self._id_map = None
self._road_id_to_edge = None
self._intersection_end_node = -1
self._previous_decision = RoadOption.VOID
def setup(self):
"""
@ -234,6 +236,30 @@ class GlobalRoutePlanner(object):
route.append(end[1])
return route
def _successive_last_intersection_edge(self, index, route):
"""
This method returns the last successive intersection edge
from a starting index on the route.
This helps moving past tiny intersection edges to calculate
proper turn decisions.
"""
last_intersection_edge = None
last_node = None
for node1, node2 in [(route[i], route[i+1]) for i in range(index, len(route)-1)]:
candidate_edge = self._graph.edges[node1, node2]
if node1 == route[index]:
last_intersection_edge = candidate_edge
if candidate_edge['type'] == RoadOption.LANEFOLLOW and \
candidate_edge['intersection']:
last_intersection_edge = candidate_edge
last_node = node2
else:
break
return last_node, last_intersection_edge
def _turn_decision(self, index, route, threshold=math.radians(5)):
"""
This method returns the turn decision (RoadOption) for pair of edges
@ -246,35 +272,48 @@ class GlobalRoutePlanner(object):
next_node = route[index+1]
next_edge = self._graph.edges[current_node, next_node]
if index > 0:
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
if calculate_turn:
cv, nv = current_edge['exit_vector'], next_edge['net_vector']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'].value == RoadOption.LANEFOLLOW.value:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
if self._previous_decision != RoadOption.VOID and \
self._intersection_end_node > 0 and \
self._intersection_end_node != previous_node and \
next_edge['type'] == RoadOption.LANEFOLLOW and \
next_edge['intersection']:
decision = self._previous_decision
else:
decision = next_edge['type']
self._intersection_end_node = -1
current_edge = self._graph.edges[previous_node, current_node]
calculate_turn = current_edge['type'].value == RoadOption.LANEFOLLOW.value and \
not current_edge['intersection'] and \
next_edge['type'].value == RoadOption.LANEFOLLOW.value and \
next_edge['intersection']
if calculate_turn:
last_node, tail_edge = self._successive_last_intersection_edge(index, route)
self._intersection_end_node = last_node
if tail_edge is not None:
next_edge = tail_edge
cv, nv = current_edge['exit_vector'], next_edge['net_vector']
cross_list = []
for neighbor in self._graph.successors(current_node):
select_edge = self._graph.edges[current_node, neighbor]
if select_edge['type'].value == RoadOption.LANEFOLLOW.value:
if neighbor != route[index+1]:
sv = select_edge['net_vector']
cross_list.append(np.cross(cv, sv)[2])
next_cross = np.cross(cv, nv)[2]
deviation = math.acos(np.clip(
np.dot(cv, nv)/(np.linalg.norm(cv)*np.linalg.norm(nv)), -1.0, 1.0))
if not cross_list:
cross_list.append(0)
if deviation < threshold:
decision = RoadOption.STRAIGHT
elif cross_list and next_cross < min(cross_list):
decision = RoadOption.LEFT
elif cross_list and next_cross > max(cross_list):
decision = RoadOption.RIGHT
else:
decision = next_edge['type']
else:
decision = next_edge['type']
self._previous_decision = decision
return decision
@ -312,7 +351,8 @@ class GlobalRoutePlanner(object):
def trace_route(self, origin, destination):
"""
This method returns list of (carla.Waypoint, RoadOption) from origin to destination
This method returns list of (carla.Waypoint, RoadOption)
from origin (carla.Location) to destination (carla.Location)
"""
route_trace = []