"""FastAPI session dependency (SQLModel / SQLAlchemy style)."""
from __future__ import annotations
from collections.abc import AsyncIterator, Callable, Generator
from contextlib import asynccontextmanager
from typing import Annotated, Any
from sparqlmodel.async_session import AsyncSPARQLSession
from sparqlmodel.session import SPARQLSession
from sparqlmodel.stores.async_base import AsyncStoreProtocol
from sparqlmodel.stores.async_memory import AsyncMemoryStore
from sparqlmodel.stores.base import Store
from sparqlmodel.stores.memory import MemoryStore
Depends: Any = None
FastAPI: Any
Request: Any
try:
from fastapi import Depends as _Depends
from fastapi import FastAPI as _FastAPI
from fastapi import Request as _Request
Depends = _Depends
FastAPI = _FastAPI
Request = _Request
except ImportError: # pragma: no cover - optional extra
pass
def _require_fastapi_depends() -> None:
if Depends is None:
raise ImportError(
"FastAPI session dependencies require: pip install 'sparqlmodel[fastapi]'"
)
[docs]
def init_app(
app: FastAPI,
store: Store,
*,
prefixes: dict[str, str] | None = None,
autoflush: bool = True,
rollback_on_error: bool = True,
) -> None:
"""Attach a shared store and session options to ``app.state`` (like a SQLAlchemy engine)."""
_require_fastapi_depends()
app.state.sparql_store = store
app.state.sparql_session_options = {
"prefixes": prefixes,
"autoflush": autoflush,
"rollback_on_error": rollback_on_error,
"close_on_exit": False,
}
def _session_options(app: FastAPI) -> dict[str, Any]:
options = getattr(app.state, "sparql_session_options", None)
if isinstance(options, dict):
return dict(options)
return {}
def _resolve_store(app: FastAPI) -> tuple[Store, bool]:
"""Return ``(store, close_store_on_session_exit)``."""
shared = getattr(app.state, "sparql_store", None)
if shared is not None:
return shared, False
factory = getattr(app.state, "sparql_store_factory", None)
if callable(factory):
return factory(), True
return MemoryStore(), True
[docs]
def get_session(request: Request) -> Generator[SPARQLSession, None, None]:
"""Yield one :class:`~sparqlmodel.session.SPARQLSession` per request (use with ``Depends``).
Mirrors SQLModel / SQLAlchemy::
def get_session():
with Session(engine) as session:
yield session
The shared store comes from :func:`init_app` or :func:`http_store_lifespan`.
When neither is configured, each request gets an isolated in-memory store.
"""
_require_fastapi_depends()
store, should_close_store = _resolve_store(request.app)
options = _session_options(request.app)
options.setdefault("close_on_exit", should_close_store)
with SPARQLSession(store=store, **options) as session:
yield session
if Depends is not None:
SessionDep = Annotated[SPARQLSession, Depends(get_session)]
else: # pragma: no cover
SessionDep = SPARQLSession # type: ignore[assignment,misc]
[docs]
@asynccontextmanager
async def http_store_lifespan(
app: FastAPI,
endpoint: str,
*,
auth: tuple[str, str] | None = None,
headers: dict[str, str] | None = None,
prefixes: dict[str, str] | None = None,
**http_kwargs: Any,
) -> AsyncIterator[None]:
"""HttpStore lifespan: :func:`init_app` on startup, close the store on shutdown."""
_require_fastapi_depends()
from sparqlmodel.stores.http import HttpStore
with HttpStore(
endpoint,
auth=auth,
headers=headers,
prefixes=prefixes,
**http_kwargs,
) as store:
init_app(app, store, prefixes=prefixes)
yield
[docs]
def session_dependency(
store: Store | None = None,
*,
store_factory: Callable[[], Store] | None = None,
prefixes: dict[str, str] | None = None,
autoflush: bool = True,
rollback_on_error: bool = True,
close_on_exit: bool | None = None,
) -> Callable[[Request], Generator[SPARQLSession, None, None]]:
"""Build a custom ``get_session`` for ``app.dependency_overrides`` or multi-store apps."""
_require_fastapi_depends()
def _get(request: Request) -> Generator[SPARQLSession, None, None]:
if store is not None:
resolved = store
close = False if close_on_exit is None else close_on_exit
elif store_factory is not None:
resolved = store_factory()
close = True if close_on_exit is None else close_on_exit
else:
resolved, close = _resolve_store(request.app)
if getattr(request.app.state, "sparql_store", None) is resolved:
close = False
elif close_on_exit is not None:
close = close_on_exit
kwargs: dict[str, Any] = {
"prefixes": prefixes,
"autoflush": autoflush,
"rollback_on_error": rollback_on_error,
"close_on_exit": close,
}
with SPARQLSession(store=resolved, **kwargs) as session:
yield session
return _get
[docs]
def init_async_app(
app: FastAPI,
store: AsyncStoreProtocol,
*,
prefixes: dict[str, str] | None = None,
autoflush: bool = True,
rollback_on_error: bool = True,
) -> None:
"""Attach a shared async store and session options to ``app.state``."""
_require_fastapi_depends()
app.state.sparql_async_store = store
app.state.sparql_async_session_options = {
"prefixes": prefixes,
"autoflush": autoflush,
"rollback_on_error": rollback_on_error,
"close_on_exit": False,
}
def _async_session_options(app: FastAPI) -> dict[str, Any]:
options = getattr(app.state, "sparql_async_session_options", None)
if isinstance(options, dict):
return dict(options)
return {}
def _resolve_async_store(app: FastAPI) -> tuple[AsyncStoreProtocol, bool]:
shared = getattr(app.state, "sparql_async_store", None)
if shared is not None:
return shared, False
factory = getattr(app.state, "sparql_async_store_factory", None)
if callable(factory):
return factory(), True
return AsyncMemoryStore(), True
[docs]
async def get_async_session(request: Request) -> AsyncIterator[AsyncSPARQLSession]:
"""Yield one :class:`~sparqlmodel.async_session.AsyncSPARQLSession` per request."""
_require_fastapi_depends()
store, should_close_store = _resolve_async_store(request.app)
options = _async_session_options(request.app)
options.setdefault("close_on_exit", should_close_store)
async with AsyncSPARQLSession(store=store, **options) as session:
yield session
if Depends is not None:
AsyncSessionDep = Annotated[AsyncSPARQLSession, Depends(get_async_session)]
else: # pragma: no cover
AsyncSessionDep = AsyncSPARQLSession # type: ignore[assignment,misc]
[docs]
@asynccontextmanager
async def async_http_store_lifespan(
app: FastAPI,
endpoint: str,
*,
auth: tuple[str, str] | None = None,
headers: dict[str, str] | None = None,
prefixes: dict[str, str] | None = None,
**http_kwargs: Any,
) -> AsyncIterator[None]:
"""AsyncHttpStore lifespan: :func:`init_async_app` on startup, ``aclose`` on shutdown."""
_require_fastapi_depends()
from sparqlmodel.stores.async_http import AsyncHttpStore
async with AsyncHttpStore(
endpoint,
auth=auth,
headers=headers,
prefixes=prefixes,
**http_kwargs,
) as store:
init_async_app(app, store, prefixes=prefixes)
yield
[docs]
def async_session_dependency(
store: AsyncStoreProtocol | None = None,
*,
store_factory: Callable[[], AsyncStoreProtocol] | None = None,
prefixes: dict[str, str] | None = None,
autoflush: bool = True,
rollback_on_error: bool = True,
close_on_exit: bool | None = None,
) -> Callable[[Request], AsyncIterator[AsyncSPARQLSession]]:
"""Build a custom async ``get_session`` for dependency overrides."""
_require_fastapi_depends()
async def _get(request: Request) -> AsyncIterator[AsyncSPARQLSession]:
if store is not None:
resolved = store
close = False if close_on_exit is None else close_on_exit
elif store_factory is not None:
resolved = store_factory()
close = True if close_on_exit is None else close_on_exit
else:
resolved, close = _resolve_async_store(request.app)
if getattr(request.app.state, "sparql_async_store", None) is resolved:
close = False
elif close_on_exit is not None:
close = close_on_exit
kwargs: dict[str, Any] = {
"prefixes": prefixes,
"autoflush": autoflush,
"rollback_on_error": rollback_on_error,
"close_on_exit": close,
}
async with AsyncSPARQLSession(store=resolved, **kwargs) as session:
yield session
return _get