User:EmericusPetro/sandbox/RFC OpenStreetMap RDF 2023 conventions (proof of concept)

From OpenStreetMap Wiki
Jump to navigation Jump to search

RFC: OpenStreetMap RDF 2022 conventions (proof of concept)

Script

This page contains a public domain proof of concept of a proxy over the production OpenStreetMap API v0.6. It serves the existing formats (e.g. XML, JSON) unchanged; when asked to generate Turtle, it requests the canonical XML output under the hood and converts it for you. By default, all requests to the de facto API are cached so that you do not accidentally overload the backend servers.

Python

osmapi2rdfproxy.py

#!/usr/bin/env python3
# ==============================================================================
#
#          FILE:  osmapi2rdfproxy.py
#
#         USAGE:  hug -f ./osmapi2rdfproxy.py --help
#                 # Expose http://localhost:8000/ as API endpoint:
#                     hug --port 8000 -f ./osmapi2rdfproxy.py
#                 # Disable cache of de facto API calls
#                     RDFPROXY_TTL="-1" hug -f ./osmapi2rdfproxy.py
#
#   DESCRIPTION:  Proxy over de facto public OpenStreetMap v0.6 API that adds
#                 turtle format as additional output intended to be used for
#                 local testing and/or feedback on the conventions.
#                 This is not intended for production or near-production use,
#                 however it implements python Hug (https://www.hug.rest/)
#                 (great at benchmarks, yet less code) and requests-cache
#                 (https://requests-cache.readthedocs.io/) so it is out of the
#                 box a great starting point, even if you don't know python.
#
#       OPTIONS:  ---
#
#  REQUIREMENTS:  - python3
#                   - hug (pip install hug -U)
#                   - requests-cache (pip install requests-cache)
#                 - osmrdf2023.py (Python library)
#          BUGS:  ---
#         NOTES:  ---
#       AUTHORS:  Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS:  ---
#       LICENSE:  Public Domain dedication or Zero-Clause BSD
#                 SPDX-License-Identifier: Unlicense OR 0BSD
#       VERSION:  v0.3.0
#       CREATED:  2022-11-25 15:53:00Z v0.1.0 started
#      REVISION:  2022-11-26 20:47:00Z v0.2.0 node, way, relation basic turtle,
#                                      only attached tags (no <nd> <member> yet)
#                 2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================

# from poc.osmrdf2023 import (
from osmrdf2023 import (
    osmrdf_node_xml2ttl,
    osmrdf_relation_xml2ttl,
    osmrdf_way_xml2ttl
)
import json
import os
import requests
import requests_cache
import hug

# @TODO Some way to generate metadata from Tags
#       Taginfo?
#       - https://taginfo.openstreetmap.org/taginfo/apidoc
#       maybe parse infoboxes directly. Example:
#       wiki.openstreetmap.org/w/index.php?action=raw&title=Tag:highway%3Dbusway
#       or https://github.com/earwig/mwparserfromhell

# user configuration ________________________________________________________

# TIP: the upstream API base URL can be customized with the environment
#      variable DE_FACTO_API_BASE (the legacy name DE_FACTO_API, which was the
#      one actually read by earlier versions, is still honored as fallback).
DE_FACTO_API_BASE = os.getenv(
    'DE_FACTO_API_BASE',
    os.getenv('DE_FACTO_API', 'https://www.openstreetmap.org/api/0.6'))

# @see https://requests-cache.readthedocs.io/en/stable/
# File osmapi_cache.sqlite will cache backend calls
requests_cache.install_cache(
    'osmapi_cache',
    # https://requests-cache.readthedocs.io/en/stable/user_guide/backends.html
    backend=os.getenv('RDFPROXY_CACHE', 'sqlite'),
    # https://requests-cache.readthedocs.io/en/stable/user_guide/expiration.html
    # os.getenv() always returns a str, so the value is converted to a number
    # of seconds before being handed to requests-cache.
    # NOTE(review): per the requests-cache docs -1 means "never expire" and 0
    # means "expire immediately" — confirm which one the USAGE header intends.
    expire_after=int(os.getenv('RDFPROXY_TTL', '604800')),  # 7 days
)

# ### Hug overrides ____________________________________________________________


@hug.format.content_type('application/xml')
def format_as_xml(data, request=None, response=None):
    """Hug output formatter registered with the application/xml content type.

    The payload is not parsed or pretty-printed: it is converted with str()
    and encoded as UTF-8 bytes.

    @FIXME make Hug return XML formating instead of text
    """
    as_text = str(data)
    return as_text.encode('utf8')


@hug.format.content_type('text/turtle')
def format_as_turtle(data, request=None, response=None):
    """Hug output formatter registered with the text/turtle content type.

    The payload is converted with str() and encoded as UTF-8 bytes.
    """
    as_text = str(data)
    return as_text.encode('utf8')


# ### Tell Hug suffix can be used to infer output strategy _____________________
# Output formatter used by the generic (non-Turtle) endpoints below: maps a
# suffix of the request to the Hug formatter that serializes the response.
suffix_output = hug.output_format.suffix({
    '.json': hug.output_format.json,
    '.xml': format_as_xml,
    # NOTE(review): plain-text formatter here, while the dedicated *.ttl
    # routes below use format_as_turtle — confirm which one serves .ttl paths
    '.ttl': hug.output_format.text,
    '': hug.output_format.text,  # @FIXME use format_as_xml()
})

