* using enumeration for navigation decisions

* added detailed doc strings
* standards update for imports
This commit is contained in:
Praveen Kumar 2018-12-17 14:37:36 +01:00
parent d0689df5f2
commit 62b603e593
3 changed files with 115 additions and 33 deletions

View File

@ -6,10 +6,25 @@ This module provides GlobalRoutePlanner implementation.
"""
import math
from enum import Enum
import networkx as nx
import carla
class NavEnum(Enum):
"""
Enumeration class containing possible navigation decisions
"""
START = "START"
GO_STRAIGHT = "GO_STRAIGHT"
LEFT = "LEFT"
RIGHT = "RIGHT"
FOLLOW_LANE = "FOLLOW_LANE"
STOP = "STOP"
pass
class GlobalRoutePlanner(object):
"""
This class provides a very high level route plan.
@ -21,17 +36,20 @@ class GlobalRoutePlanner(object):
"""
Constructor
"""
self.dao = dao
self._dao = dao
self._topology = None
self._graph = None
self._id_map = None
def setup(self):
"""
Perform initial server data lookup for detailed topology
and builds graph representation of the world map.
"""
self.topology = self.dao.get_topology()
# Creating graph of the world map and also a map from
self._topology = self._dao.get_topology()
# Creating graph of the world map and also a maping from
# node co-ordinates to node id
self.graph, self.id_map = self.build_graph()
self._graph, self._id_map = self.build_graph()
def plan_route(self, origin, destination):
"""
@ -39,19 +57,20 @@ class GlobalRoutePlanner(object):
origin : tuple containing x, y of the route's start position
destination : tuple containing x, y of the route's end position
return : list of turn by turn navigation decision
possible values (for now) are START, GO_STRAIGHT, LEFT, RIGHT,
return : list of turn by turn navigation decisions as
NavEnum elements
Possible values (for now) are START, GO_STRAIGHT, LEFT, RIGHT,
FOLLOW_LANE, STOP
"""
threshold = 0.0523599 # 5 degrees
route = self.path_search(origin, destination)
plan = []
plan.append('START')
plan.append(NavEnum.START)
# Compare current edge and next edge to decide on action
for i in range(len(route) - 2):
current_edge = self.graph.edges[route[i], route[i + 1]]
next_edge = self.graph.edges[route[i + 1], route[i + 2]]
current_edge = self._graph.edges[route[i], route[i + 1]]
next_edge = self._graph.edges[route[i + 1], route[i + 2]]
if not current_edge['intersection']:
cv = current_edge['exit_vector']
nv = None
@ -67,35 +86,46 @@ class GlobalRoutePlanner(object):
angle_list[i] = -1 * (180 + angle_list[i])
ca, na = angle_list
if abs(na - ca) < threshold:
action = 'GO_STRAIGHT'
action = NavEnum.GO_STRAIGHT
elif na - ca > 0:
action = 'LEFT'
action = NavEnum.LEFT
else:
action = 'RIGHT'
action = NavEnum.RIGHT
else:
action = 'FOLLOW_LANE'
action = NavEnum.FOLLOW_LANE
plan.append(action)
plan.append('STOP')
plan.append(NavEnum.STOP)
return plan
def _distance_heuristic(self, n1, n2):
"""
Distance heuristic calculator for path searching
in self._graph
"""
(x1, y1) = self._graph.nodes[n1]['vertex']
(x2, y2) = self._graph.nodes[n2]['vertex']
return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
def path_search(self, origin, destination):
"""
This function finds the shortest path connecting origin and destination
using A* search with distance heuristic
using A* search with distance heuristic.
origin : tuple containing x, y co-ordinates of start position
desitnation : tuple containing x, y co-ordinates of end position
return : path as list of node ids (as int) of the graph self._graph
connecting origin and destination
"""
xo, yo = origin
xd, yd = destination
start = self.localise(xo, yo)
end = self.localise(xd, yd)
def dist(n1, n2):
(x1, y1) = self.graph.nodes[n1]['vertex']
(x2, y2) = self.graph.nodes[n2]['vertex']
return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
route = nx.astar_path(
self.graph, source=self.id_map[start['entry']],
target=self.id_map[end['exit']],
heuristic=dist,
self._graph, source=self._id_map[start['entry']],
target=self._id_map[end['exit']],
heuristic=self._distance_heuristic,
weight='length')
return route
@ -103,12 +133,16 @@ class GlobalRoutePlanner(object):
def localise(self, x, y):
"""
This function finds the road segment closest to (x, y)
x, y : co-ordinates of the point to be localized
return : pair of points, tuple of tuples containing co-ordinates
of points that represents the road segment closest to x, y
"""
distance = float('inf')
nearest = (distance, dict())
# Measuring distances from segment waypoints and (x, y)
for segment in self.topology:
for segment in self._topology:
entryxy = segment['entry']
exitxy = segment['exit']
path = segment['path']
@ -122,13 +156,25 @@ class GlobalRoutePlanner(object):
def build_graph(self):
"""
This function builds a graph representation of topology
This function builds a networkx graph representation of topology.
The topology is read from self._topology.
graph node properties:
vertex - (x,y) of node's position in world map
graph edge properties:
entry_vector - unit vector along tangent at entry point
exit_vector - unit vector along tangent at exit point
net_vector - unit vector of the chord from entry to exit
intersection - boolean indicating if the edge belongs to an
intersection
return : graph -> networkx graph representing the world map,
id_map-> mapping from (x,y) to node id
"""
graph = nx.DiGraph()
# Map with structure {(x,y): id, ... }
id_map = dict()
for segment in self.topology:
for segment in self._topology:
entryxy = segment['entry']
exitxy = segment['exit']
@ -158,6 +204,11 @@ class GlobalRoutePlanner(object):
def distance(self, point1, point2):
"""
returns the distance between point1 and point2
point1 : (x,y) of first point
point2 : (x,y) of second point
return : distance from point1 to point2
"""
x1, y1 = point1
x2, y2 = point2
@ -166,6 +217,12 @@ class GlobalRoutePlanner(object):
def unit_vector(self, point1, point2):
"""
This function returns the unit vector from point1 to point2
point1 : (x,y) of first point
point2 : (x,y) of second point
return : tuple containing x and y components of unit vector
from point1 to point2
"""
x1, y1 = point1
x2, y2 = point2
@ -179,6 +236,11 @@ class GlobalRoutePlanner(object):
def dot(self, vector1, vector2):
"""
This function returns the dot product of vector1 with vector2
vector1 : x, y components of first vector
vector2 : x, y components of second vector
return : dot porduct scalar between vector1 and vector2
"""
return vector1[0] * vector2[0] + vector1[1] * vector2[1]

