Source code for triplemodel.ontology_registry

"""Ontology hints: subclass closure and inverse predicate pairs from TTL or static registration."""

from __future__ import annotations

from collections import deque
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, cast

from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pyoxigraph import NamedNode
from triplemodel.store import RdfGraph as Graph
from triplemodel.store.terms import term_str

from triplemodel.config.constants import OWL, RDFS
from triplemodel.config import get_rdf_config
from triplemodel.fields.metadata import (
    inverse_for_field,
    predicate_for_field,
    predicate_from_annotation,
)
from triplemodel.metadata.cardinality import field_annotation
from triplemodel.namespaces import resolve_predicate

RDFS_SUBCLASS = NamedNode(f"{RDFS}subClassOf")
OWL_INVERSE = NamedNode(f"{OWL}inverseOf")


[docs] @dataclass class OntologyRegistry: """Subclass and ``owl:inverseOf`` hints from an ontology graph and/or static registration. Use :meth:`subtypes_of` for **descendant** type IRIs (subclasses). For **ancestor** types (superclasses of a given class), use :func:`~triplemodel.subclass_uris` on a graph that contains ``rdfs:subClassOf`` axioms. """ _graph: Graph | None = None _parent_to_children: dict[str, set[str]] = field(default_factory=dict) _registered_inverse_forward: dict[str, str] = field(default_factory=dict) _registered_inverse_reverse: dict[str, str] = field(default_factory=dict) _graph_inverse_forward: dict[str, str] = field(default_factory=dict) _graph_inverse_reverse: dict[str, str] = field(default_factory=dict)
[docs] @classmethod def from_graph(cls, graph: Graph) -> OntologyRegistry: """Build a registry backed by ``graph`` (typically parsed OWL/RDFS Turtle).""" reg = cls() reg.load_graph(graph) return reg
[docs] @classmethod def from_ttl( cls, path: str | Path, *, format: str | None = None, base_iri: str | None = None, ) -> OntologyRegistry: """Parse a Turtle (or other) ontology file into a new registry.""" graph = Graph() graph.parse( str(path), format=format or "turtle", base_iri=base_iri, ) return cls.from_graph(graph)
[docs] def load_graph(self, graph: Graph) -> None: """Use ``graph`` for :meth:`subtypes_of` and :meth:`inverse_of` queries.""" self._graph = graph self._graph_inverse_forward.clear() self._graph_inverse_reverse.clear() self._index_inverse_from_graph(graph)
[docs] def register_subclasses( self, base_type_uri: str, subtype_uris: Iterable[str], ) -> None: """Register direct ``rdfs:subClassOf`` links without an ontology file.""" children = self._parent_to_children.setdefault(base_type_uri, set()) for sub in subtype_uris: children.add(sub)
[docs] def register_inverse(self, forward_predicate: str, inverse_predicate: str) -> None: """Register an ``owl:inverseOf`` pair (both directions are queryable).""" self._registered_inverse_forward[forward_predicate] = inverse_predicate self._registered_inverse_reverse[inverse_predicate] = forward_predicate
[docs] def subtypes_of(self, type_uri: str) -> frozenset[str]: """Return ``type_uri`` and all registered or inferred **subclass** IRIs.""" found: set[str] = {type_uri} if self._graph is not None: term = NamedNode(type_uri) for subj in self._graph.transitive_subjects(RDFS_SUBCLASS, term): found.add(term_str(subj)) found.update(self._static_subtype_closure(type_uri)) return frozenset(found)
[docs] def inverse_of(self, predicate_uri: str) -> str | None: """Return the inverse property IRI for ``predicate_uri``, if known.""" if predicate_uri in self._registered_inverse_forward: return self._registered_inverse_forward[predicate_uri] if predicate_uri in self._registered_inverse_reverse: return self._registered_inverse_reverse[predicate_uri] if predicate_uri in self._graph_inverse_forward: return self._graph_inverse_forward[predicate_uri] if predicate_uri in self._graph_inverse_reverse: return self._graph_inverse_reverse[predicate_uri] if self._graph is None: return None pred = NamedNode(predicate_uri) for obj in self._graph.objects(pred, OWL_INVERSE): return term_str(obj) for subj in self._graph.subjects(OWL_INVERSE, pred): return term_str(subj) return None
def _static_subtype_closure(self, type_uri: str) -> set[str]: """All descendants from :meth:`register_subclasses` edges.""" result: set[str] = set() queue: deque[str] = deque(self._parent_to_children.get(type_uri, ())) while queue: child = queue.popleft() if child in result: continue result.add(child) queue.extend(self._parent_to_children.get(child, ())) return result def _index_inverse_from_graph(self, graph: Graph) -> None: for subj in graph.subjects(OWL_INVERSE, None): if not isinstance(subj, NamedNode): continue for obj in graph.objects(subj, OWL_INVERSE): if isinstance(obj, NamedNode): forward = term_str(subj) inverse = term_str(obj) self._graph_inverse_forward[forward] = inverse self._graph_inverse_reverse[inverse] = forward
def apply_hints_to_model( model_cls: type[BaseModel], registry: OntologyRegistry, *, mutate: bool = False, overwrite: bool = False, ) -> dict[str, str]: """Suggest or apply ``inverse=`` metadata from :meth:`OntologyRegistry.inverse_of`. Returns ``{field_name: inverse_predicate_uri}`` for each mapped field that received a hint. When ``mutate`` is True, updates ``json_schema_extra`` on the class and calls ``model_rebuild()`` so :func:`~triplemodel.fields.metadata.inverse_for_field` sees the inverse IRI. Skips fields that already declare an inverse unless ``overwrite`` is True. """ cfg = get_rdf_config(model_cls) prefixes = cfg.prefixes_dict applied: dict[str, str] = {} new_fields: dict[str, FieldInfo] = dict(model_cls.model_fields) changed = False for name, field_info in model_cls.model_fields.items(): raw = predicate_for_field(field_info) or predicate_from_annotation( field_annotation(field_info) ) if raw is None: continue if inverse_for_field(field_info) is not None and not overwrite: continue try: forward = resolve_predicate(raw, prefixes) except ValueError: continue inv = registry.inverse_of(forward) if inv is None: continue applied[name] = inv if mutate: extra = field_info.json_schema_extra merged: dict[str, object] = ( dict(cast(Mapping[str, object], extra)) if isinstance(extra, dict) else {} ) merged["rdf_inverse"] = inv new_fields[name] = FieldInfo.merge_field_infos( # ty: ignore[deprecated] field_info, json_schema_extra=merged, ) changed = True if mutate and changed: cast(Any, model_cls).model_fields = new_fields model_cls.model_rebuild(force=True) return applied __all__ = ["OntologyRegistry", "apply_hints_to_model"]