# ### Logic for API endpoints __________________________________________________
# @NOTE: This section mostly:
#        - Request+cache openstreetmap.org/api/0.6/ API calls and serve as it is
#        - If user ask .ttl, uses osmrdf2023.py to generate
# @BUG   While osmapi2rdfproxy.py is intended to only generate Turtle for
#        discussed types of data, the actual OpenStreetMap API has more
#        endpoints than the ones here.

@hug.get('/changeset/{changeset_uid}', output=suffix_output)
def api_changeset(changeset_uid):
    """Proxy (and cache) /api/0.6/changeset/ without altering the payload.

    The upstream body is decoded with json.loads() when the requested id
    ends with ".json"; otherwise the upstream text is returned as is.

    @example http://localhost:8000/changeset/1.json
    """
    upstream_url = ''.join([DE_FACTO_API_BASE, '/changeset/', changeset_uid])
    body = requests.get(upstream_url).text
    return json.loads(body) if changeset_uid.endswith('.json') else body


# http://localhost:8000/node/1.json
@hug.get('/node/{node_uid}', output=suffix_output)
def api_node(node_uid):
    """Proxy (and cache) /api/0.6/node/ without altering the payload.

    The upstream body is decoded with json.loads() when the requested id
    ends with ".json"; otherwise the upstream text is returned as is.
    """
    upstream_url = ''.join([DE_FACTO_API_BASE, '/node/', node_uid])
    body = requests.get(upstream_url).text
    return json.loads(body) if node_uid.endswith('.json') else body


# http://localhost:8000/node/1.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/node/2.ttl
@hug.get('/node/{node_uid}.ttl', output=format_as_turtle)
def api_node_ttl(node_uid):
    """Serve a node as Turtle.

    Fetches the ".xml" representation of the node from the upstream API and
    converts it with osmrdf_node_xml2ttl().
    """
    xml_url = ''.join([DE_FACTO_API_BASE, '/node/', node_uid, '.xml'])
    return osmrdf_node_xml2ttl(requests.get(xml_url).text)

# @see https://wiki.openstreetmap.org/wiki/Relation
# @see https://wiki.openstreetmap.org/wiki/Category:Relations
# http://localhost:8000/relation/10000.json
@hug.get('/relation/{relation_uid}', output=suffix_output)
def api_relation(relation_uid):
    """Proxy (and cache) /api/0.6/relation/ without altering the payload.

    The upstream body is decoded with json.loads() when the requested id
    ends with ".json"; otherwise the upstream text is returned as is.
    """
    upstream_url = ''.join([DE_FACTO_API_BASE, '/relation/', relation_uid])
    body = requests.get(upstream_url).text
    return json.loads(body) if relation_uid.endswith('.json') else body

# http://localhost:8000/relation/10000.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/relation/10000.ttl


@hug.get('/relation/{relation_uid}.ttl', output=format_as_turtle)
def api_relation_ttl(relation_uid):
    """Serve a relation as Turtle.

    Fetches the ".xml" representation of the relation from the upstream API
    and converts it with osmrdf_relation_xml2ttl().
    """
    xml_url = ''.join(
        [DE_FACTO_API_BASE, '/relation/', relation_uid, '.xml'])
    return osmrdf_relation_xml2ttl(requests.get(xml_url).text)

# http://localhost:8000/way/100.json


@hug.get('/way/{way_uid}', output=suffix_output)
def api_way(way_uid):
    """Proxy (and cache) /api/0.6/way/ without altering the payload.

    The upstream body is decoded with json.loads() when the requested id
    ends with ".json"; otherwise the upstream text is returned as is.
    """
    upstream_url = ''.join([DE_FACTO_API_BASE, '/way/', way_uid])
    body = requests.get(upstream_url).text
    return json.loads(body) if way_uid.endswith('.json') else body

# http://localhost:8000/way/100.ttl
# rdfpipe --input-format='ttl' --output-format=longturtle http://localhost:8000/way/100.ttl


@hug.get('/way/{way_id}.ttl', output=format_as_turtle)
def api_way_ttl(way_id):
    """Serve a way as Turtle.

    Fetches the ".xml" representation of the way from the upstream API and
    converts it with osmrdf_way_xml2ttl().

    This handler used to be named api_node_ttl (copy/paste leftover), which
    rebound the module-level name of the node handler defined earlier.
    """
    content = requests.get(
        DE_FACTO_API_BASE + '/way/' + way_id + '.xml')

    return osmrdf_way_xml2ttl(content.text)

osmdump2rdfcli.py

#!/usr/bin/env python3
# ==============================================================================
#
#          FILE:  osmdump2rdfcli.py
#
#         USAGE:  ./osmdump2rdfcli.py --help
#
#   DESCRIPTION:  ---
#       OPTIONS:  ---
#
#  REQUIREMENTS:  - python3
#          BUGS:  ---
#         NOTES:  ---
#       AUTHORS:  Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS:  ---
#       LICENSE:  Public Domain dedication or Zero-Clause BSD
#                 SPDX-License-Identifier: Unlicense OR 0BSD
#       VERSION:  v0.3.0
#       CREATED:  2022-11-27 03:14 UTC v0.2.0 started
#      REVISION:  2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================

