Source code for sparqlmodel.types

"""IRI and namespace utilities."""

from __future__ import annotations

import re
from typing import Any

from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema

from sparqlmodel.exceptions import ConfigurationError, QueryError

# Prefix -> namespace IRI mappings that are always available. expand_iri,
# compact_iri and NamespaceRegistry merge caller-supplied prefixes on top of
# this dict, so a caller entry with the same prefix overrides the default.
DEFAULT_PREFIXES: dict[str, str] = {
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
    "schema": "https://schema.org/",
}

# Shape of a compact IRI. Group 1 is the prefix (an ASCII letter or underscore
# followed by word characters or hyphens); group 2 is the local part (one or
# more characters, none of which is a colon).
_COMPACT_IRI_RE = re.compile(r"^([a-zA-Z_][\w-]*):([^:]+)$")


def is_compact_iri(value: str) -> bool:
    """Tell whether ``value`` has the ``prefix:local`` shape of a compact IRI.

    Strings starting with ``http://``, ``https://`` or ``urn:`` are treated as
    absolute IRIs and are never reported as compact. Everything else is
    checked against ``_COMPACT_IRI_RE``; the prefix is not looked up in any
    prefix map, so an unknown prefix still counts as compact.

    Args:
        value: The string to classify.

    Returns:
        ``True`` when ``value`` is not absolute and matches the compact
        pattern, ``False`` otherwise.
    """
    looks_absolute = value.startswith(("http://", "https://", "urn:"))
    # Short-circuit keeps absolute IRIs away from the regex entirely.
    return not looks_absolute and _COMPACT_IRI_RE.match(value) is not None
def is_absolute_iri(value: str) -> bool:
    """Tell whether ``value`` starts like an absolute IRI.

    Only the ``http://``, ``https://`` and ``urn:`` leaders are recognised,
    and the comparison is case-sensitive.

    Args:
        value: The string to classify.

    Returns:
        ``True`` if ``value`` begins with one of the recognised leaders.
    """
    absolute_leaders = ("http://", "https://", "urn:")
    return value.startswith(absolute_leaders)
