#!/usr/bin/env python # Copyright (c) 2020 Computer Vision Center (CVC) at the Universitat Autonoma de # Barcelona (UAB). # # This work is licensed under the terms of the MIT license. # For a copy, see . """ This module is responsible for the management of the sumo simulation. """ # ================================================================================================== # -- imports --------------------------------------------------------------------------------------- # ================================================================================================== import collections import enum import logging import os import carla # pylint: disable=import-error import sumolib # pylint: disable=import-error import traci # pylint: disable=import-error from .constants import INVALID_ACTOR_ID import lxml.etree as ET # pylint: disable=import-error # ================================================================================================== # -- sumo definitions ------------------------------------------------------------------------------ # ================================================================================================== # https://sumo.dlr.de/docs/Simulation/Traffic_Lights.html#signal_state_definitions class SumoSignalState(object): """ SumoSignalState contains the different traffic light states. """ RED = 'r' YELLOW = 'y' GREEN = 'G' GREEN_WITHOUT_PRIORITY = 'g' GREEN_RIGHT_TURN = 's' RED_YELLOW = 'u' OFF_BLINKING = 'o' OFF = 'O' # https://sumo.dlr.de/docs/TraCI/Vehicle_Signalling.html class SumoVehSignal(object): """ SumoVehSignal contains the different sumo vehicle signals. """ BLINKER_RIGHT = 1 << 0 BLINKER_LEFT = 1 << 1 BLINKER_EMERGENCY = 1 << 2 BRAKELIGHT = 1 << 3 FRONTLIGHT = 1 << 4 FOGLIGHT = 1 << 5 HIGHBEAM = 1 << 6 BACKDRIVE = 1 << 7 WIPER = 1 << 8 DOOR_OPEN_LEFT = 1 << 9 DOOR_OPEN_RIGHT = 1 << 10 EMERGENCY_BLUE = 1 << 11 EMERGENCY_RED = 1 << 12 EMERGENCY_YELLOW = 1 << 13 # https://sumo.dlr.de/docs/Definition_of_Vehicles,_Vehicle_Types,_and_Routes.html#abstract_vehicle_class class SumoActorClass(enum.Enum): """ SumoActorClass enumerates the different sumo actor classes. """ IGNORING = "ignoring" PRIVATE = "private" EMERGENCY = "emergency" AUTHORITY = "authority" ARMY = "army" VIP = "vip" PEDESTRIAN = "pedestrian" PASSENGER = "passenger" HOV = "hov" TAXI = "taxi" BUS = "bus" COACH = "coach" DELIVERY = "delivery" TRUCK = "truck" TRAILER = "trailer" MOTORCYCLE = "motorcycle" MOPED = "moped" BICYCLE = "bicycle" EVEHICLE = "evehicle" TRAM = "tram" RAIL_URBAN = "rail_urban" RAIL = "rail" RAIL_ELECTRIC = "rail_electric" RAIL_FAST = "rail_fast" SHIP = "ship" CUSTOM1 = "custom1" CUSTOM2 = "custom2" SumoActor = collections.namedtuple('SumoActor', 'type_id vclass transform signals extent color') # ================================================================================================== # -- sumo traffic lights --------------------------------------------------------------------------- # ================================================================================================== class SumoTLLogic(object): """ SumoTLLogic holds the data relative to a traffic light in sumo. """ def __init__(self, tlid, states, parameters): self.tlid = tlid self.states = states self._landmark2link = {} self._link2landmark = {} for link_index, landmark_id in parameters.items(): # Link index information is added in the parameter as 'linkSignalID:x' link_index = int(link_index.split(':')[1]) if landmark_id not in self._landmark2link: self._landmark2link[landmark_id] = [] self._landmark2link[landmark_id].append((tlid, link_index)) self._link2landmark[(tlid, link_index)] = landmark_id def get_number_signals(self): """ Returns number of internal signals of the traffic light. """ if len(self.states) > 0: return len(self.states[0]) return 0 def get_all_signals(self): """ Returns all the signals of the traffic light. :returns list: [(tlid, link_index), (tlid, link_index), ...] """ return [(self.tlid, i) for i in range(self.get_number_signals())] def get_all_landmarks(self): """ Returns all the landmarks associated with this traffic light. """ return self._landmark2link.keys() def get_associated_signals(self, landmark_id): """ Returns all the signals associated with the given landmark. :returns list: [(tlid, link_index), (tlid, link_index), ...] """ return self._landmark2link.get(landmark_id, []) class SumoTLManager(object): """ SumoTLManager is responsible for the management of the sumo traffic lights (i.e., keeps control of the current program, phase, ...) """ def __init__(self): self._tls = {} # {tlid: {program_id: SumoTLLogic} self._current_program = {} # {tlid: program_id} self._current_phase = {} # {tlid: index_phase} for tlid in traci.trafficlight.getIDList(): self.subscribe(tlid) self._tls[tlid] = {} for tllogic in traci.trafficlight.getAllProgramLogics(tlid): states = [phase.state for phase in tllogic.getPhases()] parameters = tllogic.getParameters() tl = SumoTLLogic(tlid, states, parameters) self._tls[tlid][tllogic.programID] = tl # Get current status of the traffic lights. self._current_program[tlid] = traci.trafficlight.getProgram(tlid) self._current_phase[tlid] = traci.trafficlight.getPhase(tlid) self._off = False @staticmethod def subscribe(tlid): """ Subscribe the given traffic ligth to the following variables: * Current program. * Current phase. """ traci.trafficlight.subscribe(tlid, [ traci.constants.TL_CURRENT_PROGRAM, traci.constants.TL_CURRENT_PHASE, ]) @staticmethod def unsubscribe(tlid): """ Unsubscribe the given traffic ligth from receiving updated information each step. """ traci.trafficlight.unsubscribe(tlid) def get_all_signals(self): """ Returns all the traffic light signals. """ signals = set() for tlid, program_id in self._current_program.items(): signals.update(self._tls[tlid][program_id].get_all_signals()) return signals def get_all_landmarks(self): """ Returns all the landmarks associated with a traffic light in the simulation. """ landmarks = set() for tlid, program_id in self._current_program.items(): landmarks.update(self._tls[tlid][program_id].get_all_landmarks()) return landmarks def get_all_associated_signals(self, landmark_id): """ Returns all the signals associated with the given landmark. :returns list: [(tlid, link_index), (tlid, link_index), ...] """ signals = set() for tlid, program_id in self._current_program.items(): signals.update(self._tls[tlid][program_id].get_associated_signals(landmark_id)) return signals def get_state(self, landmark_id): """ Returns the traffic light state of the signals associated with the given landmark. """ states = set() for tlid, link_index in self.get_all_associated_signals(landmark_id): current_program = self._current_program[tlid] current_phase = self._current_phase[tlid] tl = self._tls[tlid][current_program] states.update(tl.states[current_phase][link_index]) if len(states) == 1: return states.pop() elif len(states) > 1: logging.warning('Landmark %s is associated with signals with different states', landmark_id) return SumoSignalState.RED else: return None def set_state(self, landmark_id, state): """ Updates the state of all the signals associated with the given landmark. """ for tlid, link_index in self.get_all_associated_signals(landmark_id): traci.trafficlight.setLinkState(tlid, link_index, state) return True def switch_off(self): """ Switch off all traffic lights. """ for tlid, link_index in self.get_all_signals(): traci.trafficlight.setLinkState(tlid, link_index, SumoSignalState.OFF) self._off = True def tick(self): """ Tick to traffic light manager """ if self._off is False: for tl_id in traci.trafficlight.getIDList(): results = traci.trafficlight.getSubscriptionResults(tl_id) current_program = results[traci.constants.TL_CURRENT_PROGRAM] current_phase = results[traci.constants.TL_CURRENT_PHASE] if current_program != 'online': self._current_program[tl_id] = current_program self._current_phase[tl_id] = current_phase # ================================================================================================== # -- sumo simulation ------------------------------------------------------------------------------- # ================================================================================================== def _get_sumo_net(cfg_file): """ Returns sumo net. This method reads the sumo configuration file and retrieve the sumo net filename to create the net. """ cfg_file = os.path.join(os.getcwd(), cfg_file) tree = ET.parse(cfg_file) tag = tree.find('//net-file') if tag is None: return None net_file = os.path.join(os.path.dirname(cfg_file), tag.get('value')) logging.debug('Reading net file: %s', net_file) sumo_net = traci.sumolib.net.readNet(net_file) return sumo_net class SumoSimulation(object): """ SumoSimulation is responsible for the management of the sumo simulation. """ def __init__(self, cfg_file, step_length, host=None, port=None, sumo_gui=False, client_order=1): if sumo_gui is True: sumo_binary = sumolib.checkBinary('sumo-gui') else: sumo_binary = sumolib.checkBinary('sumo') if host is None or port is None: logging.info('Starting new sumo server...') if sumo_gui is True: logging.info('Remember to press the play button to start the simulation') traci.start([sumo_binary, '--configuration-file', cfg_file, '--step-length', str(step_length), '--lateral-resolution', '0.25', '--collision.check-junctions' ]) else: logging.info('Connection to sumo server. Host: %s Port: %s', host, port) traci.init(host=host, port=port) traci.setOrder(client_order) # Retrieving net from configuration file. self.net = _get_sumo_net(cfg_file) # To keep track of the vehicle classes for which a route has been generated in sumo. self._routes = set() # Variable to asign an id to new added actors. self._sequential_id = 0 # Structures to keep track of the spawned and destroyed vehicles at each time step. self.spawned_actors = set() self.destroyed_actors = set() # Traffic light manager. self.traffic_light_manager = SumoTLManager() @property def traffic_light_ids(self): return self.traffic_light_manager.get_all_landmarks() @staticmethod def subscribe(actor_id): """ Subscribe the given actor to the following variables: * Type. * Vehicle class. * Color. * Length, Width, Height. * Position3D (i.e., x, y, z). * Angle, Slope. * Speed. * Lateral speed. * Signals. """ traci.vehicle.subscribe(actor_id, [ traci.constants.VAR_TYPE, traci.constants.VAR_VEHICLECLASS, traci.constants.VAR_COLOR, traci.constants.VAR_LENGTH, traci.constants.VAR_WIDTH, traci.constants.VAR_HEIGHT, traci.constants.VAR_POSITION3D, traci.constants.VAR_ANGLE, traci.constants.VAR_SLOPE, traci.constants.VAR_SPEED, traci.constants.VAR_SPEED_LAT, traci.constants.VAR_SIGNALS ]) @staticmethod def unsubscribe(actor_id): """ Unsubscribe the given actor from receiving updated information each step. """ traci.vehicle.unsubscribe(actor_id) def get_net_offset(self): """ Accessor for sumo net offset. """ if self.net is None: return (0, 0) return self.net.getLocationOffset() @staticmethod def get_actor(actor_id): """ Accessor for sumo actor. """ results = traci.vehicle.getSubscriptionResults(actor_id) type_id = results[traci.constants.VAR_TYPE] vclass = SumoActorClass(results[traci.constants.VAR_VEHICLECLASS]) color = results[traci.constants.VAR_COLOR] length = results[traci.constants.VAR_LENGTH] width = results[traci.constants.VAR_WIDTH] height = results[traci.constants.VAR_HEIGHT] location = list(results[traci.constants.VAR_POSITION3D]) rotation = [results[traci.constants.VAR_SLOPE], results[traci.constants.VAR_ANGLE], 0.0] transform = carla.Transform(carla.Location(location[0], location[1], location[2]), carla.Rotation(rotation[0], rotation[1], rotation[2])) signals = results[traci.constants.VAR_SIGNALS] extent = carla.Vector3D(length / 2.0, width / 2.0, height / 2.0) return SumoActor(type_id, vclass, transform, signals, extent, color) def spawn_actor(self, type_id, color=None): """ Spawns a new actor. :param type_id: vtype to be spawned. :param color: color attribute for this specific actor. :return: actor id if the actor is successfully spawned. Otherwise, INVALID_ACTOR_ID. """ actor_id = 'carla' + str(self._sequential_id) try: vclass = traci.vehicletype.getVehicleClass(type_id) if vclass not in self._routes: logging.debug('Creating route for %s vehicle class', vclass) allowed_edges = [e for e in self.net.getEdges() if e.allows(vclass)] if allowed_edges: traci.route.add("carla_route_{}".format(vclass), [allowed_edges[0].getID()]) self._routes.add(vclass) else: logging.error( 'Could not found a route for %s. No vehicle will be spawned in sumo', type_id) return INVALID_ACTOR_ID traci.vehicle.add(actor_id, 'carla_route_{}'.format(vclass), typeID=type_id) except traci.exceptions.TraCIException as error: logging.error('Spawn sumo actor failed: %s', error) return INVALID_ACTOR_ID if color is not None: color = color.split(',') traci.vehicle.setColor(actor_id, color) self._sequential_id += 1 return actor_id @staticmethod def destroy_actor(actor_id): """ Destroys the given actor. """ traci.vehicle.remove(actor_id) def get_traffic_light_state(self, landmark_id): """ Accessor for traffic light state. If the traffic ligth does not exist, returns None. """ return self.traffic_light_manager.get_state(landmark_id) def switch_off_traffic_lights(self): """ Switch off all traffic lights. """ self.traffic_light_manager.switch_off() def synchronize_vehicle(self, vehicle_id, transform, signals=None): """ Updates vehicle state. :param vehicle_id: id of the actor to be updated. :param transform: new vehicle transform (i.e., position and rotation). :param signals: new vehicle signals. :return: True if successfully updated. Otherwise, False. """ loc_x, loc_y = transform.location.x, transform.location.y yaw = transform.rotation.yaw traci.vehicle.moveToXY(vehicle_id, "", 0, loc_x, loc_y, angle=yaw, keepRoute=2) if signals is not None: traci.vehicle.setSignals(vehicle_id, signals) return True def synchronize_traffic_light(self, landmark_id, state): """ Updates traffic light state. :param tl_id: id of the traffic light to be updated (logic id, link index). :param state: new traffic light state. :return: True if successfully updated. Otherwise, False. """ self.traffic_light_manager.set_state(landmark_id, state) def tick(self): """ Tick to sumo simulation. """ traci.simulationStep() self.traffic_light_manager.tick() # Update data structures for the current frame. self.spawned_actors = set(traci.simulation.getDepartedIDList()) self.destroyed_actors = set(traci.simulation.getArrivedIDList()) @staticmethod def close(): """ Closes traci client. """ traci.close()