Source code for sparqlmodel.stores.http

"""HTTP SPARQL 1.1 store with a local graph mirror for ORM cascade reads."""

from __future__ import annotations

from collections.abc import Iterable, Mapping
from typing import Any

import httpx
from triplemodel import Store

from sparqlmodel.exceptions import ConfigurationError, QueryError
from sparqlmodel.rdf_n3 import validate_iri_token
from sparqlmodel.stores import http_common
from sparqlmodel.stores.sparql_json import parse_sparql_json_bindings
from sparqlmodel.types import IRI, NamespaceRegistry

_CLOSED_STORE_MSG = "Cannot use a closed HttpStore"

# Backward-compatible re-exports for tests and callers.
_graph_to_insert_data = http_common.graph_to_insert_data
_graph_to_delete_data = http_common.graph_to_delete_data


[docs] class HttpStore: """SPARQL 1.1 endpoint store with a local ``triplemodel.Store`` mirror. ``update_graph`` pushes ``INSERT DATA`` / ``DELETE DATA`` to the remote endpoint (chunked when ``max_triples_per_update`` is exceeded) and applies the mirror delta only after all remote chunks succeed. ``query`` executes SELECT against the remote read endpoint (POST by default, optional GET via ``query_method``). Optional ``read_endpoint`` / ``write_endpoint`` support Fuseki-style split URLs (defaults to ``endpoint`` for both). Transient HTTP failures (502/503/504, connection errors) are retried per ``max_retries`` / ``retry_backoff``. **Mirror limitations:** Data written outside this store instance (another app, admin UI, or raw SPARQL UPDATE) is visible to ``query`` / ``execute`` but not to ``graph``, ``get``, or cascade/orphan logic until the mirror is updated. Use :meth:`pull_subjects_into_mirror`, :meth:`sync_mirror` (Graph Store HTTP GET when ``graph_store_url`` is set), or ``get`` (which pulls automatically) to hydrate the mirror from the remote dataset. Prefer :class:`~sparqlmodel.stores.memory.MemoryStore` for single-process apps and tests. Assume a single writer per endpoint when using ``HttpStore``. If both ``auth`` and ``bearer_token`` are set, Basic ``auth`` wins for ``Authorization``. """
[docs] def __init__( self, endpoint: str, *, read_endpoint: str | None = None, write_endpoint: str | None = None, graph: Store | None = None, prefixes: dict[str, str] | None = None, auth: tuple[str, str] | None = None, bearer_token: str | None = None, headers: Mapping[str, str] | None = None, timeout: float = 30.0, client: httpx.Client | None = None, mirror_mode: http_common.MirrorMode = "writer", max_retries: int = 2, retry_backoff: float = 0.5, max_triples_per_update: int = 500, query_method: http_common.QueryMethod = "post", graph_store_url: str | None = None, ) -> None: http_common.validate_http_resilience( max_retries=max_retries, retry_backoff=retry_backoff, max_triples_per_update=max_triples_per_update, ) self._mirror_mode = http_common.validate_mirror_mode(mirror_mode) self._query_method = http_common.validate_query_method(query_method) self._max_retries = max_retries self._retry_backoff = retry_backoff self._max_triples_per_update = max_triples_per_update self._endpoint = endpoint.rstrip("/") self._read_endpoint = (read_endpoint or endpoint).rstrip("/") self._write_endpoint = (write_endpoint or endpoint).rstrip("/") self._graph_store_url = graph_store_url.rstrip("/") if graph_store_url else None self._graph = graph or Store() self._registry = NamespaceRegistry(prefixes) self._registry.bind(self._graph) self._timeout = timeout self._owns_client = client is None req_headers = http_common.build_request_headers( headers=headers, auth=auth, bearer_token=bearer_token, ) if client is not None: self._client = client if req_headers: self._client.headers.update(req_headers) if timeout is not None: self._client.timeout = timeout else: self._client = httpx.Client( headers=req_headers, timeout=timeout, follow_redirects=True, ) self._closed = False self._mirror_generation = 0
def _check_open(self) -> None: if self._closed: raise RuntimeError(_CLOSED_STORE_MSG) def _read_url(self) -> str: return http_common.sparql_url(self._read_endpoint) def _write_url(self) -> str: return http_common.sparql_url(self._write_endpoint) def _sparql_url(self) -> str: """Backward-compatible alias for the read SPARQL URL.""" return self._read_url() @property def graph(self) -> Store: return self._graph @property def namespaces(self) -> NamespaceRegistry: return self._registry @property def endpoint(self) -> str: return self._endpoint @property def read_endpoint(self) -> str: return self._read_endpoint @property def write_endpoint(self) -> str: return self._write_endpoint @property def mirror_mode(self) -> http_common.MirrorMode: """Mirror mode — ``writer`` (default) or ``remote_authoritative``.""" return self._mirror_mode @property def query_method(self) -> http_common.QueryMethod: """How remote SELECT queries are sent — ``post`` (default) or ``get``.""" return self._query_method @property def graph_store_url(self) -> str | None: """Graph Store HTTP URL for :meth:`sync_mirror` (optional).""" return self._graph_store_url @property def mirror_generation(self) -> int: """Monotonic counter bumped when the local mirror is wholesale or partially replaced.""" return self._mirror_generation def _bump_mirror_generation(self) -> None: self._mirror_generation += 1
[docs] def sync_mirror(self) -> None: """Replace the local mirror with the remote default graph (Graph Store HTTP GET). Requires ``graph_store_url`` on construction. Does not modify the remote dataset. For per-subject hydration use :meth:`pull_subjects_into_mirror` instead. """ self._check_open() if self._graph_store_url is None: raise ConfigurationError( "HttpStore.sync_mirror() requires graph_store_url=; " "use pull_subjects_into_mirror() for partial sync" ) content, content_type = http_common.fetch_graph_store( self._client, self._graph_store_url, max_retries=self._max_retries, retry_backoff=self._retry_backoff, ) remote = http_common.parse_gsp_response(content, content_type) http_common.replace_mirror_from_graph(self._graph, remote) self._registry.bind(self._graph) self._bump_mirror_generation()
[docs] def close(self) -> None: if self._closed: return self._closed = True if self._owns_client: self._client.close()
def __enter__(self) -> HttpStore: return self def __exit__(self, *args: object) -> None: self.close() def _post_update(self, update: str) -> None: self._check_open() if not update.strip(): return http_common.request_with_retry( self._client, "POST", self._write_url(), operation="SPARQL UPDATE", max_retries=self._max_retries, retry_backoff=self._retry_backoff, content=update.encode("utf-8"), headers=http_common.SPARQL_UPDATE_HEADERS, )
[docs] def query(self, sparql: str) -> list[dict[str, Any]]: """Execute SPARQL SELECT against the remote read endpoint.""" self._check_open() if not http_common.is_select_query(sparql): raise QueryError("Expected SELECT query for HttpStore.query()") try: response = http_common.execute_select( self._client, self._read_url(), sparql, query_method=self._query_method, max_retries=self._max_retries, retry_backoff=self._retry_backoff, ) except QueryError: raise except Exception as exc: raise QueryError(f"SPARQL query failed: {exc}") from exc try: return parse_sparql_json_bindings(response.content) except QueryError: raise except Exception as exc: raise QueryError(f"Failed to parse SPARQL JSON results: {exc}") from exc
[docs] def pull_subjects_into_mirror(self, iris: Iterable[str | IRI]) -> None: """Fetch triples for ``iris`` from the remote endpoint into the local mirror.""" self._check_open() prefixes = dict(self._registry.prefixes) unique = http_common.expand_subject_iris(iris, prefixes) if not unique: return values = " ".join(f"<{validate_iri_token(iri)}>" for iri in unique) sparql = f"CONSTRUCT {{ ?s ?p ?o }} WHERE {{ VALUES ?s {{ {values} }} ?s ?p ?o }}" headers = { **http_common.SPARQL_QUERY_HEADERS, "Accept": "text/turtle", } try: response = http_common.request_with_retry( self._client, "POST", self._read_url(), operation="SPARQL CONSTRUCT", max_retries=self._max_retries, retry_backoff=self._retry_backoff, content=sparql.encode("utf-8"), headers=headers, ) except QueryError: raise except Exception as exc: raise QueryError(f"SPARQL CONSTRUCT failed: {exc}") from exc remote: Store | None = None if response.content.strip(): try: remote = http_common.parse_construct_response( response.content, response.headers.get("Content-Type"), ) except Exception as exc: raise QueryError(f"Failed to parse CONSTRUCT response: {exc}") from exc http_common.apply_construct_to_mirror( self._graph, remote, subjects=unique, prefixes=prefixes, ) self._bump_mirror_generation()
[docs] def update_graph(self, add: Store | None = None, remove: Store | None = None) -> None: """Apply graph delta to remote endpoint and local mirror.""" self._check_open() for chunk in http_common.build_update_chunks(remove, add, self._max_triples_per_update): self._post_update(chunk) if remove is not None: for triple in remove: self._graph.remove(triple) if add is not None: for triple in add: self._graph.add(triple)