class IRI(str):
    """String subclass representing an RDF IRI, in compact or absolute form.

    When used as a pydantic field type, input is validated as a string and
    then passed through :meth:`_validate`, which trims surrounding whitespace
    and rejects blank values. Constructing ``IRI(...)`` directly performs no
    validation.
    """

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Build the pydantic-core schema: a string schema followed by ``_validate``."""
        string_schema = core_schema.str_schema()
        return core_schema.no_info_after_validator_function(
            cls._validate,
            string_schema,
        )

    @classmethod
    def _validate(cls, value: str) -> IRI:
        """Trim ``value`` and wrap it in ``cls``.

        Raises:
            ValueError: If ``value`` is empty or contains only whitespace.
        """
        trimmed = value.strip() if value else ""
        if not trimmed:
            raise ValueError("IRI cannot be empty")
        return cls(trimmed)

    def expand(self, prefixes: dict[str, str] | None = None) -> str:
        """Return the absolute form of this IRI (e.g. for ``schema:Person``).

        Delegates to :func:`expand_iri` with this IRI's string value and the
        given ``prefixes``.
        """
        text = str(self)
        return expand_iri(text, prefixes)

    def compact(self, prefixes: dict[str, str] | None = None) -> str:
        """Return this IRI shortened with a known prefix where one applies.

        Delegates to :func:`compact_iri` with this IRI's string value and the
        given ``prefixes``.
        """
        text = str(self)
        return compact_iri(text, prefixes)
def expand_iri(iri: str, prefixes: dict[str, str] | None = None) -> str:
    """Turn a compact ``prefix:local`` IRI into its absolute form.

    Args:
        iri: The IRI to expand. Strings starting with ``http://``,
            ``https://`` or ``urn:``, and strings that do not match the
            compact pattern, are returned unchanged.
        prefixes: Extra prefix -> namespace mappings. They are layered over
            ``DEFAULT_PREFIXES`` and win on conflicts.

    Returns:
        The namespace for the prefix concatenated with the local part, or
        ``iri`` itself when no expansion applies.

    Raises:
        ConfigurationError: If ``iri`` is compact but its prefix is in
            neither ``DEFAULT_PREFIXES`` nor ``prefixes``.
    """
    already_absolute = iri.startswith(("http://", "https://", "urn:"))
    parsed = None if already_absolute else _COMPACT_IRI_RE.match(iri)
    if parsed is None:
        return iri

    prefix, local = parsed.groups()
    known = {**DEFAULT_PREFIXES, **(prefixes or {})}
    if prefix in known:
        return known[prefix] + local
    raise ConfigurationError(f"Unknown prefix '{prefix}' for IRI '{iri}'")
def compact_iri(iri: str, prefixes: dict[str, str] | None = None) -> str:
    """Shorten an ``http(s)`` IRI to ``prefix:local`` using the longest namespace.

    Args:
        iri: The IRI to shorten. Anything not starting with ``http://`` or
            ``https://`` is returned unchanged. That includes ``urn:`` IRIs,
            even though ``expand_iri`` treats them as absolute.
        prefixes: Extra prefix -> namespace mappings. They are layered over
            ``DEFAULT_PREFIXES`` and win on conflicts.

    Returns:
        ``"<prefix>:<remainder>"`` for the longest non-empty namespace that
        ``iri`` starts with, or ``iri`` itself when no namespace matches.
    """
    if not iri.startswith(("http://", "https://")):
        return iri

    known = {**DEFAULT_PREFIXES, **(prefixes or {})}
    # Empty namespaces would match every IRI, so they are never candidates.
    candidates = [
        (name, namespace)
        for name, namespace in known.items()
        if namespace and iri.startswith(namespace)
    ]
    if not candidates:
        return iri

    # max() returns the first of several equally long candidates, so when two
    # prefixes share a namespace the one listed first in ``known`` wins.
    best_prefix, best_ns = max(candidates, key=lambda pair: len(pair[1]))
    return f"{best_prefix}:{iri[len(best_ns):]}"
class NamespaceRegistry:
    """Registry of namespace prefixes for a session or model.

    The registry starts from ``DEFAULT_PREFIXES`` and layers the prefixes
    passed to the constructor on top, so a supplied prefix overrides a default
    with the same name.
    """

    # Characters the SPARQL 1.1 grammar forbids inside an IRIREF (``<...>``),
    # in addition to every code point up to and including U+0020.
    _IRIREF_ILLEGAL = frozenset('<>"{}|^`\\')
    # A prefix label is terminated by ':' in a PREFIX declaration, so it cannot
    # contain one; '<' and '>' would collide with the IRIREF delimiters.
    _PREFIX_ILLEGAL = frozenset("<>:")

    def __init__(self, prefixes: dict[str, str] | None = None) -> None:
        """Create a registry from the defaults plus optional extra ``prefixes``."""
        self._prefixes = {**DEFAULT_PREFIXES, **(prefixes or {})}

    @property
    def prefixes(self) -> dict[str, str]:
        """A copy of the prefix map; mutating it does not affect the registry."""
        return dict(self._prefixes)

    def bind(self, graph: Any) -> None:
        """Bind prefixes to a TripleModel ``Store`` graph.

        Calls ``graph.bind(prefix, uri)`` once for every registered prefix.
        """
        for prefix, uri in self._prefixes.items():
            graph.bind(prefix, uri)

    def expand(self, iri: str) -> str:
        """Expand a compact IRI using this registry's prefixes (see ``expand_iri``)."""
        return expand_iri(iri, self._prefixes)

    def compact(self, iri: str) -> str:
        """Compact an absolute IRI using this registry's prefixes (see ``compact_iri``)."""
        return compact_iri(iri, self._prefixes)

    @staticmethod
    def _has_illegal_char(text: str, illegal: frozenset[str]) -> bool:
        """Return True if ``text`` holds a char from ``illegal`` or any code point <= U+0020."""
        return any(ch <= " " or ch in illegal for ch in text)

    def sparql_prefixes(self) -> str:
        """Return PREFIX declarations for SPARQL queries.

        Declarations are emitted one per line, sorted by prefix name. Every
        prefix and namespace is checked first so that the generated text is
        syntactically valid and cannot be broken out of by a crafted value.

        Returns:
            The newline-joined ``PREFIX name: <uri>`` lines, or an empty
            string when the registry holds no prefixes.

        Raises:
            QueryError: If a prefix is empty or contains whitespace, a control
                character, ``<``, ``>`` or ``:``; or if a namespace URI is
                empty or contains a character that is not allowed inside a
                SPARQL ``IRIREF``.
        """
        lines: list[str] = []
        for prefix, uri in sorted(self._prefixes.items()):
            if not prefix or self._has_illegal_char(prefix, self._PREFIX_ILLEGAL):
                raise QueryError(f"Invalid SPARQL prefix name: {prefix!r}")
            if not uri or self._has_illegal_char(uri, self._IRIREF_ILLEGAL):
                raise QueryError(f"Invalid namespace URI for prefix {prefix!r}: {uri!r}")
            lines.append(f"PREFIX {prefix}: <{uri}>")
        return "\n".join(lines)