Source code for advanced_alchemy.alembic.utils

from contextlib import AbstractAsyncContextManager, AbstractContextManager
from pathlib import Path
from typing import TYPE_CHECKING, Union

from sqlalchemy import Engine, MetaData, Table
from typing_extensions import TypeIs

from advanced_alchemy.exceptions import MissingDependencyError

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
    from sqlalchemy.orm import DeclarativeBase, Session

__all__ = ("drop_all", "dump_tables")


[docs] async def drop_all(engine: "Union[AsyncEngine, Engine]", version_table_name: str, metadata: MetaData) -> None: """Drop all tables in the database. Args: engine: The database engine. version_table_name: The name of the version table. metadata: The metadata object containing the tables to drop. Raises: MissingDependencyError: If the `rich` package is not installed. """ try: from rich import get_console except ImportError as e: # pragma: no cover msg = "rich" raise MissingDependencyError(msg, install_package="cli") from e console = get_console() def _is_sync(engine: "Union[Engine, AsyncEngine]") -> "TypeIs[Engine]": return isinstance(engine, Engine) def _drop_tables_sync(engine: Engine) -> None: console.rule("[bold red]Connecting to database backend.") with engine.begin() as db: console.rule("[bold red]Dropping the db", align="left") metadata.drop_all(db) console.rule("[bold red]Dropping the version table", align="left") Table(version_table_name, metadata).drop(db, checkfirst=True) console.rule("[bold yellow]Successfully dropped all objects", align="left") async def _drop_tables_async(engine: "AsyncEngine") -> None: console.rule("[bold red]Connecting to database backend.", align="left") async with engine.begin() as db: console.rule("[bold red]Dropping the db", align="left") await db.run_sync(metadata.drop_all) console.rule("[bold red]Dropping the version table", align="left") await db.run_sync(Table(version_table_name, metadata).drop, checkfirst=True) console.rule("[bold yellow]Successfully dropped all objects", align="left") if _is_sync(engine): return _drop_tables_sync(engine) return await _drop_tables_async(engine)
async def dump_tables( dump_dir: Path, session: "Union[AbstractContextManager[Session], AbstractAsyncContextManager[AsyncSession]]", models: "list[type[DeclarativeBase]]", ) -> None: from types import new_class from advanced_alchemy._serialization import encode_json try: from rich import get_console except ImportError as e: # pragma: no cover msg = "rich" raise MissingDependencyError(msg, install_package="cli") from e console = get_console() def _is_sync( session: "Union[AbstractAsyncContextManager[AsyncSession], AbstractContextManager[Session]]", ) -> "TypeIs[AbstractContextManager[Session]]": return isinstance(session, AbstractContextManager) def _dump_table_sync(session: "AbstractContextManager[Session]") -> None: from advanced_alchemy.repository import SQLAlchemySyncRepository with session as _session: for model in models: json_path = dump_dir / f"{model.__tablename__}.json" console.rule( f"[yellow bold]Dumping table '{json_path.stem}' to '{json_path}'", style="yellow", align="left", ) repo = new_class( "repo", (SQLAlchemySyncRepository,), exec_body=lambda ns, model=model: ns.setdefault("model_type", model), # type: ignore[misc] ) json_path.write_text(encode_json([row.to_dict() for row in repo(session=_session).list()])) async def _dump_table_async(session: "AbstractAsyncContextManager[AsyncSession]") -> None: from advanced_alchemy.repository import SQLAlchemyAsyncRepository async with session as _session: for model in models: json_path = dump_dir / f"{model.__tablename__}.json" console.rule( f"[yellow bold]Dumping table '{json_path.stem}' to '{json_path}'", style="yellow", align="left", ) repo = new_class( "repo", (SQLAlchemyAsyncRepository,), exec_body=lambda ns, model=model: ns.setdefault("model_type", model), # type: ignore[misc] ) json_path.write_text(encode_json([row.to_dict() for row in await repo(session=_session).list()])) dump_dir.mkdir(exist_ok=True) if _is_sync(session): return _dump_table_sync(session) return await _dump_table_async(session)