Source code for plaza_preprocessing.importer.importer

import logging
import osmium
from osmium._osmium import InvalidLocationError
import shapely.wkb as wkblib
from plaza_preprocessing.importer import osmholder
from plaza_preprocessing import configuration

# Module-level logger used by import_osm and the handler class below.
logger = logging.getLogger('plaza_preprocessing.importer')
# Shared osmium factory that serialises nodes, ways and areas to WKB;
# the handler parses its output with shapely.wkb.
WKBFAB = osmium.geom.WKBFactory()

# Upper bound for ids: the handler logs an error for any id that is
# greater than or equal to this value (the object is still imported).
OSM_MAX_ID = 10**10


def import_osm(filename, tag_filters):
    """
    Read an OSM / PBF file and collect its plazas, buildings, lines and points.

    The file is run through a _PlazaHandler built from ``tag_filters``. The
    collections gathered by the handler are returned wrapped in an
    ``osmholder.OSMHolder``. The size of every collection is logged at debug
    level, and a warning is logged when the handler counted objects it could
    not convert.
    """
    logger.info(f'importing {filename}')

    handler = _PlazaHandler(tag_filters)
    # locations=True asks osmium to provide node locations for ways/areas,
    # backed here by the 'sparse_mem_array' index.
    handler.apply_file(filename, locations=True, idx='sparse_mem_array')

    logger.debug(f'found {len(handler.plazas)} plazas')
    logger.debug(f'found {len(handler.buildings)} buildings')
    logger.debug(f'found {len(handler.lines)} lines')
    logger.debug(f'found {len(handler.points)} points')

    if handler.invalid_count > 0:
        logger.warning(f'encountered {handler.invalid_count} invalid objects (may be because of boundaries)')

    return osmholder.OSMHolder(
        handler.plazas,
        handler.buildings,
        handler.lines,
        handler.points,
    )


class _PlazaHandler(osmium.SimpleHandler):
    """
    osmium handler that collects plazas, buildings, lines and points.

    Geometries are created as WKB through the module-level WKBFAB factory and
    parsed with shapely. Objects whose geometry cannot be built are skipped
    and counted in ``invalid_count``.

    ``tag_filters`` is indexed with the keys 'point_obstacle', 'barrier' and
    'plaza'; each entry is handed to ``configuration.filter_tags`` together
    with the tags of the object being inspected.
    """

    def __init__(self, tag_filters):
        super().__init__()
        self.tag_filters = tag_filters
        self.plazas = []      # dicts with 'osm_id' and 'geometry' (one per polygon)
        self.buildings = []   # multipolygon geometries
        self.points = []      # point geometries
        self.lines = []       # dicts with 'id', 'geometry' and 'tags'
        self.invalid_count = 0  # ways/areas skipped because conversion failed

    def node(self, node):
        """Store the point geometry of every relevant node."""
        if self._is_relevant_node(node):
            self._check_max_id(node.id)
            point_wkb = WKBFAB.create_point(node)
            point_geometry = wkblib.loads(point_wkb, hex=True)
            self.points.append(point_geometry)

    def way(self, way):
        """Store id, line geometry and tags of every relevant way."""
        if self._is_relevant_way(way):
            self._check_max_id(way.id)
            try:
                line_wkb = WKBFAB.create_linestring(way)
                line_geometry = wkblib.loads(line_wkb, hex=True)
                self.lines.append({
                    'id': way.id,
                    'geometry': line_geometry,
                    'tags': {t.k: t.v for t in way.tags}
                })
            except InvalidLocationError:
                logger.debug(f'Encountered invalid location in way {way.id}')
                self.invalid_count += 1
            except RuntimeError as ex:
                logger.debug(f'Error importing way {way.id}: {ex}')
                self.invalid_count += 1

    def area(self, area):
        """
        Store plazas and relevant buildings.

        A plaza area contributes one entry per polygon of its multipolygon,
        all sharing the same 'osm_id'. Buildings are only considered when the
        area is not a plaza and are stored as whole multipolygon geometries.
        """
        if self._is_plaza(area):
            # area.id is osmium's derived area id, not the id of the
            # originating way/relation; validate the id that is actually
            # stored as 'osm_id' below.
            self._check_max_id(area.orig_id())
            multipolygon_geom = self._create_multipolygon(area)
            if multipolygon_geom:
                for polygon in multipolygon_geom.geoms:
                    plaza = {
                        'osm_id': area.orig_id(),
                        'geometry': polygon
                    }
                    self.plazas.append(plaza)
        elif self._is_relevant_building(area):
            geometry = self._create_multipolygon(area)
            if geometry:
                self.buildings.append(geometry)

    def _create_multipolygon(self, area):
        """
        Build the multipolygon geometry of an area.

        Returns None (after incrementing ``invalid_count``) when the factory
        raises InvalidLocationError or RuntimeError.
        """
        try:
            multipolygon_wkb = WKBFAB.create_multipolygon(area)
            return wkblib.loads(multipolygon_wkb, hex=True)
        except InvalidLocationError:
            logger.debug(f'Encountered invalid location in area {area.id}')
            self.invalid_count += 1
            return None
        except RuntimeError as ex:
            # The failing object is an area, so the message must say so.
            logger.debug(f'Error importing area {area.id}: {ex}')
            self.invalid_count += 1
            return None

    def _is_relevant_node(self, node):
        """
        Accept nodes whose "level" and "layer" tags are "0" (the default when
        the tag is missing) and that pass the 'point_obstacle' tag filter.
        """
        return node.tags.get("level", "0") == "0" and \
            node.tags.get("layer", "0") == "0" and \
            configuration.filter_tags(node.tags, self.tag_filters['point_obstacle'])

    def _is_relevant_way(self, way):
        """
        Accept open highways, tram rails and ways passing the 'barrier' filter.
        """
        # `and` binds tighter than `or`, so the closed-way exclusion applies to
        # highways only; tram rails and barrier ways are accepted even when
        # closed.
        # NOTE(review): this keeps the behaviour of the previously
        # un-parenthesised expression - confirm it is the intended grouping.
        is_open_highway = not way.is_closed() and "highway" in way.tags
        return is_open_highway or \
            way.tags.get("railway") == "tram" or \
            configuration.filter_tags(way.tags, self.tag_filters['barrier'])

    def _is_plaza(self, area):
        """Return the result of the 'plaza' tag filter for the area's tags."""
        return configuration.filter_tags(area.tags, self.tag_filters['plaza'])

    def _is_relevant_building(self, area):
        """
        Accept areas tagged "building" whose "layer" tag is "0" (the default
        when the tag is missing).
        """
        return "building" in area.tags \
            and area.tags.get("layer", "0") == "0"

    def _check_max_id(self, osm_id):
        """
        Log an error when ``osm_id`` is greater than or equal to OSM_MAX_ID.

        Nothing is raised, so the caller continues importing the object.
        """
        if osm_id >= OSM_MAX_ID:
            logger.error(f"OSM id {osm_id} is larger than the allowed {OSM_MAX_ID}")