import argparse
import sys
# from poc.osmrdf2023 import (
from osmrdf2023 import (
    # osmrdf_xmldump2_ttl,
    osmrdf_xmldump2_ttl_v2,
    OSMElementFilter
)

# Binary standard input; default input stream of Cli.execute_cli()
STDIN = sys.stdin.buffer

# Program name handed to argparse (prog=)
NOMEN = 'osmdump2rdfcli'
# Path of this script, interpolated into the help texts below
PROGRAM_EXE = __file__

# Text handed to argparse as description=
DESCRIPTION = f"""
{PROGRAM_EXE} Proof of concept for OSM RDF 2022, CLI alternative \
(intended to run on dumps) to the proxy version.
"""

# Usage examples handed to argparse as epilog=.
# PROGRAM_EXE is interpolated by the f-string itself; the former trailing
# ".format(__file__)" re-parsed the already rendered text, which did nothing
# useful and would raise if the script path happened to contain "{" or "}".
__EPILOGUM__ = f"""
------------------------------------------------------------------------------
                            EXEMPLŌRUM GRATIĀ
------------------------------------------------------------------------------
Read from file on disk . . . . . . . . . . . . . . . . . . . . . . . . . . . .
    {PROGRAM_EXE} tmp/STP.osm
    {PROGRAM_EXE} tmp/STP.osm > tmp/STP.osm.ttl
    {PROGRAM_EXE} --filter-xml-tag='way' tmp/STP.osm > tmp/STP~ways.osm.ttl

Pipe from other commands . . . . . . . . . . . . . . . . . . . . . . . . . . .
    cat tmp/STP.osm | {PROGRAM_EXE}
    bzcat tmp/BRA-north.osm.bz2 | {PROGRAM_EXE}
    bzcat tmp/BRA-north.osm.bz2 | {PROGRAM_EXE} --filter-xml-tag='relation'

Re-tag (without full inference) . . . . . . . . . . . . . . . . . . . . . . .
    {PROGRAM_EXE} --retagger-file=tmp/tagger.rdfstxt.tsv tmp/STP.osm 

Download external data examples ______________________________________________

Geofabrik download + decompress  . . . . . . . . . . . . . . . . . . . . . . .
    curl --output tmp/STP.osm.bz2 \
https://download.geofabrik.de/africa/sao-tome-and-principe-latest.osm.bz2
    bunzip2 tmp/STP.osm.bz2

Geofabrik download (but not decompress)  . . . . . . . . . . . . . . . . . . .
    curl --output tmp/BRA-north.osm.bz2 \
https://download.geofabrik.de/south-america/brazil/norte-latest.osm.bz2

Overpass download examples . . . . . . . . . . . . . . . . . . . . . . . . . .
(See http://overpass-api.de/command_line.html)
    curl --output tmp/target.osm --silent --globoff \
"https://overpass-api.de/api/interpreter?data=node[name=\"Gielgen\"];out;"
    curl --output tmp/speed-200.osm --silent --globoff \
"https://overpass-api.de/api/interpreter?data=way[maxspeed=\"200\"];out;"

Other ________________________________________________________________________

Create not-so-smart-inference (hardcoded auto tagger) . . . . . . . . . . . . .
    echo "-<TAB>*<TAB>*<TAB>created_by=*<NEWLINE>\
+<TAB><way>|<relation><TAB>*<TAB>ISO3166-1:alpha3=BRA<TAB>" \
> tmp/tagger.rdfstxt.tsv

Mapping  . . . . . . . . . . . . . . . . . . . . . . . .
    echo "-<TAB>*<TAB>*<TAB>created_by=*<NEWLINE>\
+<TAB><way>|<relation><TAB>*<TAB>ISO3166-1:alpha3=BRA<TAB>" \
> tmp/tagger.rdfstxt.tsv

------------------------------------------------------------------------------
                            EXEMPLŌRUM GRATIĀ
------------------------------------------------------------------------------
"""

# https://overpass-turbo.eu/s/1ohl
# way({{bbox}})[highway=residential]
#   [maxspeed](if:t["maxspeed"]>120);
# out geom;

# @see also https://gis.stackexchange.com/questions/127315/filtering-overpass-api-by-country

# https://overpass-turbo.eu/s/1ohm
# area["ISO3166-1:alpha3"="BRA"]->.boundaryarea;
# (
# way(area.boundaryarea)[maxspeed](if:t["maxspeed"]>120);
# );
# out meta;


