carla/Co-Simulation/Sumo/sumo_integration/sumo_simulation.py

518 lines
18 KiB
Python

#!/usr/bin/env python
# Copyright (c) 2020 Computer Vision Center (CVC) at the Universitat Autonoma de
# Barcelona (UAB).
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
""" This module is responsible for the management of the sumo simulation. """
# ==================================================================================================
# -- imports ---------------------------------------------------------------------------------------
# ==================================================================================================
import collections
import enum
import logging
import os
import carla # pylint: disable=import-error
import sumolib # pylint: disable=import-error
import traci # pylint: disable=import-error
from .constants import INVALID_ACTOR_ID
import lxml.etree as ET # pylint: disable=import-error
# ==================================================================================================
# -- sumo definitions ------------------------------------------------------------------------------
# ==================================================================================================
# https://sumo.dlr.de/docs/Simulation/Traffic_Lights.html#signal_state_definitions
class SumoSignalState(object):
"""
SumoSignalState contains the different traffic light states.
"""
RED = 'r'
YELLOW = 'y'
GREEN = 'G'
GREEN_WITHOUT_PRIORITY = 'g'
GREEN_RIGHT_TURN = 's'
RED_YELLOW = 'u'
OFF_BLINKING = 'o'
OFF = 'O'
# https://sumo.dlr.de/docs/TraCI/Vehicle_Signalling.html
class SumoVehSignal(object):
"""
SumoVehSignal contains the different sumo vehicle signals.
"""
BLINKER_RIGHT = 1 << 0
BLINKER_LEFT = 1 << 1
BLINKER_EMERGENCY = 1 << 2
BRAKELIGHT = 1 << 3
FRONTLIGHT = 1 << 4
FOGLIGHT = 1 << 5
HIGHBEAM = 1 << 6
BACKDRIVE = 1 << 7
WIPER = 1 << 8
DOOR_OPEN_LEFT = 1 << 9
DOOR_OPEN_RIGHT = 1 << 10
EMERGENCY_BLUE = 1 << 11
EMERGENCY_RED = 1 << 12
EMERGENCY_YELLOW = 1 << 13
# https://sumo.dlr.de/docs/Definition_of_Vehicles,_Vehicle_Types,_and_Routes.html#abstract_vehicle_class
class SumoActorClass(enum.Enum):
"""
SumoActorClass enumerates the different sumo actor classes.
"""
IGNORING = "ignoring"
PRIVATE = "private"
EMERGENCY = "emergency"
AUTHORITY = "authority"
ARMY = "army"
VIP = "vip"
PEDESTRIAN = "pedestrian"
PASSENGER = "passenger"
HOV = "hov"
TAXI = "taxi"
BUS = "bus"
COACH = "coach"
DELIVERY = "delivery"
TRUCK = "truck"
TRAILER = "trailer"
MOTORCYCLE = "motorcycle"
MOPED = "moped"
BICYCLE = "bicycle"
EVEHICLE = "evehicle"
TRAM = "tram"
RAIL_URBAN = "rail_urban"
RAIL = "rail"
RAIL_ELECTRIC = "rail_electric"
RAIL_FAST = "rail_fast"
SHIP = "ship"
CUSTOM1 = "custom1"
CUSTOM2 = "custom2"
SumoActor = collections.namedtuple('SumoActor', 'type_id vclass transform signals extent color')
# ==================================================================================================
# -- sumo traffic lights ---------------------------------------------------------------------------
# ==================================================================================================
class SumoTLLogic(object):
"""
SumoTLLogic holds the data relative to a traffic light in sumo.
"""
def __init__(self, tlid, states, parameters):
self.tlid = tlid
self.states = states
self._landmark2link = {}
self._link2landmark = {}
for link_index, landmark_id in parameters.items():
# Link index information is added in the parameter as 'linkSignalID:x'
link_index = int(link_index.split(':')[1])
if landmark_id not in self._landmark2link:
self._landmark2link[landmark_id] = []
self._landmark2link[landmark_id].append((tlid, link_index))
self._link2landmark[(tlid, link_index)] = landmark_id
def get_number_signals(self):
"""
Returns number of internal signals of the traffic light.
"""
if len(self.states) > 0:
return len(self.states[0])
return 0
def get_all_signals(self):
"""
Returns all the signals of the traffic light.
:returns list: [(tlid, link_index), (tlid, link_index), ...]
"""
return [(self.tlid, i) for i in range(self.get_number_signals())]
def get_all_landmarks(self):
"""
Returns all the landmarks associated with this traffic light.
"""
return self._landmark2link.keys()
def get_associated_signals(self, landmark_id):
"""
Returns all the signals associated with the given landmark.
:returns list: [(tlid, link_index), (tlid, link_index), ...]
"""
return self._landmark2link.get(landmark_id, [])
class SumoTLManager(object):
"""
SumoTLManager is responsible for the management of the sumo traffic lights (i.e., keeps control
of the current program, phase, ...)
"""
def __init__(self):
self._tls = {} # {tlid: {program_id: SumoTLLogic}
self._current_program = {} # {tlid: program_id}
self._current_phase = {} # {tlid: index_phase}
for tlid in traci.trafficlight.getIDList():
self.subscribe(tlid)
self._tls[tlid] = {}
for tllogic in traci.trafficlight.getAllProgramLogics(tlid):
states = [phase.state for phase in tllogic.getPhases()]
parameters = tllogic.getParameters()
tl = SumoTLLogic(tlid, states, parameters)
self._tls[tlid][tllogic.programID] = tl
# Get current status of the traffic lights.
self._current_program[tlid] = traci.trafficlight.getProgram(tlid)
self._current_phase[tlid] = traci.trafficlight.getPhase(tlid)
self._off = False
@staticmethod
def subscribe(tlid):
"""
Subscribe the given traffic ligth to the following variables:
* Current program.
* Current phase.
"""
traci.trafficlight.subscribe(tlid, [
traci.constants.TL_CURRENT_PROGRAM,
traci.constants.TL_CURRENT_PHASE,
])
@staticmethod
def unsubscribe(tlid):
"""
Unsubscribe the given traffic ligth from receiving updated information each step.
"""
traci.trafficlight.unsubscribe(tlid)
def get_all_signals(self):
"""
Returns all the traffic light signals.
"""
signals = set()
for tlid, program_id in self._current_program.items():
signals.update(self._tls[tlid][program_id].get_all_signals())
return signals
def get_all_landmarks(self):
"""
Returns all the landmarks associated with a traffic light in the simulation.
"""
landmarks = set()
for tlid, program_id in self._current_program.items():
landmarks.update(self._tls[tlid][program_id].get_all_landmarks())
return landmarks
def get_all_associated_signals(self, landmark_id):
"""
Returns all the signals associated with the given landmark.
:returns list: [(tlid, link_index), (tlid, link_index), ...]
"""
signals = set()
for tlid, program_id in self._current_program.items():
signals.update(self._tls[tlid][program_id].get_associated_signals(landmark_id))
return signals
def get_state(self, landmark_id):
"""
Returns the traffic light state of the signals associated with the given landmark.
"""
states = set()
for tlid, link_index in self.get_all_associated_signals(landmark_id):
current_program = self._current_program[tlid]
current_phase = self._current_phase[tlid]
tl = self._tls[tlid][current_program]
states.update(tl.states[current_phase][link_index])
if len(states) == 1:
return states.pop()
elif len(states) > 1:
logging.warning('Landmark %s is associated with signals with different states',
landmark_id)
return SumoSignalState.RED
else:
return None
def set_state(self, landmark_id, state):
"""
Updates the state of all the signals associated with the given landmark.
"""
for tlid, link_index in self.get_all_associated_signals(landmark_id):
traci.trafficlight.setLinkState(tlid, link_index, state)
return True
def switch_off(self):
"""
Switch off all traffic lights.
"""
for tlid, link_index in self.get_all_signals():
traci.trafficlight.setLinkState(tlid, link_index, SumoSignalState.OFF)
self._off = True
def tick(self):
"""
Tick to traffic light manager
"""
if self._off is False:
for tl_id in traci.trafficlight.getIDList():
results = traci.trafficlight.getSubscriptionResults(tl_id)
current_program = results[traci.constants.TL_CURRENT_PROGRAM]
current_phase = results[traci.constants.TL_CURRENT_PHASE]
if current_program != 'online':
self._current_program[tl_id] = current_program
self._current_phase[tl_id] = current_phase
# ==================================================================================================
# -- sumo simulation -------------------------------------------------------------------------------
# ==================================================================================================
def _get_sumo_net(cfg_file):
"""
Returns sumo net.
This method reads the sumo configuration file and retrieve the sumo net filename to create the
net.
"""
cfg_file = os.path.join(os.getcwd(), cfg_file)
tree = ET.parse(cfg_file)
tag = tree.find('//net-file')
if tag is None:
return None
net_file = os.path.join(os.path.dirname(cfg_file), tag.get('value'))
logging.debug('Reading net file: %s', net_file)
sumo_net = sumolib.net.readNet(net_file)
return sumo_net
class SumoSimulation(object):
"""
SumoSimulation is responsible for the management of the sumo simulation.
"""
def __init__(self, cfg_file, step_length, host=None, port=None, sumo_gui=False, client_order=1):
if sumo_gui is True:
sumo_binary = sumolib.checkBinary('sumo-gui')
else:
sumo_binary = sumolib.checkBinary('sumo')
if host is None or port is None:
logging.info('Starting new sumo server...')
if sumo_gui is True:
logging.info('Remember to press the play button to start the simulation')
traci.start([sumo_binary,
'--configuration-file', cfg_file,
'--step-length', str(step_length),
'--lateral-resolution', '0.25',
'--collision.check-junctions'
])
else:
logging.info('Connection to sumo server. Host: %s Port: %s', host, port)
traci.init(host=host, port=port)
traci.setOrder(client_order)
# Retrieving net from configuration file.
self.net = _get_sumo_net(cfg_file)
# To keep track of the vehicle classes for which a route has been generated in sumo.
self._routes = set()
# Variable to asign an id to new added actors.
self._sequential_id = 0
# Structures to keep track of the spawned and destroyed vehicles at each time step.
self.spawned_actors = set()
self.destroyed_actors = set()
# Traffic light manager.
self.traffic_light_manager = SumoTLManager()
@property
def traffic_light_ids(self):
return self.traffic_light_manager.get_all_landmarks()
@staticmethod
def subscribe(actor_id):
"""
Subscribe the given actor to the following variables:
* Type.
* Vehicle class.
* Color.
* Length, Width, Height.
* Position3D (i.e., x, y, z).
* Angle, Slope.
* Speed.
* Lateral speed.
* Signals.
"""
traci.vehicle.subscribe(actor_id, [
traci.constants.VAR_TYPE, traci.constants.VAR_VEHICLECLASS, traci.constants.VAR_COLOR,
traci.constants.VAR_LENGTH, traci.constants.VAR_WIDTH, traci.constants.VAR_HEIGHT,
traci.constants.VAR_POSITION3D, traci.constants.VAR_ANGLE, traci.constants.VAR_SLOPE,
traci.constants.VAR_SPEED, traci.constants.VAR_SPEED_LAT, traci.constants.VAR_SIGNALS
])
@staticmethod
def unsubscribe(actor_id):
"""
Unsubscribe the given actor from receiving updated information each step.
"""
traci.vehicle.unsubscribe(actor_id)
def get_net_offset(self):
"""
Accessor for sumo net offset.
"""
if self.net is None:
return (0, 0)
return self.net.getLocationOffset()
@staticmethod
def get_actor(actor_id):
"""
Accessor for sumo actor.
"""
results = traci.vehicle.getSubscriptionResults(actor_id)
type_id = results[traci.constants.VAR_TYPE]
vclass = SumoActorClass(results[traci.constants.VAR_VEHICLECLASS])
color = results[traci.constants.VAR_COLOR]
length = results[traci.constants.VAR_LENGTH]
width = results[traci.constants.VAR_WIDTH]
height = results[traci.constants.VAR_HEIGHT]
location = list(results[traci.constants.VAR_POSITION3D])
rotation = [results[traci.constants.VAR_SLOPE], results[traci.constants.VAR_ANGLE], 0.0]
transform = carla.Transform(carla.Location(location[0], location[1], location[2]),
carla.Rotation(rotation[0], rotation[1], rotation[2]))
signals = results[traci.constants.VAR_SIGNALS]
extent = carla.Vector3D(length / 2.0, width / 2.0, height / 2.0)
return SumoActor(type_id, vclass, transform, signals, extent, color)
def spawn_actor(self, type_id, color=None):
"""
Spawns a new actor.
:param type_id: vtype to be spawned.
:param color: color attribute for this specific actor.
:return: actor id if the actor is successfully spawned. Otherwise, INVALID_ACTOR_ID.
"""
actor_id = 'carla' + str(self._sequential_id)
try:
vclass = traci.vehicletype.getVehicleClass(type_id)
if vclass not in self._routes:
logging.debug('Creating route for %s vehicle class', vclass)
allowed_edges = [e for e in self.net.getEdges() if e.allows(vclass)]
if allowed_edges:
traci.route.add("carla_route_{}".format(vclass), [allowed_edges[0].getID()])
self._routes.add(vclass)
else:
logging.error(
'Could not found a route for %s. No vehicle will be spawned in sumo',
type_id)
return INVALID_ACTOR_ID
traci.vehicle.add(actor_id, 'carla_route_{}'.format(vclass), typeID=type_id)
except traci.exceptions.TraCIException as error:
logging.error('Spawn sumo actor failed: %s', error)
return INVALID_ACTOR_ID
if color is not None:
color = color.split(',')
traci.vehicle.setColor(actor_id, color)
self._sequential_id += 1
return actor_id
@staticmethod
def destroy_actor(actor_id):
"""
Destroys the given actor.
"""
traci.vehicle.remove(actor_id)
def get_traffic_light_state(self, landmark_id):
"""
Accessor for traffic light state.
If the traffic ligth does not exist, returns None.
"""
return self.traffic_light_manager.get_state(landmark_id)
def switch_off_traffic_lights(self):
"""
Switch off all traffic lights.
"""
self.traffic_light_manager.switch_off()
def synchronize_vehicle(self, vehicle_id, transform, signals=None):
"""
Updates vehicle state.
:param vehicle_id: id of the actor to be updated.
:param transform: new vehicle transform (i.e., position and rotation).
:param signals: new vehicle signals.
:return: True if successfully updated. Otherwise, False.
"""
loc_x, loc_y = transform.location.x, transform.location.y
yaw = transform.rotation.yaw
traci.vehicle.moveToXY(vehicle_id, "", 0, loc_x, loc_y, angle=yaw, keepRoute=2)
if signals is not None:
traci.vehicle.setSignals(vehicle_id, signals)
return True
def synchronize_traffic_light(self, landmark_id, state):
"""
Updates traffic light state.
:param tl_id: id of the traffic light to be updated (logic id, link index).
:param state: new traffic light state.
:return: True if successfully updated. Otherwise, False.
"""
self.traffic_light_manager.set_state(landmark_id, state)
def tick(self):
"""
Tick to sumo simulation.
"""
traci.simulationStep()
self.traffic_light_manager.tick()
# Update data structures for the current frame.
self.spawned_actors = set(traci.simulation.getDepartedIDList())
self.destroyed_actors = set(traci.simulation.getArrivedIDList())
@staticmethod
def close():
"""
Closes traci client.
"""
traci.close()