View File

@ -15,14 +15,31 @@ class GlobalRoutePlannerDAO(object):
"""
def __init__(self, wmap):
""" Constructor """
self.wmap = wmap # Carla world map
"""
Constructor
wmap : carl world map object
"""
self._wmap = wmap
def get_topology(self):
""" Accessor for topology """
"""
Accessor for topology.
This function retrieves topology from the server as a list of
road segments as pairs of waypoint objects, and processes the
topology into a list of dictionary objects.
return: list of dictionary objects with the following attributes
entry - (x,y) of entry point of road segment
exit - (x,y) of exit point of road segment
path - list of waypoints separated by 1m from entry
to exit
intersection - Boolean indicating if the road segment
is an intersection
"""
topology = []
# Retrieving waypoints to construct a detailed topology
for segment in self.wmap.get_topology():
for segment in self._wmap.get_topology():
x1 = segment[0].transform.location.x
y1 = segment[0].transform.location.y
x2 = segment[1].transform.location.x

View File

@ -1,8 +1,10 @@
import math
import unittest
from mock import Mock
import carla
from global_route_planner import GlobalRoutePlanner
from global_route_planner import NavEnum
from global_route_planner_dao import GlobalRoutePlannerDAO
@ -36,7 +38,8 @@ class Test_GlobalRoutePlanner(unittest.TestCase):
"""
plan = self.integ_grp.plan_route((-60, -5), (-77.65, 72.72))
self.assertEqual(
plan, ['START', 'LEFT', 'LEFT', 'GO_STRAIGHT', 'LEFT', 'STOP'])
plan, [NavEnum.START, NavEnum.LEFT, NavEnum.LEFT,
NavEnum.GO_STRAIGHT, NavEnum.LEFT, NavEnum.STOP])
def test_path_search(self):
"""
@ -55,8 +58,8 @@ class Test_GlobalRoutePlanner(unittest.TestCase):
"""
x, y = (200, -250)
segment = self.integ_grp.localise(x, y)
self.assertEqual(self.integ_grp.id_map[segment['entry']], 5)
self.assertEqual(self.integ_grp.id_map[segment['exit']], 225)
self.assertEqual(self.integ_grp._id_map[segment['entry']], 5)
self.assertEqual(self.integ_grp._id_map[segment['exit']], 225)
def test_unit_vector(self):
"""