Source code for advanced_alchemy.extensions.litestar.plugins.init.config.asyncio
from__future__importannotationsfromcontextlibimportasynccontextmanagerfromdataclassesimportdataclass,fieldfromtypingimportTYPE_CHECKING,Callable,Literal,castfromlitestar.cli._utilsimportconsolefromlitestar.constantsimportHTTP_RESPONSE_STARTfromsqlalchemy.excimportOperationalErrorfromsqlalchemy.ext.asyncioimportAsyncEngine,AsyncSessionfromadvanced_alchemy.baseimportmetadata_registryfromadvanced_alchemy.config.asyncioimportSQLAlchemyAsyncConfigas_SQLAlchemyAsyncConfigfromadvanced_alchemy.extensions.litestar._utilsimport(delete_aa_scope_state,get_aa_scope_state,set_aa_scope_state,)fromadvanced_alchemy.extensions.litestar.plugins.init.config.commonimport(SESSION_SCOPE_KEY,SESSION_TERMINUS_ASGI_EVENTS,)fromadvanced_alchemy.extensions.litestar.plugins.init.config.engineimportEngineConfigifTYPE_CHECKING:fromcollections.abcimportAsyncGenerator,CoroutinefromtypingimportAnyfromlitestarimportLitestarfromlitestar.datastructures.stateimportStatefromlitestar.typesimportBeforeMessageSendHookHandler,Message,Scope# noinspection PyUnresolvedReferences__all__=("SQLAlchemyAsyncConfig","autocommit_before_send_handler","autocommit_handler_maker","default_before_send_handler","default_handler_maker",)defdefault_handler_maker(session_scope_key:str=SESSION_SCOPE_KEY,)->Callable[[Message,Scope],Coroutine[Any,Any,None]]:"""Set up the handler to issue a transaction commit or rollback based on specified status codes Args: session_scope_key: The key to use within the application state Returns: The handler callable """asyncdefhandler(message:Message,scope:Scope)->None:"""Handle commit/rollback, closing and cleaning up sessions before sending. 
Args: message: ASGI-``Message`` scope: An ASGI-``Scope`` Returns: None """session=cast("AsyncSession | None",get_aa_scope_state(scope,session_scope_key))ifsessionandmessage["type"]inSESSION_TERMINUS_ASGI_EVENTS:awaitsession.close()delete_aa_scope_state(scope,session_scope_key)returnhandlerdefault_before_send_handler=default_handler_maker()defautocommit_handler_maker(commit_on_redirect:bool=False,extra_commit_statuses:set[int]|None=None,extra_rollback_statuses:set[int]|None=None,session_scope_key:str=SESSION_SCOPE_KEY,)->Callable[[Message,Scope],Coroutine[Any,Any,None]]:"""Set up the handler to issue a transaction commit or rollback based on specified status codes Args: commit_on_redirect: Issue a commit when the response status is a redirect (``3XX``) extra_commit_statuses: A set of additional status codes that trigger a commit extra_rollback_statuses: A set of additional status codes that trigger a rollback session_scope_key: The key to use within the application state Returns: The handler callable """ifextra_commit_statusesisNone:extra_commit_statuses=set()ifextra_rollback_statusesisNone:extra_rollback_statuses=set()iflen(extra_commit_statuses&extra_rollback_statuses)>0:msg="Extra rollback statuses and commit statuses must not share any status codes"raiseValueError(msg)commit_range=range(200,400ifcommit_on_redirectelse300)asyncdefhandler(message:Message,scope:Scope)->None:"""Handle commit/rollback, closing and cleaning up sessions before sending. 
Args: message: ASGI-``litestar.types.Message`` scope: An ASGI-``litestar.types.Scope`` Returns: None """session=cast("AsyncSession | None",get_aa_scope_state(scope,session_scope_key))try:ifsessionisnotNoneandmessage["type"]==HTTP_RESPONSE_START:if(message["status"]incommit_rangeormessage["status"]inextra_commit_statuses)andmessage["status"]notinextra_rollback_statuses:awaitsession.commit()else:awaitsession.rollback()finally:ifsessionandmessage["type"]inSESSION_TERMINUS_ASGI_EVENTS:awaitsession.close()delete_aa_scope_state(scope,session_scope_key)returnhandlerautocommit_before_send_handler=autocommit_handler_maker()
@dataclass
class SQLAlchemyAsyncConfig(_SQLAlchemyAsyncConfig):
    """Litestar Async SQLAlchemy Configuration."""

    before_send_handler: BeforeMessageSendHookHandler | None | Literal["autocommit", "autocommit_include_redirects"] = None
    """Handler to call before the ASGI message is sent.

    The handler should handle closing the session stored in the ASGI scope, if it's still open, and committing any
    uncommitted data. ``None`` and the two string literals are replaced by a concrete handler in ``__post_init__``.
    """
    engine_dependency_key: str = "db_engine"
    """Key to use for the dependency injection of database engines."""
    session_dependency_key: str = "db_session"
    """Key to use for the dependency injection of database sessions."""
    engine_app_state_key: str = "db_engine"
    """Key under which to store the SQLAlchemy engine in the application :class:`State <litestar.datastructures.State>`
    instance.
    """
    session_maker_app_state_key: str = "session_maker_class"
    """Key under which to store the SQLAlchemy :class:`sessionmaker <sqlalchemy.orm.sessionmaker>` in the application
    :class:`State <litestar.datastructures.State>` instance.
    """
    session_scope_key: str = SESSION_SCOPE_KEY
    """Key under which to store the SQLAlchemy session in the ASGI scope state."""
    engine_config: EngineConfig = field(default_factory=EngineConfig)  # pyright: ignore[reportIncompatibleVariableOverride]
    """Configuration for the SQLAlchemy engine.

    The configuration options are documented in the SQLAlchemy documentation.
    """
    set_default_exception_handler: bool = True
    """Sets the default exception handler on application start."""

    def _ensure_unique(self, registry_name: str, key: str, new_key: str | None = None, _iter: int = 0) -> str:
        """Return ``key``, or a numbered variant of it, that is absent from a class-level registry.

        Args:
            registry_name: Name of the class attribute holding the keys already taken.
            key: The preferred key; also the stem for ``"{key}_{n}"`` fallbacks.
            new_key: Candidate to check first; defaults to ``key``.
            _iter: Suffix counter to continue from.

        Returns:
            The first candidate not found in the registry.
        """
        # A missing registry attribute is treated as an empty registry.
        taken = getattr(self.__class__, registry_name, {})
        candidate = key if new_key is None else new_key
        while candidate in taken:
            _iter += 1
            candidate = f"{key}_{_iter}"
        return candidate

    def __post_init__(self) -> None:
        """Make the scope/state keys unique, register them, and resolve the before-send handler."""
        keyed_registries = (
            ("session_scope_key", "_SESSION_SCOPE_KEY_REGISTRY"),
            ("engine_app_state_key", "_ENGINE_APP_STATE_KEY_REGISTRY"),
            ("session_maker_app_state_key", "_SESSIONMAKER_APP_STATE_KEY_REGISTRY"),
        )
        # First pass: settle every key; second pass: record them on the class.
        for attr_name, registry_name in keyed_registries:
            setattr(self, attr_name, self._ensure_unique(registry_name, getattr(self, attr_name)))
        for attr_name, registry_name in keyed_registries:
            getattr(self.__class__, registry_name).add(getattr(self, attr_name))

        # Handlers are built after the keys are final so they close over the unique scope key.
        requested = self.before_send_handler
        if requested is None:
            self.before_send_handler = default_handler_maker(session_scope_key=self.session_scope_key)
        elif requested == "autocommit":
            self.before_send_handler = autocommit_handler_maker(session_scope_key=self.session_scope_key)
        elif requested == "autocommit_include_redirects":
            self.before_send_handler = autocommit_handler_maker(
                session_scope_key=self.session_scope_key,
                commit_on_redirect=True,
            )
        super().__post_init__()
def create_session_maker(self) -> Callable[[], AsyncSession]:
    """Return the configured session maker, building one if none is set.

    When a new maker is built and the session config has no ``bind``, the
    engine from ``get_engine()`` is used as the bind.

    Returns:
        Session factory used by the plugin.
    """
    existing_maker = self.session_maker
    if existing_maker:
        return existing_maker

    maker_kwargs = self.session_config_dict
    if maker_kwargs.get("bind") is None:
        maker_kwargs["bind"] = self.get_engine()
    return self.session_maker_class(**maker_kwargs)  # pyright: ignore[reportUnknownVariableType,reportUnknownMemberType]
def provide_engine(self, state: State) -> AsyncEngine:
    """Return the engine stored in application state.

    Args:
        state: The ``Litestar.state`` instance.

    Returns:
        Whatever ``state`` holds under ``engine_app_state_key``, typed as an engine.
    """
    stored_engine = state.get(self.engine_app_state_key)
    return cast("AsyncEngine", stored_engine)
def provide_session(self, state: State, scope: Scope) -> AsyncSession:
    """Return the session for this connection, creating and caching it on first use.

    Args:
        state: The ``Litestar.state`` instance.
        scope: The current connection's scope.

    Returns:
        A session instance.
    """
    cached = cast("AsyncSession | None", get_aa_scope_state(scope, self.session_scope_key))
    if cached is not None:
        return cached

    # No session on this scope yet: build one from the factory held in app state.
    factory = cast("Callable[[], AsyncSession]", state[self.session_maker_app_state_key])
    fresh_session = factory()
    set_aa_scope_state(scope, self.session_scope_key, fresh_session)
    return fresh_session
@property
def signature_namespace(self) -> dict[str, Any]:
    """Return the plugin's signature namespace.

    Returns:
        A string keyed dict of names to be added to the namespace for signature forward reference resolution.
    """
    namespace: dict[str, Any] = {}
    namespace["AsyncEngine"] = AsyncEngine
    namespace["AsyncSession"] = AsyncSession
    return namespace
async def create_all_metadata(self, app: Litestar) -> None:
    """Run ``create_all`` for the metadata registered under this config's ``bind_key``.

    An ``OperationalError`` is reported on the console instead of being raised.

    Args:
        app (Litestar): The ``Litestar`` instance (not used by this method).
    """
    engine = self.get_engine()
    async with engine.begin() as connection:
        try:
            target_metadata = metadata_registry.get(self.bind_key)
            await connection.run_sync(target_metadata.create_all)
        except OperationalError as exc:
            console.print(f"[bold red] * Could not create target metadata. Reason: {exc}")
def create_app_state_items(self) -> dict[str, Any]:
    """Key/value pairs to be stored in application state.

    Returns:
        A dict mapping ``engine_app_state_key`` to the engine and
        ``session_maker_app_state_key`` to the session maker.
    """
    engine = self.get_engine()
    session_factory = self.create_session_maker()
    return {
        self.engine_app_state_key: engine,
        self.session_maker_app_state_key: session_factory,
    }
def update_app_state(self, app: Litestar) -> None:
    """Merge this config's engine and session maker into the app state.

    Args:
        app: The ``Litestar`` instance.
    """
    state_items = self.create_app_state_items()
    app.state.update(state_items)