class Cli:
    """Command line front-end: converts an OSM XML dump into RDF Turtle.

    make_args() parses the command line; execute_cli() picks the input
    (file path or piped standard input), configures the element filter and
    the optional retagger rules, and delegates to osmrdf_xmldump2_ttl_v2(),
    which prints the Turtle to standard output.
    """

    EXIT_OK = 0
    EXIT_ERROR = 1
    EXIT_SYNTAX = 2

    venandum_insectum: bool = False  # noqa: E701

    def __init__(self):
        """
        Constructs all the necessary attributes for the Cli object.
        """

    def make_args(self, hxl_output=True):
        """Build the argument parser and parse the command line.

        Args:
            hxl_output: unused; kept for interface compatibility.

        Returns:
            argparse.Namespace with infile, real_infile, retagger_file,
            xml_tags and xml_tags_not (the two last ones as lists, split on
            commas, or None when not given).
        """
        parser = argparse.ArgumentParser(
            prog=NOMEN,
            description=DESCRIPTION,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            epilog=__EPILOGUM__
        )

        parser.add_argument(
            'infile',
            help='Input file',
            # required=False,
            nargs='?'
        )

        parser.add_argument(
            '--real-infile-path',
            # Trailing space added: the adjacent literals are concatenated
            help='(Quick workaround for edge cases) in case infile becomes '
            'ambigous on shell scripting, use this to force real source path',
            dest='real_infile',
            nargs='?',
            default=None,
            required=False,
        )

        parser.add_argument(
            '--retagger-file',
            help='(Poors man\'s inference) Path to a TSV file to apply OSM '
            'Tags before output RDF',
            dest='retagger_file',
            nargs='?',
            default=None,
            required=False,
        )

        parser.add_argument(
            '--filter-xml-tag',
            help='Filter XML tags',
            dest='xml_tags',
            nargs='?',
            type=lambda x: x.split(','),
            default=None
        )

        parser.add_argument(
            '--filter-xml-tag-not',
            help='Filter XML tags (not)',
            dest='xml_tags_not',
            nargs='?',
            type=lambda x: x.split(','),
            default=None
        )

        return parser.parse_args()

    def execute_cli(
            self, pyargs, stdin=STDIN, stdout=sys.stdout,
            stderr=sys.stderr):
        """Run the conversion described by the parsed arguments.

        Input precedence (unchanged): --real-infile-path, then the positional
        infile when stdin is a terminal, otherwise the piped stdin stream.

        Args:
            pyargs: namespace returned by make_args().
            stdin: binary input stream used when data is piped in.
            stdout: unused; the converter prints to standard output itself.
            stderr: stream that receives the "no input" error message.

        Returns:
            EXIT_OK after the conversion finished.

        Raises:
            SystemExit: with EXIT_SYNTAX when no input source is available.
        """

        if pyargs.real_infile is not None:
            source = pyargs.real_infile
        elif stdin.isatty():
            source = pyargs.infile
        else:
            source = stdin

        if source is None:
            # Nothing piped in and no path given: fail with a clear message
            # instead of handing None to the XML parser.
            print(
                f'{NOMEN}: no input. Give an input file or pipe data '
                'to standard input (see --help)', file=stderr)
            sys.exit(self.EXIT_SYNTAX)

        element_filter = OSMElementFilter()

        if pyargs.xml_tags:
            element_filter.set_filter_xml_tags(pyargs.xml_tags)

        if pyargs.xml_tags_not:
            element_filter.set_filter_xml_tags_not(pyargs.xml_tags_not)

        retagger = None
        if pyargs.retagger_file:
            with open(
                    pyargs.retagger_file, mode='r',
                    encoding='utf-8') as rules_file:
                retagger = rules_file.read()

        # The retagger rules apply to every input source; previously they
        # were silently dropped when reading from standard input.
        osmrdf_xmldump2_ttl_v2(source, element_filter, retagger)
        return self.EXIT_OK


if __name__ == "__main__":
    # Script entry point: build the CLI helper, parse the command line
    # arguments and run the conversion with them.
    cli = Cli()
    cli.execute_cli(cli.make_args())

# osmrdf_xmldump2_ttl('./tmp/sao-tome-and-principe-latest.osm')


# ./osmdump2rdfcli.py --help
# ./osmdump2rdfcli.py > ./tmp/sao-tome-and-principe-latest.osm.ttl

# rdfpipe --input-format='ttl' --output-format=longturtle ./tmp/sao-tome-and-principe-latest.osm.ttl > ./tmp/sao-tome-and-principe-latest~longturtle.osm.ttl

osmrdf2023.py

#!/usr/bin/env python3
# ==============================================================================
#
#          FILE:  osmrdf2023.py
#
#         USAGE:  # this is a library. Import into your code:
#                     from osmrdf2023 import *
#
#   DESCRIPTION:  ---
#
#       OPTIONS:  ---
#
#  REQUIREMENTS:  - python3
#                   - lxml
#          BUGS:  - No big XML dumps output format support (not yet)
#                 - No support for PBF Format (...not yet)
#         NOTES:  ---
#       AUTHORS:  Emerson Rocha <rocha[at]ieee.org>
# COLLABORATORS:  ---
#       LICENSE:  Public Domain dedication or Zero-Clause BSD
#                 SPDX-License-Identifier: Unlicense OR 0BSD
#       VERSION:  v0.3.0
#       CREATED:  2022-11-25 19:22:00Z v0.1.0 started
#      REVISION:  2022-11-26 20:47:00Z v0.2.0 node, way, relation basic turtle,
#                                      only attached tags (no <nd> <member> yet)
#                 2022-12-21 01:46:00Z v0.3.0 osmrdf2022.py -> osmrdf2023.py
# ==============================================================================

import sys
from typing import List, Type
import xml.etree.ElementTree as XMLElementTree
from lxml import etree


# See also: https://wiki.openstreetmap.org/wiki/Sophox#How_OSM_data_is_stored
# See also https://wiki.openstreetmap.org/wiki/Elements
# Prefix declarations printed at the top of every Turtle output
RDF_TURTLE_PREFIXES = [
    'PREFIX geo: <http://www.opengis.net/ont/geosparql#>',
    'PREFIX osmnode: <https://www.openstreetmap.org/node/>',
    'PREFIX osmrel: <https://www.openstreetmap.org/relation/>',
    'PREFIX osmway: <https://www.openstreetmap.org/way/>',
    'PREFIX osmm: <https://example.org/todo-meta/>',
    'PREFIX osmt: <https://wiki.openstreetmap.org/wiki/Key:>',
    'PREFIX osmx: <https://example.org/todo-xref/>',
    'PREFIX wikidata: <http://www.wikidata.org/entity/>',
    'PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>',
]

