Stores

Store protocol for SPARQL backends.

class sparqlmodel.stores.base.StoreProtocol(*args, **kwargs)[source]

Bases: Protocol

Protocol for RDF persistence backends.

property graph: RdfGraph

Underlying RDF graph (pyoxigraph via Store).

query(sparql)[source]

Execute a SPARQL SELECT query and return bindings.

Return type:

list[dict[str, Any]]

update_graph(add=None, remove=None)[source]

Add or remove triples from the store.

__init__(*args, **kwargs)
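As a rough illustration of how a backend can satisfy a structural protocol like this one, the sketch below defines a stand-in protocol with only the two documented methods and a toy class that conforms to it. `SketchStoreProtocol` and `ListBackedStore` are hypothetical names, the `graph` property is left out, and triples are plain tuples rather than the library's graph type.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SketchStoreProtocol(Protocol):
    """Hypothetical stand-in that mirrors the documented method names only."""

    def query(self, sparql: str) -> list[dict[str, Any]]: ...

    def update_graph(self, add: Any = None, remove: Any = None) -> None: ...


class ListBackedStore:
    """Toy backend: keeps triples in a set and does not parse SPARQL."""

    def __init__(self) -> None:
        self.triples: set[tuple[str, str, str]] = set()

    def query(self, sparql: str) -> list[dict[str, Any]]:
        # A real backend would evaluate `sparql`; this toy returns every triple.
        return [{"s": s, "p": p, "o": o} for s, p, o in sorted(self.triples)]

    def update_graph(self, add: Any = None, remove: Any = None) -> None:
        # Removals are applied before additions.
        self.triples -= set(remove or ())
        self.triples |= set(add or ())


store = ListBackedStore()
store.update_graph(add=[("ex:a", "ex:knows", "ex:b")])
```

Because the protocol is structural, `ListBackedStore` does not need to inherit from it; having matching methods is enough.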

In-memory RDF store backed by pyoxigraph (triplemodel.Store).

class sparqlmodel.stores.memory.MemoryStore(graph=None, *, prefixes=None)[source]

Bases: object

In-memory RDF store using Store.

__init__(graph=None, *, prefixes=None)[source]

property mirror_generation: int

Monotonic counter; always 0 for in-memory stores, which never perform a wholesale mirror sync.

query(sparql)[source]

Execute SPARQL SELECT and return variable bindings.

Return type:

list[dict[str, Any]]

update_graph(add=None, remove=None)[source]

Add or remove triples.

HTTP SPARQL 1.1 store with a local graph mirror for ORM cascade reads.

class sparqlmodel.stores.http.HttpStore(endpoint, *, read_endpoint=None, write_endpoint=None, graph=None, prefixes=None, auth=None, bearer_token=None, headers=None, timeout=30.0, client=None, mirror_mode='writer', max_retries=2, retry_backoff=0.5, max_triples_per_update=500, query_method='post', graph_store_url=None)[source]

Bases: object

SPARQL 1.1 endpoint store with a local triplemodel.Store mirror.

update_graph pushes INSERT DATA / DELETE DATA to the remote endpoint (chunked when max_triples_per_update is exceeded) and applies the mirror delta only after all remote chunks succeed. query executes SELECT against the remote read endpoint (POST by default, optional GET via query_method).
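The "remote first, mirror last" ordering described above can be sketched as follows. This is a minimal illustration under assumptions, not the library's code: `push_then_mirror` and `send_chunk` are hypothetical names, the mirror is a plain set of tuples, and removals are sent before additions.

```python
from typing import Callable, Iterable

Triple = tuple[str, str, str]


def push_then_mirror(
    send_chunk: Callable[[str, list[Triple]], None],
    mirror: set[Triple],
    add: Iterable[Triple] = (),
    remove: Iterable[Triple] = (),
    max_triples_per_update: int = 500,
) -> None:
    """Send every remote chunk first; touch the mirror only if none raised."""
    add, remove = list(add), list(remove)
    for kind, triples in (("DELETE", remove), ("INSERT", add)):
        for start in range(0, len(triples), max_triples_per_update):
            # Any exception raised here propagates before the mirror changes.
            send_chunk(kind, triples[start:start + max_triples_per_update])
    # All chunks succeeded, so the local delta can be applied.
    mirror.difference_update(remove)
    mirror.update(add)
```

Note that this sketch does not undo chunks that already reached the remote endpoint when a later chunk fails; it only guarantees that the mirror is left untouched in that case.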

Optional read_endpoint / write_endpoint support Fuseki-style split URLs (both default to endpoint). Transient HTTP failures (502/503/504, connection errors) are retried according to max_retries / retry_backoff.

Mirror limitations: Data written outside this store instance (another app, admin UI, or raw SPARQL UPDATE) is visible to query / execute but not to graph, get, or cascade/orphan logic until the mirror is updated. Use pull_subjects_into_mirror(), sync_mirror() (Graph Store HTTP GET when graph_store_url is set), or get (which pulls automatically) to hydrate the mirror from the remote dataset. Prefer MemoryStore for single-process apps and tests. Assume a single writer per endpoint when using HttpStore.

If both auth and bearer_token are set, Basic auth wins for Authorization.
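The precedence rule can be pictured with a small header-building helper. This is a sketch only: `authorization_header` is a hypothetical function, and it assumes `auth` is a `(user, password)` tuple, which the signature above does not state.

```python
import base64
from typing import Optional


def authorization_header(
    auth: Optional[tuple[str, str]] = None,
    bearer_token: Optional[str] = None,
) -> dict[str, str]:
    """Hypothetical helper: Basic credentials take precedence over a bearer token."""
    if auth is not None:
        user, password = auth
        raw = f"{user}:{password}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if bearer_token:
        return {"Authorization": f"Bearer {bearer_token}"}
    # Neither credential supplied: send no Authorization header.
    return {}
```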

__init__(endpoint, *, read_endpoint=None, write_endpoint=None, graph=None, prefixes=None, auth=None, bearer_token=None, headers=None, timeout=30.0, client=None, mirror_mode='writer', max_retries=2, retry_backoff=0.5, max_triples_per_update=500, query_method='post', graph_store_url=None)[source]

property mirror_mode: Literal['writer', 'remote_authoritative']

Mirror mode — writer (default) or remote_authoritative.

property query_method: Literal['post', 'get']

How remote SELECT queries are sent — post (default) or get.

property graph_store_url: str | None

Graph Store HTTP URL for sync_mirror() (optional).

property mirror_generation: int

Monotonic counter incremented whenever the local mirror is wholly or partially replaced.

sync_mirror()[source]

Replace the local mirror with the remote default graph (Graph Store HTTP GET).

Requires graph_store_url on construction. Does not modify the remote dataset. For per-subject hydration use pull_subjects_into_mirror() instead.

query(sparql)[source]

Execute SPARQL SELECT against the remote read endpoint.

Return type:

list[dict[str, Any]]

pull_subjects_into_mirror(iris)[source]

Fetch triples for iris from the remote endpoint into the local mirror.

update_graph(add=None, remove=None)[source]

Apply graph delta to remote endpoint and local mirror.

Async store protocol for SPARQL backends.

class sparqlmodel.stores.async_base.AsyncStoreProtocol(*args, **kwargs)[source]

Bases: Protocol

Protocol for async RDF persistence backends.

property graph: RdfGraph

Underlying RDF graph (pyoxigraph via Store).

property namespaces: NamespaceRegistry

Prefix registry bound to the store graph.

async query(sparql)[source]

Execute a SPARQL SELECT query and return bindings.

Return type:

list[dict[str, Any]]

async update_graph(add=None, remove=None)[source]

Add or remove triples from the store.

async aclose()[source]

Release resources (HTTP client, etc.).

__init__(*args, **kwargs)

In-memory async RDF store (sync graph, async API surface).

class sparqlmodel.stores.async_memory.AsyncMemoryStore(graph=None, *, prefixes=None)[source]

Bases: object

In-memory RDF store with async methods delegating to sync MemoryStore.

__init__(graph=None, *, prefixes=None)[source]
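The delegation pattern described above (an async API surface over a synchronous store) might look roughly like this. `ToySyncStore` and `ToyAsyncStore` are hypothetical stand-ins that hold triples as tuples and do not evaluate SPARQL.

```python
import asyncio
from typing import Any


class ToySyncStore:
    """Hypothetical synchronous store holding triples in a set."""

    def __init__(self) -> None:
        self.triples: set[tuple[str, str, str]] = set()

    def query(self, sparql: str) -> list[dict[str, Any]]:
        # No SPARQL evaluation here; every triple becomes one binding row.
        return [{"s": s, "p": p, "o": o} for s, p, o in sorted(self.triples)]

    def update_graph(self, add: Any = None, remove: Any = None) -> None:
        self.triples -= set(remove or ())
        self.triples |= set(add or ())


class ToyAsyncStore:
    """Async surface that forwards each call to the synchronous store."""

    def __init__(self) -> None:
        self._sync = ToySyncStore()

    async def query(self, sparql: str) -> list[dict[str, Any]]:
        return self._sync.query(sparql)

    async def update_graph(self, add: Any = None, remove: Any = None) -> None:
        self._sync.update_graph(add=add, remove=remove)

    async def aclose(self) -> None:
        # Nothing to release for an in-memory store.
        return None


async def demo() -> list[dict[str, Any]]:
    store = ToyAsyncStore()
    await store.update_graph(add=[("ex:a", "ex:knows", "ex:b")])
    rows = await store.query("SELECT * WHERE { ?s ?p ?o }")
    await store.aclose()
    return rows


rows = asyncio.run(demo())
```

The calls never actually suspend, which is acceptable for an in-memory graph; the point of the wrapper is that test code can use the same `await` call sites as it would with an HTTP-backed store.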

Async HTTP SPARQL 1.1 store with a local graph mirror for ORM cascade reads.

class sparqlmodel.stores.async_http.AsyncHttpStore(endpoint, *, read_endpoint=None, write_endpoint=None, graph=None, prefixes=None, auth=None, bearer_token=None, headers=None, timeout=30.0, client=None, mirror_mode='writer', max_retries=2, retry_backoff=0.5, max_triples_per_update=500, query_method='post', graph_store_url=None)[source]

Bases: object

Async SPARQL 1.1 endpoint store with a local triplemodel.Store mirror.

Same semantics as HttpStore, using httpx.AsyncClient for non-blocking remote I/O. Supports optional read_endpoint / write_endpoint, mirror_mode, chunked UPDATE (max_triples_per_update), retries (max_retries / retry_backoff), and query_method post vs get for SELECT.

Mirror limitations: Data written outside this store instance is visible to query / execute but not to graph, get, or cascade until the mirror is updated. Use pull_subjects_into_mirror(), sync_mirror() (when graph_store_url is set), or get (auto-pull) to hydrate from remote. Prefer AsyncMemoryStore for asyncio tests. Assume a single writer per endpoint.

If both auth and bearer_token are set, Basic auth wins for Authorization.

__init__(endpoint, *, read_endpoint=None, write_endpoint=None, graph=None, prefixes=None, auth=None, bearer_token=None, headers=None, timeout=30.0, client=None, mirror_mode='writer', max_retries=2, retry_backoff=0.5, max_triples_per_update=500, query_method='post', graph_store_url=None)[source]

property mirror_mode: Literal['writer', 'remote_authoritative']

Mirror mode — writer (default) or remote_authoritative.

property query_method: Literal['post', 'get']

How remote SELECT queries are sent — post (default) or get.

property graph_store_url: str | None

Graph Store HTTP URL for sync_mirror() (optional).

property mirror_generation: int

Monotonic counter incremented whenever the local mirror is wholly or partially replaced.

async sync_mirror()[source]

Replace the local mirror with the remote default graph (Graph Store HTTP GET).

async query(sparql)[source]

Execute SPARQL SELECT against the remote read endpoint.

Return type:

list[dict[str, Any]]

async pull_subjects_into_mirror(iris)[source]

Fetch triples for iris from the remote endpoint into the local mirror.

async update_graph(add=None, remove=None)[source]

Apply graph delta to remote endpoint and local mirror.

Shared SPARQL HTTP helpers for sync and async endpoint stores.

sparqlmodel.stores.http_common.iter_graph_chunks(graph, max_triples)[source]

Yield sub-stores with at most max_triples triples each.

Return type:

Iterator[RdfGraph]

sparqlmodel.stores.http_common.build_update_chunks(remove, add, max_triples)[source]

Build ordered SPARQL UPDATE strings: all DELETE chunks, then all INSERT chunks.

Return type:

list[str]
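To make the "all DELETE chunks, then all INSERT chunks" ordering concrete, here is a minimal sketch. It assumes each triple is already serialized as an N-Triples-style line, whereas the real helper takes graph objects; `build_chunks` is a hypothetical name.

```python
from typing import Iterator


def _chunked(lines: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(lines), size):
        yield lines[start:start + size]


def build_chunks(remove: list[str], add: list[str], max_triples: int) -> list[str]:
    """Return SPARQL UPDATE strings: DELETE DATA chunks first, then INSERT DATA."""
    if max_triples < 1:
        raise ValueError("max_triples must be at least 1")
    updates: list[str] = []
    for keyword, lines in (("DELETE DATA", remove), ("INSERT DATA", add)):
        for chunk in _chunked(lines, max_triples):
            body = "\n".join(f"  {line}" for line in chunk)
            updates.append(f"{keyword} {{\n{body}\n}}")
    return updates


updates = build_chunks(
    remove=["<urn:a> <urn:p> <urn:b> ."],
    add=["<urn:c> <urn:p> <urn:d> .", "<urn:e> <urn:p> <urn:f> .", "<urn:g> <urn:p> <urn:h> ."],
    max_triples=2,
)
```

With `max_triples=2`, the example produces one DELETE DATA string followed by two INSERT DATA strings.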

sparqlmodel.stores.http_common.append_query_params(url, **params)[source]

Append query parameters to url, preserving any existing query string.

Return type:

str
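One way to append parameters while keeping an existing query string uses only the standard library, as sketched below. `append_params` is a hypothetical equivalent, not the library's implementation, and the URLs are made-up examples.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def append_params(url: str, **params: str) -> str:
    """Return `url` with `params` appended after any existing query pairs."""
    parts = urlsplit(url)
    pairs = parse_qsl(parts.query, keep_blank_values=True) + list(params.items())
    return urlunsplit(parts._replace(query=urlencode(pairs)))


with_existing = append_params("http://localhost:3030/ds/data?graph=default", format="turtle")
without_existing = append_params("http://localhost:3030/ds/sparql", query="SELECT ?s")
```

Re-encoding through `parse_qsl` and `urlencode` normalizes the existing pairs, which may change their escaping; code that must preserve the original query string byte for byte would need plain string concatenation instead.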

sparqlmodel.stores.http_common.expand_subject_iris(iris, prefixes)[source]

Expand compact IRIs to absolute form for CONSTRUCT VALUES and mirror sync.

Return type:

list[str]
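Compact IRI expansion can be sketched as a prefix lookup. The example assumes `prefixes` is a plain dict mapping a prefix to a namespace IRI, which may differ from the registry type the library uses; `expand_iris` is a hypothetical name.

```python
def expand_iris(iris: list[str], prefixes: dict[str, str]) -> list[str]:
    """Expand `prefix:local` forms; leave absolute or unknown-prefix IRIs as-is."""
    expanded = []
    for iri in iris:
        prefix, sep, local = iri.partition(":")
        # "http://..." has a "//" local part, so it is never treated as compact.
        if sep and prefix in prefixes and not local.startswith("//"):
            expanded.append(prefixes[prefix] + local)
        else:
            expanded.append(iri)
    return expanded


result = expand_iris(
    ["ex:alice", "http://example.org/bob", "urn:x:y"],
    {"ex": "http://example.org/"},
)
```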

sparqlmodel.stores.http_common.request_with_retry(client, method, url, *, operation, max_retries, retry_backoff, **kwargs)[source]

Execute an HTTP request with retries on transient failures.

Return type:

Response

async sparqlmodel.stores.http_common.async_request_with_retry(client, method, url, *, operation, max_retries, retry_backoff, **kwargs)[source]

Execute an async HTTP request with retries on transient failures.

Return type:

Response
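The retry behaviour of the two helpers above can be approximated without an HTTP client. In this sketch `send` is a hypothetical zero-argument callable returning `(status, body)`, and the exponential backoff schedule is an assumption; the documentation only names `max_retries` and `retry_backoff`.

```python
import time
from typing import Callable

TRANSIENT_STATUS = {502, 503, 504}


def call_with_retry(
    send: Callable[[], tuple[int, bytes]],
    *,
    max_retries: int = 2,
    retry_backoff: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[int, bytes]:
    """Retry `send` on transient statuses or connection errors."""
    attempt = 0
    while True:
        try:
            status, body = send()
        except ConnectionError:
            if attempt >= max_retries:
                raise
        else:
            # Return on success, on a non-transient status, or when out of retries.
            if status not in TRANSIENT_STATUS or attempt >= max_retries:
                return status, body
        # Assumed schedule: retry_backoff, then 2x, 4x, ...
        sleep(retry_backoff * (2 ** attempt))
        attempt += 1
```

Passing `sleep` as a parameter keeps the sketch testable without real delays.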

sparqlmodel.stores.http_common.execute_select(client, url, sparql, *, query_method, max_retries, retry_backoff)[source]

Run a remote SELECT using GET or POST.

Return type:

Response

async sparqlmodel.stores.http_common.async_execute_select(client, url, sparql, *, query_method, max_retries, retry_backoff)[source]

Run a remote SELECT using GET or POST (async).

Return type:

Response

sparqlmodel.stores.http_common.remove_mirror_subjects(graph, iris, prefixes)[source]

Remove all mirror triples whose subject is one of iris.

sparqlmodel.stores.http_common.apply_construct_to_mirror(graph, remote, *, subjects, prefixes)[source]

Replace mirror triples for subjects with triples from remote (may be empty).
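The replace-by-subject step can be sketched with plain sets of tuples. This is an illustration under assumptions: `replace_subject_triples` is a hypothetical name, subjects are compared as already-expanded strings, and every triple in `remote` is added without filtering.

```python
Triple = tuple[str, str, str]


def replace_subject_triples(
    mirror: set[Triple], remote: set[Triple], subjects: list[str]
) -> None:
    """Drop mirror triples for `subjects`, then add whatever `remote` holds."""
    wanted = set(subjects)
    mirror.difference_update({t for t in mirror if t[0] in wanted})
    # An empty `remote` therefore simply clears those subjects from the mirror.
    mirror.update(remote)


mirror = {("ex:a", "ex:name", "old"), ("ex:b", "ex:name", "keep")}
replace_subject_triples(mirror, {("ex:a", "ex:name", "new")}, ["ex:a"])
```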

sparqlmodel.stores.http_common.sparql_url(endpoint)[source]

Normalize a SPARQL endpoint URL, preserving an existing query string.

Return type:

str

sparqlmodel.stores.http_common.is_select_query(sparql)[source]

Return True when sparql appears to be a SPARQL SELECT (not ASK/CONSTRUCT/DESCRIBE).

Return type:

bool
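A check of this kind is usually a heuristic rather than a full parse. The sketch below skips leading comments and PREFIX/BASE declarations, then looks for the SELECT keyword; `looks_like_select` is a hypothetical name and the library's actual rules may differ.

```python
import re

# Leading whitespace, "#" comments, and PREFIX / BASE declarations.
_PROLOGUE = re.compile(
    r"^\s*(?:(?:#[^\n]*\n|PREFIX\s+[^\s:]*:\s*<[^>]*>|BASE\s+<[^>]*>)\s*)*",
    re.IGNORECASE,
)


def looks_like_select(sparql: str) -> bool:
    """Return True when the first keyword after the prologue is SELECT."""
    rest = sparql[_PROLOGUE.match(sparql).end():]
    return re.match(r"SELECT\b", rest, re.IGNORECASE) is not None
```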

sparqlmodel.stores.http_common.default_graph_store_url(sparql_endpoint)[source]

Heuristically derive a Fuseki Graph Store Protocol (GSP) URL from a SPARQL endpoint (.../sparql to .../data).

Return type:

str | None
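A heuristic of this shape could be written as below. The sketch assumes the rule is "a path ending in /sparql maps to the sibling /data path, anything else yields None"; that reading, the name `guess_graph_store_url`, and the example URLs are assumptions, not the library's documented behaviour.

```python
from typing import Optional
from urllib.parse import urlsplit, urlunsplit


def guess_graph_store_url(sparql_endpoint: str) -> Optional[str]:
    """Map .../sparql to .../data; return None when the path does not match."""
    parts = urlsplit(sparql_endpoint)
    path = parts.path.rstrip("/")
    if not path.endswith("/sparql"):
        return None
    data_path = path[: -len("/sparql")] + "/data"
    # Query string and fragment are dropped in this sketch.
    return urlunsplit(parts._replace(path=data_path, query="", fragment=""))
```

Returning `None` rather than guessing lets the caller fall back to an explicit `graph_store_url`.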

sparqlmodel.stores.http_common.replace_mirror_from_graph(target, remote)[source]

Replace all triples in target with triples from remote (empty clears mirror).

sparqlmodel.stores.http_common.parse_gsp_response(content, content_type)[source]

Parse a Graph Store HTTP GET body into a Store.

Return type:

RdfGraph

sparqlmodel.stores.http_common.parse_construct_response(content, content_type)[source]

Parse a SPARQL CONSTRUCT response body into a Store.

Return type:

RdfGraph

sparqlmodel.stores.http_common.fetch_graph_store(client, url, *, max_retries, retry_backoff)[source]

GET an RDF graph via Graph Store HTTP; return body and Content-Type.

Return type:

tuple[bytes, str | None]

async sparqlmodel.stores.http_common.async_fetch_graph_store(client, url, *, max_retries, retry_backoff)[source]

Async GET for Graph Store HTTP.

Return type:

tuple[bytes, str | None]