"""In-memory async RDF store (sync graph, async API surface)."""
from __future__ import annotations
from typing import Any
from triplemodel import Store
from sparqlmodel.stores.memory import MemoryStore
[docs]
class AsyncMemoryStore:
"""In-memory RDF store with async methods delegating to sync
:class:`~sparqlmodel.stores.memory.MemoryStore`.
"""
[docs]
def __init__(
self,
graph: Store | None = None,
*,
prefixes: dict[str, str] | None = None,
) -> None:
self._inner = MemoryStore(graph=graph, prefixes=prefixes)
@property
def graph(self) -> Store:
return self._inner.graph
@property
def namespaces(self):
return self._inner.namespaces
[docs]
async def query(self, sparql: str) -> list[dict[str, Any]]:
return self._inner.query(sparql)
[docs]
async def update_graph(self, add: Store | None = None, remove: Store | None = None) -> None:
self._inner.update_graph(add=add, remove=remove)
[docs]
async def aclose(self) -> None:
return None