# XML element name -> Turtle prefix used to build the subject/object IRIs
OSM_ELEMENT_PREFIX = {
    'node': 'osmnode:',
    'relation': 'osmrel:',
    'tag': 'osmt:',
    'way': 'osmway:'
}

# Using Sophox
# XML element name -> literal emitted as osmm:type.
# The lookup key is the XML element name, which is 'relation' (not 'rel');
# with only 'rel' present, relations never received an osmm:type.
# 'rel' is kept for backward compatibility.
OSM_ELEMENT_TYPE_LITERAL = {
    'node': 'n',
    'way': 'w',
    'relation': 'r',
    'rel': 'r'
}

# Undocumented
# - osmx:hasnodes
# - osmx:hasmembers
# - osmx:hasrole{CUSTOM}

# @SEE blank nodes https://www.w3.org/TR/turtle/#h2_sec-examples


class OSMApiv06Xml:
    """OSMApiv06Xml

    Not so optimized quick parser for XML documents that can fit into memory.
    The whole document is parsed by the constructor; node() then extracts
    the first OSM primitive found directly below the root element.
    """

    iterator: None
    xmlroot: None  # xml.etree.ElementTree.Element
    root_tag: str  # Example: osm
    root_attrib: dict
    child_1_tag: str
    child_1_attr: dict
    # @TODO for full dumps, will have 1-n child items

    # Direct children of the root that node() turns into an OSMElement
    _PRIMITIVES = ('node', 'way', 'relation')

    def __init__(self, file_or_string: str) -> None:
        """Parse an XML document given as a string.

        Args:
            file_or_string: XML text (handed to ElementTree.fromstring).
        """
        # @TODO maybe eventually implement from file (the large ones), e.g.
        #       with XMLElementTree.iterparse()
        root = XMLElementTree.fromstring(file_or_string)
        self.xmlroot = root
        self.root_tag = root.tag
        self.root_attrib = root.attrib

    def node(self):
        """Return the first node, way or relation below the root.

        Other children of the root (for example <bounds>) are skipped, since
        they cannot be represented as an OSMElement.

        Returns:
            OSMElement for the first primitive, with its tags as a list of
            (key, value), its <nd> references as a list of int and its
            members as a list of (type, ref, role); each of these is None
            when the element has none. Returns None when the document
            contains no primitive.
        """
        for child in self.xmlroot:
            if child.tag not in self._PRIMITIVES:
                continue

            xml_tags = [
                (item.attrib['k'], item.attrib['v'])
                for item in child.findall('tag')
            ] or None

            xml_nds = [
                int(item.attrib['ref']) for item in child.findall('nd')
            ] or None

            # @FIXME this is incomplete
            xml_members = [
                (
                    item.attrib['type'],
                    int(item.attrib['ref']),
                    item.attrib.get('role'),
                )
                for item in child.findall('member')
            ] or None

            return OSMElement(
                child.tag,
                dict(child.attrib),
                xml_tags,
                xml_nds,
                xml_members
            )
        return None


