Source code for plaza_preprocessing.merger.plazatransformer
from datetime import datetime
from osmium import SimpleWriter
from osmium.osm.mutable import Way, Node
OSM_ID_START = 10**10
[docs]def transform_plazas(plazas, node_file, way_file, footway_tags):
""" transforms plazas to OSM and write them to a file """
node_writer = SimpleWriter(node_file)
way_writer = SimpleWriter(way_file)
entry_node_mappings = {}
osm_id_nodes = osm_id_ways = OSM_ID_START
try:
for plaza in plazas:
if "graph_edges" not in plaza:
raise ValueError(f"No graph edges in {plaza['osm_id']}")
if "entry_points" not in plaza:
raise ValueError(f"No entry points in {plaza['osm_id']}")
transformer = PlazaTransformer(osm_id_nodes, osm_id_ways, footway_tags)
transformer.transform_plaza(plaza)
# merge entry node mappings
entry_node_mappings = {**entry_node_mappings, **transformer.entry_node_mappings}
for node in transformer.nodes.values():
node_writer.add_node(node)
for way in transformer.ways:
way_writer.add_way(way)
osm_id_nodes += len(transformer.nodes)
osm_id_ways += len(transformer.ways)
finally:
node_writer.close()
way_writer.close()
return entry_node_mappings
[docs]class PlazaTransformer:
"""
Transforms plaza graph edges to an OSM Format
"""
def __init__(self, start_id_nodes, start_id_ways, footway_tags):
self.osm_id_nodes = start_id_nodes
self.osm_id_ways = start_id_ways
# use coordinates as keys and osmium objects as values
self.nodes = {}
self.ways = []
# maps entry ways of plazas to entry node ids
self.entry_node_mappings = {}
self.footway_tags = [(key, value) for tag in footway_tags for key, value in tag.items()]
[docs] def transform_plaza(self, plaza):
""" takes a plaza with edge geometries and constructs nodes and ways """
for edge in plaza['graph_edges']:
self._create_way(edge)
for entry_line in plaza.get('entry_lines'):
self.entry_node_mappings[entry_line['way_id']] = [
{
'id': self._get_node_id((p.x, p.y)),
'coords': (p.x, p.y)
}
for p in entry_line['entry_points']]
def _create_way(self, edge):
""" create a way with corresponding nodes """
node_refs = []
for coords in edge.coords:
node_refs.append(self._get_node_id(coords))
way_id = self._get_new_way_osm_id()
way = Way(nodes=node_refs)
way.tags = self.footway_tags
way.id = way_id
way.version = 1
way.timestamp = self._create_osm_timestamp()
self.ways.append(way)
def _get_node_id(self, coords):
"""
get a node id for the coords, creates a new node if it doesn't exist yet
"""
if coords in self.nodes:
return self.nodes[coords].id
else:
node_id = self._get_new_node_osm_id()
node = Node(location=coords)
node.id = node_id
node.version = 1
node.timestamp = self._create_osm_timestamp()
self.nodes[coords] = node
return node_id
def _get_new_node_osm_id(self):
self.osm_id_nodes += 1
return self.osm_id_nodes
def _get_new_way_osm_id(self):
self.osm_id_ways += 1
return self.osm_id_ways
def _create_osm_timestamp(self):
now = datetime.utcnow()
return now.strftime('%Y-%m-%dT%H:%M:%SZ')