Source code for plaza_preprocessing.optimizer.shortest_paths
import logging
import time
import networkx as nx
from shapely.geometry import LineString, Point
from typing import List, Tuple, Set, Dict
logger = logging.getLogger('plaza_preprocessing.optimizer')
[docs]def create_graph(graph_edges: List[LineString]) -> nx.Graph:
""" create a networkx graph with a collection of edges"""
graph = nx.Graph()
graph.add_nodes_from(_collect_nodes(graph_edges))
graph.add_edges_from(_collect_edges(graph_edges))
return graph
[docs]def compute_dijkstra_shortest_paths(graph: nx.Graph, entry_points: List[Point]) -> List[LineString]:
"""
compute a list of shortest paths as LineStrings between all pairs of entry points
using the dijkstra algorithm
"""
entry_coords = list(map(lambda point: (point.x, point.y), entry_points))
start_time = time.perf_counter()
paths = dict(nx.all_pairs_dijkstra_path(graph))
shortest_lines = _extract_lines_between_entry_points(paths, entry_coords)
end_time = time.perf_counter()
elapsed_time_ms = (end_time - start_time) * 1000
logger.debug(f"computed dijkstra shortest paths in {elapsed_time_ms:.2f} milliseconds")
return shortest_lines
[docs]def compute_astar_shortest_paths(graph: nx.Graph, entry_points: List[Point]) -> List[LineString]:
"""
compute a list of shortest paths as LineStrings between all pairs of entry points
using the astar algorithm. Uses direct distance between entry points as a heuristic
"""
entry_coords = list(map(lambda point: (point.x, point.y), entry_points))
lines = []
start_time = time.perf_counter()
for start_node in entry_coords:
for end_node in entry_coords:
if start_node < end_node:
try:
path = nx.astar_path(graph, start_node, end_node, heuristic=_distance_between_nodes)
except (nx.NetworkXNoPath, nx.NodeNotFound):
logger.debug(f"no path between {start_node} and {end_node}, discarding path")
continue
lines.append(LineString(path))
end_time = time.perf_counter()
elapsed_time_ms = (end_time - start_time) * 1000
logger.debug(f"computed shortest paths with astar in {elapsed_time_ms:.2f} milliseconds")
return lines
def _extract_lines_between_entry_points(shortest_paths: Dict, entry_coords: List[Tuple]) -> List[LineString]:
""" create shortest lines between every pair of entry points"""
lines = []
for start_node in entry_coords:
if start_node not in shortest_paths:
logger.warning(f"entry point {start_node} is not reachable on the graph, discarding paths")
continue
for end_node in entry_coords:
if start_node < end_node:
path_start = shortest_paths[start_node]
if end_node not in path_start:
logger.debug(f"entry point {end_node} is not reachable from {start_node}, discarding path")
continue
path = path_start[end_node]
lines.append(LineString(path))
return lines
def _distance_between_nodes(node_1, node_2):
"""calculate the euclidean distance of two points using pythagoras"""
x1, y1 = node_1
x2, y2 = node_2
return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
def _collect_nodes(graph_edges: List[LineString]) -> Set[Tuple[float, float]]:
"""" collect a set of nodes from all edges without duplicates """
nodes = set()
coordinates = (coords for line in graph_edges for coords in line.coords)
for node_coords in coordinates:
nodes.add(node_coords)
return nodes
def _collect_edges(graph_edges: List[LineString]) -> List[Tuple[float, float, Dict[str, float]]]:
""" collect edges with corresponding weights as the geometric distance between the points """
edges = []
for line in graph_edges:
weight = _calculate_weight_of_line(line)
edges.append((line.coords[0], line.coords[1], {'weight': weight}))
return edges
def _calculate_weight_of_line(line: LineString) -> float:
""" calculate the weight of a line as the distance between the points """
weight = 0
point = Point(line.coords[0])
for coords in line.coords[1:]:
next_point = Point(coords)
weight += point.distance(next_point)
point = next_point
return weight