Source code for flatland.envs.schedule_generators

"""Schedule generators (railway undertaking, "EVU")."""
import warnings
from typing import Tuple, List, Callable, Mapping, Optional, Any

import msgpack
import numpy as np
from numpy.random.mtrand import RandomState

from flatland.core.grid.grid4_utils import get_new_position
from flatland.core.transition_map import GridTransitionMap
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.schedule_utils import Schedule

AgentPosition = Tuple[int, int]
ScheduleGenerator = Callable[[GridTransitionMap, int, Optional[Any], Optional[int]], Schedule]


[docs]def speed_initialization_helper(nb_agents: int, speed_ratio_map: Mapping[float, float] = None, seed: int = None, np_random: RandomState = None) -> List[float]: """ Parameters ---------- nb_agents : int The number of agents to generate a speed for speed_ratio_map : Mapping[float,float] A map of speeds mappint to their ratio of appearance. The ratios must sum up to 1. Returns ------- List[float] A list of size nb_agents of speeds with the corresponding probabilistic ratios. """ if speed_ratio_map is None: return [1.0] * nb_agents nb_classes = len(speed_ratio_map.keys()) speed_ratio_map_as_list: List[Tuple[float, float]] = list(speed_ratio_map.items()) speed_ratios = list(map(lambda t: t[1], speed_ratio_map_as_list)) speeds = list(map(lambda t: t[0], speed_ratio_map_as_list)) return list(map(lambda index: speeds[index], np_random.choice(nb_classes, nb_agents, p=speed_ratios)))
[docs]def complex_schedule_generator(speed_ratio_map: Mapping[float, float] = None, seed: int = 1) -> ScheduleGenerator: """ Generator used to generate the levels of Round 1 in the Flatland Challenge. It can only be used together with complex_rail_generator. It places agents at end and start points provided by the rail generator. It assigns speeds to the different agents according to the speed_ratio_map :param speed_ratio_map: Speed ratios of all agents. They are probabilities of all different speeds and have to add up to 1. :param seed: Initiate random seed generator :return: """ def generator(rail: GridTransitionMap, num_agents: int, hints: Any = None, num_resets: int = 0, np_random: RandomState = None) -> Schedule: """ The generator that assigns tasks to all the agents :param rail: Rail infrastructure given by the rail_generator :param num_agents: Number of agents to include in the schedule :param hints: Hints provided by the rail_generator These include positions of start/target positions :param num_resets: How often the generator has been reset. :return: Returns the generator to the rail constructor """ # Todo: Remove parameters and variables not used for next version, Issue: <https://gitlab.aicrowd.com/flatland/flatland/issues/305> _runtime_seed = seed + num_resets start_goal = hints['start_goal'] start_dir = hints['start_dir'] agents_position = [sg[0] for sg in start_goal[:num_agents]] agents_target = [sg[1] for sg in start_goal[:num_agents]] agents_direction = start_dir[:num_agents] if speed_ratio_map: speeds = speed_initialization_helper(num_agents, speed_ratio_map, seed=_runtime_seed, np_random=np_random) else: speeds = [1.0] * len(agents_position) return Schedule(agent_positions=agents_position, agent_directions=agents_direction, agent_targets=agents_target, agent_speeds=speeds, agent_malfunction_rates=None) return generator
[docs]def sparse_schedule_generator(speed_ratio_map: Mapping[float, float] = None, seed: int = 1) -> ScheduleGenerator: """ This is the schedule generator which is used for Round 2 of the Flatland challenge. It produces schedules to railway networks provided by sparse_rail_generator. :param speed_ratio_map: Speed ratios of all agents. They are probabilities of all different speeds and have to add up to 1. :param seed: Initiate random seed generator """ def generator(rail: GridTransitionMap, num_agents: int, hints: Any = None, num_resets: int = 0, np_random: RandomState = None) -> Schedule: """ The generator that assigns tasks to all the agents :param rail: Rail infrastructure given by the rail_generator :param num_agents: Number of agents to include in the schedule :param hints: Hints provided by the rail_generator These include positions of start/target positions :param num_resets: How often the generator has been reset. :return: Returns the generator to the rail constructor """ _runtime_seed = seed + num_resets train_stations = hints['train_stations'] city_positions = hints['city_positions'] city_orientation = hints['city_orientations'] max_num_agents = hints['num_agents'] city_orientations = hints['city_orientations'] if num_agents > max_num_agents: num_agents = max_num_agents warnings.warn("Too many agents! Changes number of agents.") # Place agents and targets within available train stations agents_position = [] agents_target = [] agents_direction = [] for agent_idx in range(num_agents): infeasible_agent = True tries = 0 while infeasible_agent: tries += 1 infeasible_agent = False # Set target for agent city_idx = np_random.choice(len(city_positions), 2, replace=False) start_city = city_idx[0] target_city = city_idx[1] start_idx = np_random.choice(np.arange(len(train_stations[start_city]))) target_idx = np_random.choice(np.arange(len(train_stations[target_city]))) start = train_stations[start_city][start_idx] target = train_stations[target_city][target_idx] while start[1] % 2 != 0: start_idx = np_random.choice(np.arange(len(train_stations[start_city]))) start = train_stations[start_city][start_idx] while target[1] % 2 != 1: target_idx = np_random.choice(np.arange(len(train_stations[target_city]))) target = train_stations[target_city][target_idx] possible_orientations = [city_orientation[start_city], (city_orientation[start_city] + 2) % 4] agent_orientation = np_random.choice(possible_orientations) if not rail.check_path_exists(start[0], agent_orientation, target[0]): agent_orientation = (agent_orientation + 2) % 4 if not (rail.check_path_exists(start[0], agent_orientation, target[0])): infeasible_agent = True if tries >= 100: warnings.warn("Did not find any possible path, check your parameters!!!") break agents_position.append((start[0][0], start[0][1])) agents_target.append((target[0][0], target[0][1])) agents_direction.append(agent_orientation) # Orient the agent correctly if speed_ratio_map: speeds = speed_initialization_helper(num_agents, speed_ratio_map, seed=_runtime_seed, np_random=np_random) else: speeds = [1.0] * len(agents_position) return Schedule(agent_positions=agents_position, agent_directions=agents_direction, agent_targets=agents_target, agent_speeds=speeds, agent_malfunction_rates=None) return generator
[docs]def random_schedule_generator(speed_ratio_map: Optional[Mapping[float, float]] = None, seed: int = 1) -> ScheduleGenerator: """ Given a `rail` GridTransitionMap, return a random placement of agents (initial position, direction and target). Parameters ---------- speed_ratio_map : Optional[Mapping[float, float]] A map of speeds mapping to their ratio of appearance. The ratios must sum up to 1. Returns ------- Tuple[List[Tuple[int,int]], List[Tuple[int,int]], List[Tuple[int,int]], List[float]] initial positions, directions, targets speeds """ def generator(rail: GridTransitionMap, num_agents: int, hints: Any = None, num_resets: int = 0, np_random: RandomState = None) -> Schedule: _runtime_seed = seed + num_resets valid_positions = [] for r in range(rail.height): for c in range(rail.width): if rail.get_full_transitions(r, c) > 0: valid_positions.append((r, c)) if len(valid_positions) == 0: return Schedule(agent_positions=[], agent_directions=[], agent_targets=[], agent_speeds=[], agent_malfunction_rates=None) if len(valid_positions) < num_agents: warnings.warn("schedule_generators: len(valid_positions) < num_agents") return Schedule(agent_positions=[], agent_directions=[], agent_targets=[], agent_speeds=[], agent_malfunction_rates=None) agents_position_idx = [i for i in np_random.choice(len(valid_positions), num_agents, replace=False)] agents_position = [valid_positions[agents_position_idx[i]] for i in range(num_agents)] agents_target_idx = [i for i in np_random.choice(len(valid_positions), num_agents, replace=False)] agents_target = [valid_positions[agents_target_idx[i]] for i in range(num_agents)] update_agents = np.zeros(num_agents) re_generate = True cnt = 0 while re_generate: cnt += 1 if cnt > 1: print("re_generate cnt={}".format(cnt)) if cnt > 1000: raise Exception("After 1000 re_generates still not success, giving up.") # update position for i in range(num_agents): if update_agents[i] == 1: x = np.setdiff1d(np.arange(len(valid_positions)), agents_position_idx) agents_position_idx[i] = np_random.choice(x) agents_position[i] = valid_positions[agents_position_idx[i]] x = np.setdiff1d(np.arange(len(valid_positions)), agents_target_idx) agents_target_idx[i] = np_random.choice(x) agents_target[i] = valid_positions[agents_target_idx[i]] update_agents = np.zeros(num_agents) # agents_direction must be a direction for which a solution is # guaranteed. agents_direction = [0] * num_agents re_generate = False for i in range(num_agents): valid_movements = [] for direction in range(4): position = agents_position[i] moves = rail.get_transitions(position[0], position[1], direction) for move_index in range(4): if moves[move_index]: valid_movements.append((direction, move_index)) valid_starting_directions = [] for m in valid_movements: new_position = get_new_position(agents_position[i], m[1]) if m[0] not in valid_starting_directions and rail.check_path_exists(new_position, m[1], agents_target[i]): valid_starting_directions.append(m[0]) if len(valid_starting_directions) == 0: update_agents[i] = 1 warnings.warn( "reset position for agent[{}]: {} -> {}".format(i, agents_position[i], agents_target[i])) re_generate = True break else: agents_direction[i] = valid_starting_directions[ np_random.choice(len(valid_starting_directions), 1)[0]] agents_speed = speed_initialization_helper(num_agents, speed_ratio_map, seed=_runtime_seed, np_random=np_random) return Schedule(agent_positions=agents_position, agent_directions=agents_direction, agent_targets=agents_target, agent_speeds=agents_speed, agent_malfunction_rates=None) return generator
[docs]def schedule_from_file(filename, load_from_package=None) -> ScheduleGenerator: """ Utility to load pickle file Parameters ---------- input_file : Pickle file generated by env.save() or editor Returns ------- Tuple[List[Tuple[int,int]], List[Tuple[int,int]], List[Tuple[int,int]], List[float]] initial positions, directions, targets speeds """ def generator(rail: GridTransitionMap, num_agents: int, hints: Any = None, num_resets: int = 0, np_random: RandomState = None) -> Schedule: if load_from_package is not None: from importlib_resources import read_binary load_data = read_binary(load_from_package, filename) else: with open(filename, "rb") as file_in: load_data = file_in.read() data = msgpack.unpackb(load_data, use_list=False, encoding='utf-8') if "agents_static" in data: agents = EnvAgent.load_legacy_static_agent(data["agents_static"]) else: agents = [EnvAgent(*d[0:12]) for d in data["agents"]] # setup with loaded data agents_position = [a.initial_position for a in agents] agents_direction = [a.direction for a in agents] agents_target = [a.target for a in agents] agents_speed = [a.speed_data['speed'] for a in agents] agents_malfunction = [a.malfunction_data['malfunction_rate'] for a in agents] return Schedule(agent_positions=agents_position, agent_directions=agents_direction, agent_targets=agents_target, agent_speeds=agents_speed, agent_malfunction_rates=None) return generator