Source code for flatland.envs.rail_env_shortest_paths

import math
from typing import Dict, List, Optional, Tuple, Set

import matplotlib.pyplot as plt
import numpy as np

from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.core.grid.grid4_utils import get_new_position
from flatland.core.transition_map import GridTransitionMap
from flatland.envs.agent_utils import RailAgentStatus
from flatland.envs.distance_map import DistanceMap
from flatland.envs.rail_env import RailEnvNextAction, RailEnvActions, RailEnv
from flatland.envs.rail_trainrun_data_structures import Waypoint
from flatland.utils.ordered_set import OrderedSet


[docs]def get_valid_move_actions_(agent_direction: Grid4TransitionsEnum, agent_position: Tuple[int, int], rail: GridTransitionMap) -> Set[RailEnvNextAction]: """ Get the valid move actions (forward, left, right) for an agent. TODO https://gitlab.aicrowd.com/flatland/flatland/issues/299 The implementation could probably be more efficient and more elegant. But given the few calls this has no priority now. Parameters ---------- agent_direction : Grid4TransitionsEnum agent_position: Tuple[int,int] rail : GridTransitionMap Returns ------- Set of `RailEnvNextAction` (tuples of (action,position,direction)) Possible move actions (forward,left,right) and the next position/direction they lead to. It is not checked that the next cell is free. """ valid_actions: Set[RailEnvNextAction] = OrderedSet() possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = np.count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): action = RailEnvActions.MOVE_FORWARD exit_direction = (agent_direction + 2) % 4 if possible_transitions[exit_direction]: new_position = get_new_position(agent_position, exit_direction) valid_actions.add(RailEnvNextAction(action, new_position, exit_direction)) elif num_transitions == 1: action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) valid_actions.add(RailEnvNextAction(action, new_position, new_direction)) else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: action = RailEnvActions.MOVE_FORWARD elif new_direction == (agent_direction + 1) % 4: action = RailEnvActions.MOVE_RIGHT elif new_direction == (agent_direction - 1) % 4: action = RailEnvActions.MOVE_LEFT else: raise Exception("Illegal state") new_position = get_new_position(agent_position, new_direction) valid_actions.add(RailEnvNextAction(action, new_position, new_direction)) return valid_actions
[docs]def get_new_position_for_action( agent_position: Tuple[int, int], agent_direction: Grid4TransitionsEnum, action: RailEnvActions, rail: GridTransitionMap) -> Tuple[int, int, int]: """ Get the next position for this action. TODO https://gitlab.aicrowd.com/flatland/flatland/issues/299 The implementation could probably be more efficient and more elegant. But given the few calls this has no priority now. Parameters ---------- agent_position agent_direction action rail Returns ------- Tuple[int,int,int] row, column, direction """ possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = np.count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): valid_action = RailEnvActions.MOVE_FORWARD exit_direction = (agent_direction + 2) % 4 if possible_transitions[exit_direction]: new_position = get_new_position(agent_position, exit_direction) if valid_action == action: return new_position, exit_direction elif num_transitions == 1: valid_action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if valid_action == action: return new_position, new_direction else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: valid_action = RailEnvActions.MOVE_FORWARD if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction elif new_direction == (agent_direction + 1) % 4: valid_action = RailEnvActions.MOVE_RIGHT if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction elif new_direction == (agent_direction - 1) % 4: valid_action = RailEnvActions.MOVE_LEFT if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction
[docs]def get_action_for_move( agent_position: Tuple[int, int], agent_direction: Grid4TransitionsEnum, next_agent_position: Tuple[int, int], next_agent_direction: int, rail: GridTransitionMap) -> Optional[RailEnvActions]: """ Get the action (if any) to move from a position and direction to another. TODO https://gitlab.aicrowd.com/flatland/flatland/issues/299 The implementation could probably be more efficient and more elegant. But given the few calls this has no priority now. Parameters ---------- agent_position agent_direction next_agent_position next_agent_direction rail Returns ------- Optional[RailEnvActions] the action (if direct transition possible) or None. """ possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = np.count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): valid_action = RailEnvActions.MOVE_FORWARD new_direction = (agent_direction + 2) % 4 if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif num_transitions == 1: valid_action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: valid_action = RailEnvActions.MOVE_FORWARD new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif new_direction == (agent_direction + 1) % 4: valid_action = RailEnvActions.MOVE_RIGHT new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif new_direction == (agent_direction - 1) % 4: valid_action = RailEnvActions.MOVE_LEFT new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action
# N.B. get_shortest_paths is not part of distance_map since it refers to RailEnvActions (would lead to circularity!)
[docs]def get_shortest_paths(distance_map: DistanceMap, max_depth: Optional[int] = None, agent_handle: Optional[int] = None) \ -> Dict[int, Optional[List[Waypoint]]]: """ Computes the shortest path for each agent to its target and the action to be taken to do so. The paths are derived from a `DistanceMap`. If there is no path (rail disconnected), the path is given as None. The agent state (moving or not) and its speed are not taken into account example: agent_fixed_travel_paths = get_shortest_paths(env.distance_map, None, agent.handle) path = agent_fixed_travel_paths[agent.handle] Parameters ---------- distance_map : reference to the distance_map max_depth : max path length, if the shortest path is longer, it will be cutted agent_handle : if set, the shortest for agent.handle will be returned , otherwise for all agents Returns ------- Dict[int, Optional[List[WalkingElement]]] """ shortest_paths = dict() def _shortest_path_for_agent(agent): if agent.status == RailAgentStatus.READY_TO_DEPART: position = agent.initial_position elif agent.status == RailAgentStatus.ACTIVE: position = agent.position elif agent.status == RailAgentStatus.DONE: position = agent.target else: shortest_paths[agent.handle] = None return direction = agent.direction shortest_paths[agent.handle] = [] distance = math.inf depth = 0 while (position != agent.target and (max_depth is None or depth < max_depth)): next_actions = get_valid_move_actions_(direction, position, distance_map.rail) best_next_action = None for next_action in next_actions: next_action_distance = distance_map.get()[ agent.handle, next_action.next_position[0], next_action.next_position[ 1], next_action.next_direction] if next_action_distance < distance: best_next_action = next_action distance = next_action_distance shortest_paths[agent.handle].append(Waypoint(position, direction)) depth += 1 # if there is no way to continue, the rail must be disconnected! # (or distance map is incorrect) if best_next_action is None: shortest_paths[agent.handle] = None return position = best_next_action.next_position direction = best_next_action.next_direction if max_depth is None or depth < max_depth: shortest_paths[agent.handle].append(Waypoint(position, direction)) if agent_handle is not None: _shortest_path_for_agent(distance_map.agents[agent_handle]) else: for agent in distance_map.agents: _shortest_path_for_agent(agent) return shortest_paths
[docs]def get_k_shortest_paths(env: RailEnv, source_position: Tuple[int, int], source_direction: int, target_position=Tuple[int, int], k: int = 1, debug=False) -> List[Tuple[Waypoint]]: """ Computes the k shortest paths using modified Dijkstra following pseudo-code https://en.wikipedia.org/wiki/K_shortest_path_routing In contrast to the pseudo-code in wikipedia, we do not a allow for loopy paths. Parameters ---------- env : RailEnv source_position: Tuple[int,int] source_direction: int target_position: Tuple[int,int] k : int max number of shortest paths debug: bool print debug statements Returns ------- List[Tuple[WalkingElement]] We use tuples since we need the path elements to be hashable. We use a list of paths in order to keep the order of length. """ # P: set of shortest paths from s to t # P =empty, shortest_paths: List[Tuple[Waypoint]] = [] # countu: number of shortest paths found to node u # countu = 0, for all u in V count = {(r, c, d): 0 for r in range(env.height) for c in range(env.width) for d in range(4)} # B is a heap data structure containing paths # N.B. use OrderedSet to make result deterministic! heap: OrderedSet[Tuple[Waypoint]] = OrderedSet() # insert path Ps = {s} into B with cost 0 heap.add((Waypoint(source_position, source_direction),)) # while B is not empty and countt < K: while len(heap) > 0 and len(shortest_paths) < k: if debug: print("iteration heap={}, shortest_paths={}".format(heap, shortest_paths)) # – let Pu be the shortest cost path in B with cost C cost = np.inf pu = None for path in heap: if len(path) < cost: pu = path cost = len(path) u: Waypoint = pu[-1] if debug: print(" looking at pu={}".format(pu)) # – B = B − {Pu } heap.remove(pu) # – countu = countu + 1 urcd = (*u.position, u.direction) count[urcd] += 1 # – if u = t then P = P U {Pu} if u.position == target_position: if debug: print(" found of length {} {}".format(len(pu), pu)) shortest_paths.append(pu) # – if countu ≤ K then # CAVEAT: do not allow for loopy paths elif count[urcd] <= k: possible_transitions = env.rail.get_transitions(*urcd) if debug: print(" looking at neighbors of u={}, transitions are {}".format(u, possible_transitions)) # for each vertex v adjacent to u: for new_direction in range(4): if debug: print(" looking at new_direction={}".format(new_direction)) if possible_transitions[new_direction]: new_position = get_new_position(u.position, new_direction) if debug: print(" looking at neighbor v={}".format((*new_position, new_direction))) v = Waypoint(position=new_position, direction=new_direction) # CAVEAT: do not allow for loopy paths if v in pu: continue # – let Pv be a new path with cost C + w(u, v) formed by concatenating edge (u, v) to path Pu pv = pu + (v,) # – insert Pv into B heap.add(pv) # return P return shortest_paths
[docs]def visualize_distance_map(distance_map: DistanceMap, agent_handle: int = 0): if agent_handle >= distance_map.get().shape[0]: print("Error: agent_handle cannot be larger than actual number of agents") return # take min value of all 4 directions min_distance_map = np.min(distance_map.get(), axis=3) plt.imshow(min_distance_map[agent_handle][:][:]) plt.show()