Added lane changes

This commit is contained in:
Guillermo 2022-05-09 10:12:31 +02:00 committed by bernat
parent c11dfddcab
commit c918960aa1
2 changed files with 99 additions and 4 deletions

View File

@ -10,12 +10,13 @@ It can also make use of the global route planner to follow a specifed route
""" """
import carla import carla
from enum import Enum
from shapely.geometry import Polygon from shapely.geometry import Polygon
from agents.navigation.local_planner import LocalPlanner from agents.navigation.local_planner import LocalPlanner, RoadOption
from agents.navigation.global_route_planner import GlobalRoutePlanner from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.tools.misc import get_speed, is_within_distance, get_trafficlight_trigger_location, compute_distance from agents.tools.misc import (get_speed, is_within_distance,
get_trafficlight_trigger_location,
compute_distance)
class BasicAgent(object): class BasicAgent(object):
@ -217,6 +218,21 @@ class BasicAgent(object):
"""(De)activates the checks for stop signs""" """(De)activates the checks for stop signs"""
self._ignore_vehicles = active self._ignore_vehicles = active
def lane_change(self, direction, same_lane_time=0, other_lane_time=0, lane_change_time=2):
"""Changes the path so that the vehicle performs a lane change"""
speed = self._vehicle.get_velocity().length()
path = self._generate_lane_change_path(
self._map.get_waypoint(self._vehicle.get_location()),
direction,
same_lane_time * speed,
other_lane_time * speed,
lane_change_time * speed,
False,
1,
self._sampling_resolution
)
self.set_global_plan(path)
def _affected_by_traffic_light(self, lights_list=None, max_distance=None): def _affected_by_traffic_light(self, lights_list=None, max_distance=None):
""" """
Method to check if there is a red light affecting the vehicle. Method to check if there is a red light affecting the vehicle.
@ -371,3 +387,76 @@ class BasicAgent(object):
return (False, None, -1) return (False, None, -1)
return (False, None, -1) return (False, None, -1)
def _generate_lane_change_path(self, waypoint, direction='left', distance_same_lane=10,
distance_other_lane=25, lane_change_distance=25,
check=True, lane_changes=1, step_distance=2):
"""
This methods generates a path that results in a lane change.
Use the different distances to fine-tune the maneuver.
If the lane change is impossible, the returned path will be empty.
"""
plan = []
plan.append((waypoint, RoadOption.LANEFOLLOW)) # start position
option = RoadOption.LANEFOLLOW
# Same lane
distance = 0
while distance < distance_same_lane:
next_wps = plan[-1][0].next(step_distance)
if not next_wps:
return []
next_wp = next_wps[0]
distance += next_wp.transform.location.distance(plan[-1][0].transform.location)
plan.append((next_wp, RoadOption.LANEFOLLOW))
if direction == 'left':
option = RoadOption.CHANGELANELEFT
elif direction == 'right':
option = RoadOption.CHANGELANERIGHT
else:
# ERROR, input value for change must be 'left' or 'right'
return []
lane_changes_done = 0
lane_change_distance = lane_change_distance / lane_changes
# Lane change
while lane_changes_done < lane_changes:
# Move forward
next_wps = plan[-1][0].next(lane_change_distance)
if not next_wps:
return []
next_wp = next_wps[0]
# Get the side lane
if direction == 'left':
if check and str(next_wp.lane_change) not in ['Left', 'Both']:
return []
side_wp = next_wp.get_left_lane()
else:
if check and str(next_wp.lane_change) not in ['Right', 'Both']:
return []
side_wp = next_wp.get_right_lane()
if not side_wp or side_wp.lane_type != carla.LaneType.Driving:
return []
# Update the plan
plan.append((side_wp, option))
lane_changes_done += 1
# Other lane
distance = 0
while distance < distance_other_lane:
next_wps = plan[-1][0].next(step_distance)
if not next_wps:
return []
next_wp = next_wps[0]
distance += next_wp.transform.location.distance(plan[-1][0].transform.location)
plan.append((next_wp, RoadOption.LANEFOLLOW))
return plan

View File

@ -39,6 +39,7 @@ class ConstantVelocityAgent(BasicAgent):
self._target_speed = target_speed / 3.6 # [m/s] self._target_speed = target_speed / 3.6 # [m/s]
self._current_speed = vehicle.get_velocity().length() # [m/s] self._current_speed = vehicle.get_velocity().length() # [m/s]
self._constant_velocity_stop_time = None self._constant_velocity_stop_time = None
self._collision_sensor = None
self._restart_time = float('inf') # Time after collision before the constant velocity behavior starts again self._restart_time = float('inf') # Time after collision before the constant velocity behavior starts again
@ -54,7 +55,7 @@ class ConstantVelocityAgent(BasicAgent):
self._set_constant_velocity(target_speed) self._set_constant_velocity(target_speed)
def set_target_speed(self, speed): def set_target_speed(self, speed):
"""Changes the target speed of the agent""" """Changes the target speed of the agent [km/h]"""
self._target_speed = speed / 3.6 self._target_speed = speed / 3.6
self._local_planner.set_speed(speed) self._local_planner.set_speed(speed)
@ -121,3 +122,8 @@ class ConstantVelocityAgent(BasicAgent):
blueprint = self._world.get_blueprint_library().find('sensor.other.collision') blueprint = self._world.get_blueprint_library().find('sensor.other.collision')
self._collision_sensor = self._world.spawn_actor(blueprint, carla.Transform(), attach_to=self._vehicle) self._collision_sensor = self._world.spawn_actor(blueprint, carla.Transform(), attach_to=self._vehicle)
self._collision_sensor.listen(lambda event: self.stop_constant_velocity()) self._collision_sensor.listen(lambda event: self.stop_constant_velocity())
def destroy_sensor(self):
if self._collision_sensor:
self._collision_sensor.destroy()
self._collision_sensor = None