class OSMElement:
    """OSMElement generic container for primitives

    Holds the attributes of one XML element (node, way, relation) plus its
    tags, node references and members, and serializes them as Turtle lines.

    Note: this will not do additional checks if input data is valid
    """
    _basegroup: str
    _tag: str
    _el_osm_tags: List[tuple]
    _el_osm_nds: List[int]
    _el_osm_members: List[tuple]
    _xml_filter: Type['OSMElementFilter']
    _tagcaster: Type['OSMElementTagValueCast']
    id: int
    changeset: int
    timestamp: str  # maybe chage later
    user: str
    userid: str
    version: str
    visible: bool
    lat: float
    lon: float

    def __init__(
        self, tag: str, meta: dict,
        xml_tags: List[tuple] = None,
        xml_nds: List[int] = None,
        xml_members: List[tuple] = None,
        xml_filter: Type['OSMElementFilter'] = None,
        tagcaster: Type['OSMElementTagValueCast'] = None,
    ):
        """Build the element from the XML element name and its attributes.

        Args:
            tag: XML element name; must be a key of OSM_ELEMENT_PREFIX.
            meta: XML attributes (id, version, changeset, timestamp, user,
                uid, lat, lon); missing ones are stored as None.
            xml_tags: list of (key, value) OSM tags.
            xml_nds: list of node ids referenced by a way.
            xml_members: list of (type, ref, role) members of a relation.
            xml_filter: optional filter consulted by can_output().
            tagcaster: optional helper that emits extra typed triples.

        Raises:
            KeyError: when tag is not a key of OSM_ELEMENT_PREFIX.
        """
        if not isinstance(meta, dict):
            meta = dict(meta)

        self.id = int(meta['id']) if 'id' in meta else None

        # Kept as string (not cast to a number)
        self.version = meta['version'] if 'version' in meta else None
        self.changeset = int(
            meta['changeset']) if 'changeset' in meta else None

        self.timestamp = meta['timestamp'] if 'timestamp' in meta else None

        self.user = meta['user'] if 'user' in meta else None

        # uid = userid
        self.userid = int(meta['uid']) if 'uid' in meta else None
        self.lat = float(meta['lat']) if 'lat' in meta else None
        self.lon = float(meta['lon']) if 'lon' in meta else None

        self._tag = tag
        self._basegroup = '{0}{1}'.format(
            OSM_ELEMENT_PREFIX[tag], str(self.id))

        self._el_osm_tags = xml_tags
        self._el_osm_nds = xml_nds
        self._el_osm_members = xml_members
        self._xml_filter = xml_filter
        self._tagcaster = tagcaster

    @staticmethod
    def _ttl_literal(text) -> str:
        """Escape text for use inside a double-quoted Turtle string.

        Backslash, double quote, line feed and carriage return are the
        characters that cannot appear raw in such a literal.
        """
        return (
            str(text)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
            .replace('\r', '\\r')
        )

    def can_output(self) -> bool:
        """Tell if the element passes the filter (True when no filter)."""
        if self._xml_filter is None:
            return True
        return bool(self._xml_filter.can_tag(self._tag))

    def to_ttl(self) -> list:
        """Serialize the element as a list of Turtle lines.

        Returns:
            Lines starting with the subject, followed by one
            "predicate object ;" line per known value and a closing ".".
        """
        data = []
        data.append(self._basegroup)

        if self.changeset:
            data.append(f'    osmm:changeset {self.changeset} ;')
        # Compare with None: 0.0 is a valid latitude/longitude.
        if self.lat is not None and self.lon is not None:
            # WKT axis order is x y, i.e. longitude first
            data.append(
                f'    osmm:loc "Point({self.lon} {self.lat})"^^geo:wktLiteral ;')
        if self.timestamp:
            data.append(
                f'    osmm:timestamp "{self.timestamp}"^^xsd:dateTime ;')
        if self._tag in OSM_ELEMENT_TYPE_LITERAL:
            data.append(
                f'    osmm:type "{OSM_ELEMENT_TYPE_LITERAL[self._tag]}" ;')
        if self.user:
            data.append(
                f'    osmm:user "{self._ttl_literal(self.user)}" ;')
        if self.userid:
            data.append(
                f'    osmm:userid {self.userid} ;')
        if self.version:
            data.append(
                f'    osmm:version {self.version} ;')

        if self._el_osm_tags:
            for key, value in self._el_osm_tags:
                _escp_key = osmrdf_tagkey_encode(key)

                data.append(
                    f'    osmt:{_escp_key} "{self._ttl_literal(value)}" ;')

                if self._tagcaster and self._tagcaster.can_cast(_escp_key):
                    data.append(self._tagcaster.to_ttl(_escp_key, value))

        if self._el_osm_nds:
            _parts = [f'osmnode:{ref}' for ref in self._el_osm_nds]
            data.append(
                f'    osmx:hasnodes ({" ".join(_parts)}) ;')

        if self._el_osm_members:
            _parts = []
            for _type, _ref, _role in self._el_osm_members:
                _prefix = OSM_ELEMENT_PREFIX[_type]
                # NOTE(review): the role is appended verbatim to the
                # predicate name (None becomes "hasroleNone") — confirm the
                # intended convention for empty/unusual roles.
                _parts.append(f'[osmx:hasrole{_role} {_prefix}{_ref}]')
            data.append(
                f'    osmx:hasmembers ({" ".join(_parts)}) ;')

        data.append('.')
        return data


class OSMElementFilter:
    """Decides which XML element names an OSMElement may output.

    Two optional lists are kept: names to include (xml_tags) and names to
    exclude (xml_tags_not). An unset or empty list imposes no restriction.
    """

    xml_tags: List = None
    xml_tags_not: List = None

    def __init__(self) -> None:
        pass

    def set_filter_xml_tags(self, tags: list):
        """Set the include list; returns self to allow chaining."""
        self.xml_tags = tags
        return self

    def set_filter_xml_tags_not(self, tags: list):
        """Set the exclude list; returns self to allow chaining."""
        self.xml_tags_not = tags
        return self

    def can_tag(self, tag: str) -> bool:
        """Return True when tag passes both the include and exclude lists."""
        included = not self.xml_tags or tag in self.xml_tags
        excluded = bool(self.xml_tags_not) and tag in self.xml_tags_not
        return included and not excluded


