From d0689df5f23d297b6187f5b77cc9a1a28c840ce6 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Fri, 14 Dec 2018 22:07:09 +0530 Subject: [PATCH] * Added global_route_planner.py * Added global_route_planner_dao.py * Added test_global_route_planner.py --- PythonAPI/source/carla/navigation/__init__.py | 0 .../carla/navigation/global_route_planner.py | 185 ++++++++++++++++++ .../navigation/global_route_planner_dao.py | 46 +++++ .../navigation/test_global_route_planner.py | 98 ++++++++++ 4 files changed, 329 insertions(+) create mode 100644 PythonAPI/source/carla/navigation/__init__.py create mode 100644 PythonAPI/source/carla/navigation/global_route_planner.py create mode 100644 PythonAPI/source/carla/navigation/global_route_planner_dao.py create mode 100644 PythonAPI/source/carla/navigation/test_global_route_planner.py diff --git a/PythonAPI/source/carla/navigation/__init__.py b/PythonAPI/source/carla/navigation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/PythonAPI/source/carla/navigation/global_route_planner.py b/PythonAPI/source/carla/navigation/global_route_planner.py new file mode 100644 index 000000000..09f8b2e7b --- /dev/null +++ b/PythonAPI/source/carla/navigation/global_route_planner.py @@ -0,0 +1,185 @@ +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" +This module provides GlobalRoutePlanner implementation. +""" + +import math +import networkx as nx +import carla + + +class GlobalRoutePlanner(object): + """ + This class provides a very high level route plan. + Instantiate the calss by passing a reference to + A GlobalRoutePlannerDAO object. + """ + + def __init__(self, dao): + """ + Constructor + """ + self.dao = dao + + def setup(self): + """ + Perform initial server data lookup for detailed topology + and builds graph representation of the world map. + """ + self.topology = self.dao.get_topology() + # Creating graph of the world map and also a map from + # node co-ordinates to node id + self.graph, self.id_map = self.build_graph() + + def plan_route(self, origin, destination): + """ + The following function generates the route plan based on + origin : tuple containing x, y of the route's start position + destination : tuple containing x, y of the route's end position + + return : list of turn by turn navigation decision + possible values (for now) are START, GO_STRAIGHT, LEFT, RIGHT, + FOLLOW_LANE, STOP + """ + + threshold = 0.0523599 # 5 degrees + route = self.path_search(origin, destination) + plan = [] + plan.append('START') + # Compare current edge and next edge to decide on action + for i in range(len(route) - 2): + current_edge = self.graph.edges[route[i], route[i + 1]] + next_edge = self.graph.edges[route[i + 1], route[i + 2]] + if not current_edge['intersection']: + cv = current_edge['exit_vector'] + nv = None + if next_edge['intersection']: + nv = next_edge['net_vector'] + ca = round(math.atan2(*cv[::-1]) * 180 / math.pi) + na = round(math.atan2(*nv[::-1]) * 180 / math.pi) + angle_list = [ca, na] + for i in range(len(angle_list)): + if angle_list[i] > 0: + angle_list[i] = 180 - angle_list[i] + elif angle_list[i] < 0: + angle_list[i] = -1 * (180 + angle_list[i]) + ca, na = angle_list + if abs(na - ca) < threshold: + action = 'GO_STRAIGHT' + elif na - ca > 0: + action = 'LEFT' + else: + action = 'RIGHT' + else: + action = 'FOLLOW_LANE' + plan.append(action) + plan.append('STOP') + return plan + + def path_search(self, origin, destination): + """ + This function finds the shortest path connecting origin and destination + using A* search with distance heuristic + """ + xo, yo = origin + xd, yd = destination + start = self.localise(xo, yo) + end = self.localise(xd, yd) + + def dist(n1, n2): + (x1, y1) = self.graph.nodes[n1]['vertex'] + (x2, y2) = self.graph.nodes[n2]['vertex'] + return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5 + route = nx.astar_path( + self.graph, source=self.id_map[start['entry']], + target=self.id_map[end['exit']], + heuristic=dist, + weight='length') + + return route + + def localise(self, x, y): + """ + This function finds the road segment closest to (x, y) + """ + distance = float('inf') + nearest = (distance, dict()) + + # Measuring distances from segment waypoints and (x, y) + for segment in self.topology: + entryxy = segment['entry'] + exitxy = segment['exit'] + path = segment['path'] + for xp, yp in [entryxy] + path + [exitxy]: + new_distance = self.distance((xp, yp), (x, y)) + if new_distance < nearest[0]: + nearest = (new_distance, segment) + + segment = nearest[1] + return segment + + def build_graph(self): + """ + This function builds a graph representation of topology + """ + graph = nx.DiGraph() + # Map with structure {(x,y): id, ... } + id_map = dict() + + for segment in self.topology: + + entryxy = segment['entry'] + exitxy = segment['exit'] + path = segment['path'] + intersection = segment['intersection'] + for vertex in entryxy, exitxy: + # Adding unique nodes and populating id_map + if vertex not in id_map: + new_id = len(id_map) + id_map[vertex] = new_id + graph.add_node(new_id, vertex=vertex) + + n1, n2 = id_map[entryxy], id_map[exitxy] + # Adding edge with attributes + graph.add_edge( + n1, n2, + length=len(path) + 1, path=path, + entry_vector=self.unit_vector( + entryxy, path[0] if len(path) > 0 else exitxy), + exit_vector=self.unit_vector( + path[-1] if len(path) > 0 else entryxy, exitxy), + net_vector=self.unit_vector(entryxy, exitxy), + intersection=intersection) + + return graph, id_map + + def distance(self, point1, point2): + """ + returns the distance between point1 and point2 + """ + x1, y1 = point1 + x2, y2 = point2 + return math.sqrt((x2 - x1)**2 + (y2 - y1)**2) + + def unit_vector(self, point1, point2): + """ + This function returns the unit vector from point1 to point2 + """ + x1, y1 = point1 + x2, y2 = point2 + + vector = (x2 - x1, y2 - y1) + vector_mag = math.sqrt(vector[0]**2 + vector[1]**2) + vector = (vector[0] / vector_mag, vector[1] / vector_mag) + + return vector + + def dot(self, vector1, vector2): + """ + This function returns the dot product of vector1 with vector2 + """ + return vector1[0] * vector2[0] + vector1[1] * vector2[1] + + pass diff --git a/PythonAPI/source/carla/navigation/global_route_planner_dao.py b/PythonAPI/source/carla/navigation/global_route_planner_dao.py new file mode 100644 index 000000000..38582be0e --- /dev/null +++ b/PythonAPI/source/carla/navigation/global_route_planner_dao.py @@ -0,0 +1,46 @@ +# This work is licensed under the terms of the MIT license. +# For a copy, see . + +""" +This module provides implementation for GlobalRoutePlannerDAO +""" + +import carla + + +class GlobalRoutePlannerDAO(object): + """ + This class is the data access layer for fetching data + from the carla server instance for GlobalRoutePlanner + """ + + def __init__(self, wmap): + """ Constructor """ + self.wmap = wmap # Carla world map + + def get_topology(self): + """ Accessor for topology """ + topology = [] + # Retrieving waypoints to construct a detailed topology + for segment in self.wmap.get_topology(): + x1 = segment[0].transform.location.x + y1 = segment[0].transform.location.y + x2 = segment[1].transform.location.x + y2 = segment[1].transform.location.y + seg_dict = dict() + seg_dict['entry'] = (x1, y1) + seg_dict['exit'] = (x2, y2) + seg_dict['path'] = [] + wp1 = segment[0] + wp2 = segment[1] + seg_dict['intersection'] = True if wp1.is_intersection else False + endloc = wp2.transform.location + w = wp1.next(1)[0] + while w.transform.location.distance(endloc) > 1: + x = w.transform.location.x + y = w.transform.location.y + seg_dict['path'].append((x, y)) + w = w.next(1)[0] + + topology.append(seg_dict) + return topology diff --git a/PythonAPI/source/carla/navigation/test_global_route_planner.py b/PythonAPI/source/carla/navigation/test_global_route_planner.py new file mode 100644 index 000000000..962a41af4 --- /dev/null +++ b/PythonAPI/source/carla/navigation/test_global_route_planner.py @@ -0,0 +1,98 @@ +import math +import unittest +from mock import Mock +import carla +from global_route_planner import GlobalRoutePlanner +from global_route_planner_dao import GlobalRoutePlannerDAO + + +class Test_GlobalRoutePlanner(unittest.TestCase): + """ + Test class for GlobalRoutePlanner class + """ + + def setUp(self): + # == Utilities test instance without DAO == # + self.simple_grp = GlobalRoutePlanner(None) + + # == Integration test instance == # + client = carla.Client('localhost', 2000) + world = client.get_world() + integ_dao = GlobalRoutePlannerDAO(world.get_map()) + self.integ_grp = GlobalRoutePlanner(integ_dao) + self.integ_grp.setup() + pass + + def tearDown(self): + self.simple_grp = None + self.dao_grp = None + self.integ_grp = None + pass + + def test_plan_route(self): + """ + Test for GlobalROutePlanner.plan_route() + Run this test with carla server running Town03 + """ + plan = self.integ_grp.plan_route((-60, -5), (-77.65, 72.72)) + self.assertEqual( + plan, ['START', 'LEFT', 'LEFT', 'GO_STRAIGHT', 'LEFT', 'STOP']) + + def test_path_search(self): + """ + Test for GlobalRoutePlanner.path_search() + Run this test with carla server running Town03 + """ + self.integ_grp.path_search((191.947, -5.602), (78.730, -50.091)) + self.assertEqual( + self.integ_grp.path_search((196.947, -5.602), (78.730, -50.091)), + [256, 157, 158, 117, 118, 59, 55, 230]) + + def test_localise(self): + """ + Test for GlobalRoutePlanner.localise() + Run this test with carla server running Town03 + """ + x, y = (200, -250) + segment = self.integ_grp.localise(x, y) + self.assertEqual(self.integ_grp.id_map[segment['entry']], 5) + self.assertEqual(self.integ_grp.id_map[segment['exit']], 225) + + def test_unit_vector(self): + """ + Test for GlobalROutePlanner.unit_vector() + """ + vector = self.simple_grp.unit_vector((1, 1), (2, 2)) + self.assertAlmostEquals(vector[0], 1 / math.sqrt(2)) + self.assertAlmostEquals(vector[1], 1 / math.sqrt(2)) + + def test_dot(self): + """ + Test for GlobalROutePlanner.test_dot() + """ + self.assertAlmostEqual(self.simple_grp.dot((1, 0), (0, 1)), 0) + self.assertAlmostEqual(self.simple_grp.dot((1, 0), (1, 0)), 1) + + +def suite(): + """ + Gathering all tests + """ + + suite = unittest.TestSuite() + suite.addTest(Test_GlobalRoutePlanner('test_unit_vector')) + suite.addTest(Test_GlobalRoutePlanner('test_dot')) + suite.addTest(Test_GlobalRoutePlanner('test_localise')) + suite.addTest(Test_GlobalRoutePlanner('test_path_search')) + suite.addTest(Test_GlobalRoutePlanner('test_plan_route')) + + return suite + + +if __name__ == '__main__': + """ + Running test suite + """ + mySuit = suite() + runner = unittest.TextTestRunner() + runner.run(mySuit)