Source code for plaza_routing.business.public_transport_connection_finder

from typing import List
import logging

from plaza_routing.business.util import coordinate_transformer

from plaza_routing.integration import overpass_service
from plaza_routing.integration import search_ch_service

logger = logging.getLogger('plaza_routing.public_transport_connection_finder')


def get_public_transport_stops(start: tuple) -> dict:
    """
    looks up the public transport stops for the given start

    Thin delegate: the start value is handed unchanged to
    overpass_service.get_public_transport_stops and whatever that call
    returns is passed straight back to the caller.
    """
    stops = overpass_service.get_public_transport_stops(start)
    return stops
def get_public_transport_connection(start_uic_ref: str, destination: tuple, departure: str) -> dict:
    """
    fetches a connection from search.ch and converts it into the routable format

    The destination tuple is serialized with _tuple_to_str before it is sent to
    search_ch_service.get_connection together with the start UIC reference and
    the departure value. The service response is then reshaped by
    _generate_public_transport_connection and returned.
    """
    destination_text = _tuple_to_str(destination)
    raw_connection = search_ch_service.get_connection(start_uic_ref, destination_text, departure)
    return _generate_public_transport_connection(raw_connection)
def optimize_public_transport_connection(public_transport_connection: dict) -> dict:
    """
    replaces the stop coordinates of every leg with the ones reported by overpass

    For each entry in public_transport_connection['path'] the current
    'start_position' and 'exit_position' are handed to
    overpass_service.get_connection_coordinates as fallback values, with the
    current start position also serving as the lookup position. The two
    positions returned by that call are written back into the leg as lists.

    The given dict is modified in place and the same object is returned.
    """
    for leg in public_transport_connection['path']:
        logger.debug(f'optimize public transport connection leg: {leg["start"]} to {leg["destination"]}')

        # the positions already stored on the leg double as fallback values
        approximate_start = tuple(leg['start_position'])
        approximate_exit = tuple(leg['exit_position'])

        refined_positions = overpass_service.get_connection_coordinates(
            approximate_start,  # lookup position
            leg['start_stop_uicref'],
            leg['exit_stop_uicref'],
            leg['line'],
            approximate_start,
            approximate_exit,
        )
        refined_start, refined_exit = refined_positions

        leg['start_position'] = list(refined_start)
        leg['exit_position'] = list(refined_exit)

    return public_transport_connection
def _generate_public_transport_connection(connection: dict) -> dict:
    """
    builds the routable public transport connection from a search.ch connection

    Reads connection['legs'] and connection['duration']. Every leg is turned
    into a path entry by _generate_path, using the leg's own x/y and the x/y of
    its 'exit' entry, both passed through
    coordinate_transformer.transform_ch_to_wgs.

    Returns a dict with the keys 'type' (always 'public_transport'), 'path',
    'duration' and 'number_of_legs' (the length of 'path').
    """
    legs = connection['legs']
    duration = connection['duration']

    path = [
        _generate_path(
            leg,
            coordinate_transformer.transform_ch_to_wgs(leg['x'], leg['y']),
            coordinate_transformer.transform_ch_to_wgs(leg['exit']['x'], leg['exit']['y']),
        )
        for leg in legs
    ]

    return {
        'type': 'public_transport',
        'path': path,
        'duration': duration,
        'number_of_legs': len(path),
    }


def _generate_path(leg: dict, start_position: tuple, exit_position: tuple) -> dict:
    """
    maps a single search.ch leg onto the path entry used by the routing result

    Values describing the boarding stop are taken from the leg itself, values
    describing the alighting stop from leg['exit']. The given positions are
    stored as lists and the stopovers are derived from leg['stops'].
    """
    return {
        # stop names
        'start': leg['name'],
        'destination': leg['exit']['name'],
        # line information
        'line_type': leg['type'],
        'line': leg['line'],
        'track': leg['track'],
        'terminal': leg['terminal'],
        # times
        'departure': leg['departure'],
        'arrival': leg['exit']['arrival'],
        # positions, stored as lists instead of tuples
        'start_position': list(start_position),
        'exit_position': list(exit_position),
        # stop identifiers
        'start_stop_uicref': leg['stopid'],
        'exit_stop_uicref': leg['exit']['stopid'],
        'stopovers': _get_stopovers(leg['stops']),
    }


def _get_stopovers(stopovers: List[dict]) -> List[List[float]]:
    """
    Returns the coordinates for all provided stopovers in a list.

    Each stopover's x/y is passed through
    coordinate_transformer.transform_ch_to_wgs and stored as a list.
    The approximation based on the search.ch coordinates is enough.
    """
    return [
        list(coordinate_transformer.transform_ch_to_wgs(stopover['x'], stopover['y']))
        for stopover in stopovers
    ]


def _tuple_to_str(value: tuple) -> str:
    """
    returns the items of a tuple joined by commas, without parentheses or spaces
    """
    return ','.join(str(item) for item in value)