class OSMElementTagger:
    """Poor man's tagger (no external inference required)

    Rules are given as TSV text, one rule per line, with the columns
    (<TAB> is a literal tab character):

        operation<TAB>xml elements<TAB>xml attributes<TAB>key=value

    - operation: "+" sets a tag, "-" removes a tag
    - xml elements: "*" for any, or names separated by "|", each optionally
      wrapped in "<" ">"
    - xml attributes: parsed and stored, but not evaluated yet
    - key=value: the tag; for "-" only the key is compared

    @example
        +<TAB><way><TAB>*<TAB>is_in=BRA
        -<TAB><node>|<way>|<relation><TAB>*<TAB>created_by=*
        +<TAB><way>|<relation><TAB>*<TAB>shacl:lessThanOrEquals:maxspeed=120
    """
    rules: list = None

    def __init__(self, rules=None) -> None:
        if rules:
            self.parse_rules(rules)

    def parse_rules(self, rules_tsv: str):
        """Parse TSV rules and store them in self.rules.

        Blank lines are ignored. self.rules is left untouched when no rule
        is found.

        Args:
            rules_tsv: the rules, in the format described on the class.

        Raises:
            SystemExit: (status 1) when a line cannot be parsed; the reason
                and the offending text are written to standard error so the
                Turtle written to standard output is not polluted.
        """
        try:
            rules = []
            for raw_line in rules_tsv.splitlines():
                if not raw_line.strip():
                    continue
                fields = raw_line.split("\t")
                _op = fields[0]
                _xml_tag = fields[1].replace(
                    '<', '').replace('>', '').split('|')
                _xml_attrs = fields[2]
                # Split only on the first "=": values may contain "="
                _xml_c_attr_key, _xml_c_attr_value = fields[3].split('=', 1)
                if _xml_tag[0] == '*':
                    _xml_tag = None
                if _xml_attrs.startswith('*'):
                    _xml_attrs = None
                rules.append({
                    'i': len(rules),
                    'op': _op,
                    'xt': _xml_tag,
                    'xa': _xml_attrs,
                    'xack': _xml_c_attr_key,
                    'xacv': _xml_c_attr_value,
                })
            if rules:
                self.rules = rules
        except (AttributeError, IndexError, TypeError, ValueError) as err:
            print(f"ERROR OSMElementTagger: {err}", file=sys.stderr)
            print('--- start of file ---', file=sys.stderr)
            print(rules_tsv, file=sys.stderr)
            print('--- end of file ---', file=sys.stderr)
            sys.exit(1)

    def retag(self, element: str, de_facto_tags: List[tuple] = None):
        """Apply the rules to the tags of one element.

        Rules are applied in order, each one on the result of the previous
        one. Rules restricted to other elements are ignored. "-" removes
        every tag whose key equals the rule key; "+" replaces the value of
        an existing tag with that key, or appends the tag when missing, so
        applying the same rules twice gives the same result.

        Args:
            element: XML element name (node, way, relation).
            de_facto_tags: list of (key, value); it is not modified.

        Returns:
            de_facto_tags itself when there are no rules, otherwise a new
            list of (key, value).
        """
        if not self.rules:
            return de_facto_tags

        tags = list(de_facto_tags or [])
        for rule in self.rules:
            if rule['xt'] is not None and element not in rule['xt']:
                continue
            # TODO implement attribute check
            rule_key = rule['xack']
            if rule['op'] == '-':
                tags = [(key, value) for key, value in tags
                        if key != rule_key]
            elif rule['op'] == '+':
                rule_value = rule['xacv']
                if any(key == rule_key for key, _value in tags):
                    tags = [
                        (key, rule_value if key == rule_key else value)
                        for key, value in tags
                    ]
                else:
                    tags.append((rule_key, rule_value))
        return tags


class OSMElementTagValueCast:
    """Coerce the value of vanilla OpenStreetMap tags into typed triples.

    Only the "wikidata" key is castable for now; rule parsing is a stub.
    """
    rules: list = None

    def __init__(self, rules=None) -> None:
        if not rules:
            return
        self.parse_rules(rules)

    def parse_rules(self, rules_tsv: str):
        """Placeholder: splits the text in lines and discards the result."""
        # @TODO make it
        rules_tsv.splitlines()

    def can_cast(self, tagkey: str) -> bool:
        """Tell if an extra, typed triple can be produced for this key."""
        castable_keys = ['wikidata']
        return tagkey in castable_keys

    def to_ttl(self, tagkey: str, tagvalue: str) -> str:
        """Return the Turtle line linking the key to a wikidata: entity."""
        return '    osmx:{0} wikidata:{1} ;'.format(tagkey, tagvalue)


def osmrdf_node_xml2ttl(data_xml: str):
    """Convert the XML of a single node (API v0.6 response) to Turtle.

    Args:
        data_xml: XML document as text.

    Returns:
        Turtle text: the prefix declarations, an empty line, the lines of
        the element and a trailing empty line, joined with line feeds.
    """
    element = OSMApiv06Xml(data_xml).node()
    lines = [*RDF_TURTLE_PREFIXES, '', *element.to_ttl(), '']
    return "\n".join(lines)


def osmrdf_relation_xml2ttl(data_xml: str):
    """Convert the XML of a single relation (API v0.6 response) to Turtle.

    The source XML is no longer appended as "# " comments: that debug aid
    was left enabled here only, while it is disabled in the node and way
    converters.

    Args:
        data_xml: XML document as text.

    Returns:
        Turtle text: the prefix declarations, an empty line, the lines of
        the element and a trailing empty line, joined with line feeds.
    """
    osmx = OSMApiv06Xml(data_xml)
    osmnode = osmx.node()

    output = []
    output.extend(RDF_TURTLE_PREFIXES)
    output.append('')

    output.extend(osmnode.to_ttl())

    output.append('')
    # DEBUG: next 2 lines will print the XML node, commented
    # comment = "# " + "\n# ".join(data_xml.split("\n"))
    # output.append(comment)

    return "\n".join(output)


