# Source code for plaza_preprocessing.merger.merger

from sys import maxsize
from os import path
import tempfile
import logging
from datetime import datetime
from osmium import SimpleHandler, SimpleWriter
from osmium.osm.mutable import Way
from shapely.geometry import Point, LineString
import plaza_preprocessing.merger.plazatransformer as plazatransformer
import plaza_preprocessing.merger.osmosishelper as osmosishelper

logger = logging.getLogger('plaza_preprocessing.merger')


class WayExtractor(SimpleHandler):
    """Osmium handler that records the ways listed in the entry node mappings.

    After the handler has been applied to a file, ``self.ways`` maps each
    matching way id to a dict with the keys ``'version'``, ``'nodes'`` and
    ``'tags'``.
    """

    def __init__(self, entry_node_mappings):
        super().__init__()
        # Only ways whose id is contained in this mapping are collected.
        self.entry_node_mappings = entry_node_mappings
        self.ways = {}

    def way(self, way):
        """Store version, node refs with coordinates, and tags of a wanted way."""
        if way.id not in self.entry_node_mappings:
            return

        version = way.version

        # Copy everything out of the osmium object into plain Python values.
        node_records = []
        for node in way.nodes:
            node_records.append({'id': node.ref, 'coords': (node.lon, node.lat)})

        tag_map = {}
        for tag in way.tags:
            tag_map[tag.k] = tag.v

        self.ways[way.id] = {
            'version': version,
            'nodes': node_records,
            'tags': tag_map,
        }
def merge_plaza_graphs(plazas, osm_file, merged_file, footway_tags):
    """Merge the graph edges of processed plazas back into the original OSM file.

    The plazas are transformed into a node file and a way file, the original
    plaza ways are extended with the entry node references, and all of these
    are merged with ``osm_file`` into ``merged_file``. Intermediate files live
    in a temporary directory that is removed afterwards.
    """
    logger.info(f"Merging {len(plazas)} processed plazas back into {osm_file}")

    with tempfile.TemporaryDirectory() as workdir:
        way_pbf = path.join(workdir, 'plaza_ways.pbf')
        node_pbf = path.join(workdir, 'plaza_nodes.pbf')
        modified_pbf = path.join(workdir, 'modified_ways.pbf')

        # Write the plaza nodes/ways and learn which entry nodes belong to which way.
        mappings = plazatransformer.transform_plazas(
            plazas, node_pbf, way_pbf, footway_tags)

        # Patch the original plaza ways so that they reference the entry nodes.
        original_ways = _extract_plaza_ways(mappings, osm_file)
        _insert_entry_nodes(original_ways, mappings)
        _write_modified_ways(original_ways, modified_pbf)

        osmosishelper.merge_osm_files(
            merged_file, osm_file, way_pbf, node_pbf, modified_pbf)

    logger.info(f"Merged OSM file written to {merged_file}")
def _insert_entry_nodes(plaza_ways, entry_node_mappings):
    """Insert entry node refs into their ways at the correct position.

    Args:
        plaza_ways: dict of way id -> way dict; the ``'nodes'`` list of each
            affected way is modified in place.
        entry_node_mappings: dict of way id -> iterable of entry node dicts
            (each entry node needs a ``'coords'`` key).

    Raises:
        RuntimeError: if a way id of the mapping is missing in ``plaza_ways``.
    """
    for way_id, entry_nodes in entry_node_mappings.items():
        if way_id not in plaza_ways:
            raise RuntimeError(f"Way {way_id} was not found in original osm file")
        way_nodes = plaza_ways[way_id]['nodes']
        logger.debug(f"Inserting {len(entry_nodes)} entry point references into way {way_id}")
        for entry_node in entry_nodes:
            # The list is mutated in place, so the way dict sees the change.
            _insert_entry_node(entry_node, way_nodes)


def _write_modified_ways(plaza_ways, filename):
    """Write the modified ways to an OSM file.

    Args:
        plaza_ways: dict of way id -> dict with ``'nodes'``, ``'tags'`` and
            ``'version'``.
        filename: path of the file to create.
    """
    logger.debug(f"Writing modified ways to {filename}")
    ways = []
    for way_id, way in plaza_ways.items():
        node_refs = [node['id'] for node in way['nodes']]
        osm_way = Way(nodes=node_refs)
        osm_way.id = way_id
        osm_way.tags = way['tags']
        # increase version number to overwrite original way
        osm_way.version = way['version'] + 1
        osm_way.timestamp = _create_osm_timestamp()
        ways.append(osm_way)

    writer = SimpleWriter(filename)
    try:
        for way in ways:
            writer.add_way(way)
    finally:
        # Always close the writer, even if adding a way fails.
        writer.close()


def _insert_entry_node(entry_node, way_nodes):
    """Insert ``entry_node`` at the correct position of ``way_nodes``.

    The list is modified in place and also returned.
    """
    insert_position = _find_insert_position(entry_node, way_nodes)
    way_nodes.insert(insert_position, entry_node)
    return way_nodes


def _find_insert_position(entry_node, way_nodes):
    """Return the index in ``way_nodes`` at which ``entry_node`` should be inserted.

    An exact coordinate match with an existing way node wins; otherwise the
    position is derived from the closest segment of the way.
    """
    entry_point = Point(entry_node['coords'])
    exact_match = _find_exact_insert_position(entry_point, way_nodes)
    if exact_match is not None:
        return exact_match
    return _find_interpolated_insert_position(entry_point, way_nodes)


def _find_exact_insert_position(entry_point, way_nodes):
    """Return the index of the first way node that equals ``entry_point``, else None.

    NOTE(review): a match on index 0 makes the caller insert the entry node
    before the first node of the way - confirm this is intended for closed ways.
    """
    for i, node in enumerate(way_nodes):
        if Point(node['coords']).equals(entry_point):
            return i
    return None


def _find_interpolated_insert_position(entry_point, way_nodes):
    """Return the insert index derived from the way segment closest to ``entry_point``.

    A line is drawn between each pair of consecutive way nodes; the index of
    the end node of the closest line is returned. A line that touches the
    entry point (distance 0) is returned immediately.

    Raises:
        ValueError: if no position can be determined, e.g. because the way has
            fewer than two nodes and therefore no segment.
    """
    logger.debug(
        f"Entry point {entry_point.coords} is not on way node, finding interpolated position")
    best_pos = None
    best_distance = float('inf')
    # pos is the index of the segment's end node in way_nodes.
    for pos, (start_node, end_node) in enumerate(zip(way_nodes, way_nodes[1:]), start=1):
        line = LineString([start_node['coords'], end_node['coords']])
        distance = line.distance(entry_point)
        if distance == 0:
            return pos
        if distance < best_distance:
            best_pos = pos
            best_distance = distance

    if best_pos is None:
        # Fail with a clear message instead of handing None to list.insert().
        raise ValueError(
            f"Cannot determine insert position for entry point {entry_point.wkt}: "
            f"way has {len(way_nodes)} node(s)")
    return best_pos


def _extract_plaza_ways(entry_node_mappings, osm_file):
    """Read ``osm_file`` and return the ways whose ids are in ``entry_node_mappings``.

    Node locations are requested from osmium so that the collected way nodes
    carry coordinates.
    """
    way_extractor = WayExtractor(entry_node_mappings)
    index_type = 'sparse_mem_array'
    way_extractor.apply_file(osm_file, locations=True, idx=index_type)
    return way_extractor.ways


def _create_osm_timestamp():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated; use an aware UTC datetime instead.
    now = datetime.now(timezone.utc)
    return now.strftime('%Y-%m-%dT%H:%M:%SZ')