Source code for sparqlmodel.session_state

"""Session-level identity map, hydration cache, and pending writes."""

from __future__ import annotations

from sparqlmodel.graph import _expanded_iri_key
from sparqlmodel.model import SPARQLModel
from sparqlmodel.types import IRI

IdentityKey = tuple[type[SPARQLModel], str]
HydrationKey = tuple[type[SPARQLModel], str, int]


def identity_key(model: SPARQLModel) -> IdentityKey:
    model.ensure_id()
    prefixes = model.get_prefixes()
    iri = model.id
    assert iri is not None
    key = str(iri)
    if not key.startswith("_:"):
        key = _expanded_iri_key(iri, prefixes)
    return (type(model), key)


def identity_key_for_iri(model_cls: type[SPARQLModel], iri: str | IRI) -> IdentityKey:
    prefixes = model_cls.get_prefixes()
    raw = str(iri)
    key = raw if raw.startswith("_:") else _expanded_iri_key(iri, prefixes)
    return (model_cls, key)


[docs] class SessionState: """Identity map, hydration cache, and pending ``put`` queue for a session."""
[docs] def __init__(self) -> None: self._identity: dict[IdentityKey, SPARQLModel] = {} self._hydration: dict[HydrationKey, SPARQLModel | None] = {} self._pending: list[SPARQLModel] = [] self.store_generation: int = 0
def get_identity(self, key: IdentityKey) -> SPARQLModel | None: return self._identity.get(key) def set_identity(self, model: SPARQLModel) -> None: key = identity_key(model) self._identity[key] = model self.invalidate_hydration_for_iri(key[1]) def pop_identity(self, key: IdentityKey) -> None: self._identity.pop(key, None) def evict_identity_prefix(self, model_cls: type[SPARQLModel], iri_key: str) -> None: self._identity.pop((model_cls, iri_key), None) def get_hydration(self, key: HydrationKey) -> SPARQLModel | None | _HydrationMiss: if key not in self._hydration: return _HYDRATION_MISS return self._hydration[key] def set_hydration(self, key: HydrationKey, value: SPARQLModel | None) -> None: self._hydration[key] = value def invalidate_hydration_for(self, model_cls: type[SPARQLModel], iri_key: str) -> None: to_drop = [k for k in self._hydration if k[0] is model_cls and k[1] == iri_key] for k in to_drop: del self._hydration[k]
[docs] def invalidate_hydration_for_iri(self, iri_key: str) -> None: """Drop hydration cache entries for all model classes at ``iri_key``.""" to_drop = [k for k in self._hydration if k[1] == iri_key] for k in to_drop: del self._hydration[k]
def invalidate_all_hydration(self) -> None: self._hydration.clear() def add_pending(self, model: SPARQLModel) -> None: key = identity_key(model) self._pending = [m for m in self._pending if identity_key(m) != key] self._pending.append(model) @property def pending(self) -> list[SPARQLModel]: return self._pending def clear_pending(self) -> None: self._pending.clear()
[docs] def remove_pending_for(self, model_cls: type[SPARQLModel], iri_key: str) -> None: """Drop queued ``put(..., flush=False)`` writes for ``(model_cls, iri_key)``.""" target = (model_cls, iri_key) def _matches_pending(m: SPARQLModel) -> bool: if m.id is None: return False return identity_key_for_iri(type(m), m.id) == target self._pending = [m for m in self._pending if not _matches_pending(m)]
def expire_model(self, model: SPARQLModel) -> None: key = identity_key(model) self.pop_identity(key) self.invalidate_hydration_for_iri(key[1]) def expire_keys(self, keys: list[IdentityKey]) -> None: for model_cls, iri_key in keys: self.evict_identity_prefix(model_cls, iri_key) self.invalidate_hydration_for_iri(iri_key)
[docs] def expunge_all(self) -> None: """Clear identity map and hydration cache (pending queue unchanged).""" self._identity.clear() self._hydration.clear()
class _HydrationMiss: pass _HYDRATION_MISS = _HydrationMiss()