def osmrdf_tagkey_encode(raw_tag: str) -> str:
    """Encode an OSM tag key for use as the local part of a prefixed name.

    Word characters (letters, digits, underscore), ":", "." and "-" are
    kept. Every other character is percent-encoded from its UTF-8 bytes
    (a space becomes %20, as before), so keys such as "a/b" no longer
    produce a syntactically invalid Turtle name.

    Args:
        raw_tag: the tag key as found in the OSM data.

    Returns:
        The encoded key.
    """
    import re

    def _percent_encode(match):
        return ''.join(
            f'%{byte:02X}' for byte in match.group(0).encode('utf-8'))

    # @TODO improve-me: leading "-" and trailing "." are still passed as is
    return re.sub(r'[^\w:.\-]', _percent_encode, raw_tag)


def osmrdf_way_xml2ttl(data_xml: str):
    """Convert the XML of a single way (API v0.6 response) to Turtle.

    Args:
        data_xml: XML document as text.

    Returns:
        Turtle text: the prefix declarations, an empty line, the lines of
        the element and a trailing empty line, joined with line feeds.
    """
    element = OSMApiv06Xml(data_xml).node()
    lines = [*RDF_TURTLE_PREFIXES, '', *element.to_ttl(), '']
    return "\n".join(lines)


def osmrdf_xmldump2_ttl(xml_file_path, xml_filter: OSMElementFilter = None):
    """Print an OSM XML dump as Turtle (standard library parser).

    @deprecated will be replaced later (see osmrdf_xmldump2_ttl_v2)

    The prefixes are printed first, then one block per node, way and
    relation accepted by the filter. Other elements are ignored.

    Args:
        xml_file_path: path or file object accepted by ElementTree.iterparse.
        xml_filter (OSMElementFilter, optional): limits which element names
            are printed. Defaults to None (print everything).
    """
    # xml.etree.cElementTree was removed in Python 3.9
    from xml.etree import ElementTree as ET

    print('\n'.join(RDF_TURTLE_PREFIXES))
    print('')

    # Only "end" events are used: on "start" the children of an element
    # (tag, nd, member) are not guaranteed to be parsed yet.
    for _event, elem in ET.iterparse(xml_file_path, events=('end',)):
        if elem.tag not in ('node', 'way', 'relation'):
            continue

        xml_tags = [
            (item.attrib['k'], item.attrib['v'])
            for item in elem.findall('tag')
        ]
        xml_nds = [int(item.attrib['ref']) for item in elem.findall('nd')]
        # @FIXME this is incomplete
        xml_members = [
            (
                item.attrib['type'],
                int(item.attrib['ref']),
                item.attrib.get('role'),
            )
            for item in elem.findall('member')
        ]

        el = OSMElement(
            elem.tag,
            dict(elem.attrib),
            xml_tags=xml_tags,
            xml_nds=xml_nds,
            xml_members=xml_members,
            xml_filter=xml_filter
        )
        if xml_filter is None or el.can_output():
            print('\n'.join(el.to_ttl()) + '\n')

        # The element is fully processed; drop its content
        elem.clear()


def osmrdf_xmldump2_ttl_v2(
        xml_file_path,
        xml_filter: OSMElementFilter = None, retagger: str = None):
    """Print an OSM XML dump as Turtle (lxml incremental parser).

    The prefixes are printed first, then one block per node, way and
    relation accepted by the filter. Any other element (osm, bounds, or
    the note/meta elements of Overpass output) is ignored.

    Args:
        xml_file_path: path or binary file object accepted by lxml
            etree.iterparse (for example sys.stdin.buffer).
        xml_filter: limits which element names are printed; None prints
            everything.
        retagger: TSV rules text handed to OSMElementTagger, or None.
    """
    print('\n'.join(RDF_TURTLE_PREFIXES))
    print('')

    tagger = OSMElementTagger(retagger)
    tagcaster = OSMElementTagValueCast(None)

    context = etree.iterparse(xml_file_path, events=('end',))

    for _event, elem in context:
        if elem.tag not in ('node', 'way', 'relation'):
            continue

        xml_tags = [
            (item.attrib['k'], item.attrib['v'])
            for item in elem.findall('tag')
        ]
        # "Implicit" tag "<tagi />"
        xml_tags.extend(
            (item.attrib['k'], item.attrib['v'])
            for item in elem.findall('tagi')
        )
        # Rules are applied once, on explicit and implicit tags together
        xml_tags = tagger.retag(elem.tag, xml_tags)

        xml_nds = [int(item.attrib['ref']) for item in elem.findall('nd')]
        # @FIXME this is incomplete
        xml_members = [
            (
                item.attrib['type'],
                int(item.attrib['ref']),
                item.attrib.get('role'),
            )
            for item in elem.findall('member')
        ]

        el = OSMElement(
            elem.tag,
            dict(elem.attrib),
            xml_tags=xml_tags,
            xml_nds=xml_nds,
            xml_members=xml_members,
            xml_filter=xml_filter,
            tagcaster=tagcaster,
        )
        if xml_filter is None or el.can_output():
            print('\n'.join(el.to_ttl()) + '\n')

        # Free memory: drop the processed element and the siblings that
        # precede it.
        elem.clear()
        while elem.getprevious() is not None:
            del elem.getparent()[0]


# @TODO after Protobuf, maybe try some alternative which could allow
#       search specific parts. See:
#       - https://wiki.openstreetmap.org/wiki/SQLite
#       - https://wiki.openstreetmap.org/wiki/SpatiaLite
#       - https://github.com/osmzoso/osm2sqlite
#

Algorithm

information sign

This page might contain only snippets of the proof of concept, not the conventions themselves or their discussions.

References