"""Service object implementation for SQLAlchemy.
RepositoryService object is generic on the domain model type which
should be a SQLAlchemy model.
"""
from functools import lru_cache
from typing import (
TYPE_CHECKING,
Annotated,
Any,
TypeVar,
Union,
cast,
overload,
)
from sqlalchemy import RowMapping
from typing_extensions import TypeAlias, TypeGuard
from advanced_alchemy.service._typing import (
ATTRS_INSTALLED,
CATTRS_INSTALLED,
LITESTAR_INSTALLED,
MSGSPEC_INSTALLED,
PYDANTIC_INSTALLED,
UNSET,
AttrsInstance,
AttrsLike,
BaseModel,
BaseModelLike,
DictProtocol,
DTOData,
DTODataLike,
FailFast,
Struct,
StructLike,
T,
TypeAdapter,
UnsetType,
attrs_nothing,
convert,
)
from advanced_alchemy.service._typing import attrs_asdict as asdict
from advanced_alchemy.service._typing import attrs_fields as fields
from advanced_alchemy.service._typing import attrs_has as has
from advanced_alchemy.service._typing import cattrs_structure as structure
from advanced_alchemy.service._typing import cattrs_unstructure as unstructure
if TYPE_CHECKING:
from collections.abc import Sequence
from sqlalchemy.engine.row import Row
from advanced_alchemy.filters import StatementFilter
from advanced_alchemy.repository.typing import ModelT
# Module-level switch read by ``get_type_adapter``: when true, the adapter is
# built from ``Annotated[<type>, FailFast()]`` instead of the bare type.
PYDANTIC_USE_FAILFAST = False # leave permanently disabled for now
# ``StatementFilter`` is imported only under TYPE_CHECKING, hence the string bound.
FilterTypeT = TypeVar("FilterTypeT", bound="StatementFilter")
"""Type variable for filter types.
:class:`~advanced_alchemy.filters.StatementFilter`
"""
# Union of the three "schema-like" protocol types imported from ``_typing``.
SupportedSchemaModel: TypeAlias = Union[StructLike, BaseModelLike, AttrsLike]
"""Type alias for objects that support schema conversion methods (model_dump, asdict, etc.)."""
# The bound includes ``Any``, so this type variable accepts any type.
ModelDTOT = TypeVar("ModelDTOT", bound="Union[SupportedSchemaModel, Any]")
"""Type variable for model DTOs.
:class:`msgspec.Struct`|:class:`pydantic.BaseModel`|:class:`attrs class`
"""
# Plain re-binding of ``SupportedSchemaModel``; despite the name it also
# covers attrs-like classes because it is the very same union.
PydanticOrMsgspecT = SupportedSchemaModel
"""Type alias for supported schema models.
:class:`msgspec.Struct` or :class:`pydantic.BaseModel` or :class:`attrs class`
"""
# The aliases below are written as strings because ``ModelT`` and ``Sequence``
# are imported only under TYPE_CHECKING and must not be evaluated at runtime.
ModelDictT: TypeAlias = "Union[dict[str, Any], ModelT, SupportedSchemaModel, DTODataLike[ModelT], Any]"
"""Type alias for model dictionaries.
Represents:
- :type:`dict[str, Any]` | :class:`~advanced_alchemy.base.ModelProtocol` | :class:`msgspec.Struct` | :class:`pydantic.BaseModel` | :class:`attrs class` | :class:`litestar.dto.data_structures.DTOData` | :class:`~advanced_alchemy.base.ModelProtocol`
"""
ModelDictListT: TypeAlias = "Sequence[Union[dict[str, Any], ModelT, SupportedSchemaModel, Any]]"
"""Type alias for model dictionary lists.
A list or sequence of any of the following:
- :type:`Sequence`[:type:`dict[str, Any]` | :class:`~advanced_alchemy.base.ModelProtocol` | :class:`msgspec.Struct` | :class:`pydantic.BaseModel` | :class:`attrs class`]
"""
# Either a sequence of individual items or a single DTO wrapper around a list of models.
BulkModelDictT: TypeAlias = (
"Union[Sequence[Union[dict[str, Any], ModelT, SupportedSchemaModel, Any]], DTODataLike[list[ModelT]]]"
)
"""Type alias for bulk model dictionaries.
:type:`Sequence`[ :type:`dict[str, Any]` | :class:`~advanced_alchemy.base.ModelProtocol` | :class:`msgspec.Struct` | :class:`pydantic.BaseModel` | :class:`attrs class`] | :class:`litestar.dto.data_structures.DTOData`
"""
@lru_cache(typed=True)
def get_type_adapter(f: "type[T]") -> Any:
    """Build, or fetch from the cache, a pydantic type adapter for a type.

    Results are memoized per argument through :func:`functools.lru_cache`.

    Args:
        f: The type the adapter is created for.

    Returns:
        :class:`pydantic.TypeAdapter`[:class:`typing.TypeVar`[T]]
    """
    # Fail-fast validation is only applied when the module-level flag is on.
    adapted_type: Any = Annotated[f, FailFast()] if PYDANTIC_USE_FAILFAST else f  # pyright: ignore
    return TypeAdapter(adapted_type)
@lru_cache(maxsize=128, typed=True)
def get_attrs_fields(cls: Any) -> "tuple[Any, ...]":
    """Return the attrs fields of a class, memoized per class.

    Args:
        cls: The attrs class whose fields are requested.

    Returns:
        The tuple of attrs fields, or an empty tuple when attrs is not installed.
    """
    # Without attrs there is nothing to introspect.
    if not ATTRS_INSTALLED:
        return ()
    return fields(cls)  # type: ignore[no-any-return]
[docs]
def is_dto_data(v: Any) -> TypeGuard[DTODataLike[Any]]:
    """Tell whether a value is a Litestar ``DTOData`` object.

    Args:
        v: The value to inspect.

    Returns:
        bool: ``True`` only when Litestar is installed and ``v`` is a ``DTOData`` instance.
    """
    # The isinstance check is skipped entirely when Litestar is unavailable.
    return isinstance(v, DTOData) if LITESTAR_INSTALLED else False
[docs]
def is_pydantic_model(v: Any) -> TypeGuard[BaseModelLike]:
    """Tell whether a value is a pydantic model (instance or class).

    Args:
        v: The value to inspect.

    Returns:
        bool: ``False`` when pydantic is not installed; otherwise whether ``v``
        is an instance of ``BaseModel`` or, for class objects, a subclass of it.
    """
    if not PYDANTIC_INSTALLED:
        return False
    # Instances take the plain isinstance route.
    if not isinstance(v, type):
        return isinstance(v, BaseModel)
    # Class objects are tested with issubclass; a TypeError raised by that
    # check is treated as "not a pydantic model".
    try:
        return issubclass(v, BaseModel)
    except TypeError:
        return False
[docs]
def is_msgspec_struct(v: Any) -> TypeGuard[StructLike]:
    """Tell whether a value is a msgspec ``Struct`` instance.

    Args:
        v: The value to inspect.

    Returns:
        bool: ``True`` only when msgspec is installed and ``v`` is a ``Struct`` instance.
    """
    if not MSGSPEC_INSTALLED:
        return False
    return isinstance(v, Struct)
[docs]
def is_attrs_instance(obj: Any) -> TypeGuard[AttrsLike]:
    """Tell whether a value is an instance of an attrs class.

    Args:
        obj: The value to inspect.

    Returns:
        bool: ``True`` only when attrs is installed and the class of ``obj`` is an attrs class.
    """
    if not ATTRS_INSTALLED:
        return False
    # The check is made on the object's class, not on the object itself.
    return has(obj.__class__)
[docs]
def is_attrs_schema(cls: Any) -> TypeGuard["type[AttrsLike]"]:
    """Tell whether a class object is an attrs class.

    Args:
        cls: The class to inspect.

    Returns:
        bool: ``True`` only when attrs is installed and ``cls`` is an attrs class.
    """
    if not ATTRS_INSTALLED:
        return False
    return has(cls)
def is_dataclass(obj: Any) -> TypeGuard[Any]:
"""Check if an object is a dataclass."""
return hasattr(obj, "__dataclass_fields__")
def is_dataclass_with_field(obj: Any, field_name: str) -> TypeGuard[object]:  # Can't specify dataclass type directly
    """Tell whether an object is a dataclass that exposes a given attribute.

    Args:
        obj: The value to inspect.
        field_name: Attribute name to look for.

    Returns:
        bool: ``True`` when ``obj`` is a dataclass and has ``field_name``.
    """
    if not is_dataclass(obj):
        return False
    return hasattr(obj, field_name)
def is_dataclass_without_field(obj: Any, field_name: str) -> TypeGuard[object]:
    """Tell whether an object is a dataclass that lacks a given attribute.

    Args:
        obj: The value to inspect.
        field_name: Attribute name that must be absent.

    Returns:
        bool: ``True`` when ``obj`` is a dataclass and does not have ``field_name``.
    """
    if not is_dataclass(obj):
        return False
    return not hasattr(obj, field_name)
[docs]
def is_attrs_instance_with_field(v: Any, field_name: str) -> TypeGuard[AttrsLike]:
    """Tell whether a value is an attrs instance that exposes a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name to look for.

    Returns:
        bool: ``True`` when ``v`` is an attrs instance and has ``field_name``.
    """
    if not is_attrs_instance(v):
        return False
    return hasattr(v, field_name)
[docs]
def is_attrs_instance_without_field(v: Any, field_name: str) -> TypeGuard[AttrsLike]:
    """Tell whether a value is an attrs instance that lacks a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name that must be absent.

    Returns:
        bool: ``True`` when ``v`` is an attrs instance and does not have ``field_name``.
    """
    if not is_attrs_instance(v):
        return False
    return not hasattr(v, field_name)
[docs]
def is_dict(v: Any) -> TypeGuard[dict[str, Any]]:
"""Check if a value is a dictionary.
Args:
v: Value to check.
Returns:
bool
"""
return isinstance(v, dict)
def has_dict_attribute(obj: Any) -> "TypeGuard[DictProtocol]":
    """Tell whether an object satisfies ``DictProtocol`` (has a ``__dict__`` attribute).

    Args:
        obj: The value to inspect.

    Returns:
        bool: ``False`` for ``None``; otherwise the result of the
        ``isinstance`` check against ``DictProtocol``.
    """
    # None is rejected up front; the original author notes that the protocol
    # check alone would accept it.
    if obj is None:
        return False
    return isinstance(obj, DictProtocol)
def is_row_mapping(v: Any) -> TypeGuard["RowMapping"]:
    """Tell whether a value is a SQLAlchemy ``RowMapping``.

    Args:
        v: The value to inspect.

    Returns:
        bool: Result of the ``isinstance`` check against ``RowMapping``.
    """
    matches_row_mapping = isinstance(v, RowMapping)
    return matches_row_mapping
[docs]
def is_dict_with_field(v: Any, field_name: str) -> TypeGuard[dict[str, Any]]:
    """Tell whether a value is a dictionary containing a given key.

    Args:
        v: The value to inspect.
        field_name: Key to look for.

    Returns:
        bool: ``True`` when ``v`` is a dict and ``field_name`` is one of its keys.
    """
    if not is_dict(v):
        return False
    return field_name in v
[docs]
def is_dict_without_field(v: Any, field_name: str) -> TypeGuard[dict[str, Any]]:
    """Tell whether a value is a dictionary that lacks a given key.

    Args:
        v: The value to inspect.
        field_name: Key that must be absent.

    Returns:
        bool: ``True`` when ``v`` is a dict and ``field_name`` is not one of its keys.
    """
    if not is_dict(v):
        return False
    return field_name not in v
[docs]
def is_pydantic_model_with_field(v: Any, field_name: str) -> TypeGuard[BaseModelLike]:
    """Tell whether a value is a pydantic model that exposes a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name to look for.

    Returns:
        bool: ``True`` when ``v`` is a pydantic model and has ``field_name``.
    """
    if not is_pydantic_model(v):
        return False
    return hasattr(v, field_name)
[docs]
def is_pydantic_model_without_field(v: Any, field_name: str) -> TypeGuard[BaseModelLike]:
    """Tell whether a value is a pydantic model that lacks a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name that must be absent.

    Returns:
        bool: ``True`` when ``v`` is a pydantic model and does not have ``field_name``.
    """
    if not is_pydantic_model(v):
        return False
    return not hasattr(v, field_name)
[docs]
def is_msgspec_struct_with_field(v: Any, field_name: str) -> TypeGuard[StructLike]:
    """Tell whether a value is a msgspec struct that exposes a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name to look for.

    Returns:
        bool: ``True`` when ``v`` is a msgspec struct and has ``field_name``.
    """
    if not is_msgspec_struct(v):
        return False
    return hasattr(v, field_name)
[docs]
def is_msgspec_struct_without_field(v: Any, field_name: str) -> "TypeGuard[StructLike]":
    """Tell whether a value is a msgspec struct that lacks a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name that must be absent.

    Returns:
        bool: ``True`` when ``v`` is a msgspec struct and does not have ``field_name``.
    """
    if not is_msgspec_struct(v):
        return False
    return not hasattr(v, field_name)
[docs]
def is_schema(v: Any) -> "TypeGuard[SupportedSchemaModel]":
    """Tell whether a value is a msgspec Struct, a pydantic model, or an attrs instance.

    Args:
        v: The value to inspect.

    Returns:
        bool: ``True`` as soon as one of the three guards accepts ``v``.
    """
    # Guards run in this order and evaluation stops at the first match.
    guards = (is_msgspec_struct, is_pydantic_model, is_attrs_instance)
    return any(guard(v) for guard in guards)
[docs]
def is_schema_or_dict(v: Any) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]":
    """Tell whether a value is a supported schema object or a plain dictionary.

    Args:
        v: The value to inspect.

    Returns:
        bool: ``True`` when ``v`` is a msgspec Struct, pydantic model, attrs instance, or dict.
    """
    if is_schema(v):
        return True
    return is_dict(v)
[docs]
def is_schema_with_field(v: Any, field_name: str) -> "TypeGuard[SupportedSchemaModel]":
    """Tell whether a value is a supported schema object exposing a given attribute.

    Args:
        v: The value to inspect.
        field_name: Attribute name to look for.

    Returns:
        bool: ``True`` when ``v`` is a msgspec Struct, pydantic model, or attrs
        instance and has ``field_name``.
    """
    # Guards run in this order and evaluation stops at the first match.
    field_guards = (
        is_msgspec_struct_with_field,
        is_pydantic_model_with_field,
        is_attrs_instance_with_field,
    )
    return any(guard(v, field_name) for guard in field_guards)
[docs]
def is_schema_without_field(v: Any, field_name: str) -> "TypeGuard[SupportedSchemaModel]":
    """Check if a value is a msgspec Struct, Pydantic model, or attrs instance without a specific field.

    Args:
        v: Value to check.
        field_name: Field name that must be absent.

    Returns:
        bool: ``True`` only when ``v`` is a supported schema object and does
        not expose ``field_name``; ``False`` for anything that is not a schema.
    """
    # Negating ``is_schema_with_field`` alone is also true for values that are
    # not schemas at all (``None``, ``int``, ``dict`` ...), which would make the
    # TypeGuard narrow to a type the value does not have. Require the schema
    # check first, mirroring the other ``*_without_field`` guards in this module.
    return is_schema(v) and not is_schema_with_field(v, field_name)
[docs]
def is_schema_or_dict_with_field(v: Any, field_name: str) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]":
    """Tell whether a value is a supported schema object or dict carrying a given field.

    Args:
        v: The value to inspect.
        field_name: Attribute name (schemas) or key (dicts) to look for.

    Returns:
        bool: ``True`` when ``v`` is a msgspec Struct, pydantic model, attrs
        instance, or dict that has ``field_name``.
    """
    if is_schema_with_field(v, field_name):
        return True
    return is_dict_with_field(v, field_name)
[docs]
def is_schema_or_dict_without_field(
    v: Any, field_name: str
) -> "TypeGuard[Union[SupportedSchemaModel, dict[str, Any]]]":
    """Check if a value is a msgspec Struct, Pydantic model, attrs instance, or dict without a specific field.

    Args:
        v: Value to check.
        field_name: Attribute name (schemas) or key (dicts) that must be absent.

    Returns:
        bool: ``True`` only when ``v`` is a supported schema object or a dict
        and does not carry ``field_name``; ``False`` for any other value.
    """
    # Negating ``is_schema_or_dict_with_field`` alone is also true for values
    # that are neither schemas nor dicts, which would make the TypeGuard narrow
    # to a type the value does not have. Require the type check first.
    return is_schema_or_dict(v) and not is_schema_or_dict_with_field(v, field_name)
# Typing-only overloads for ``schema_dump``: each stub declares the return
# type for one kind of input; none of them has a runtime body.
# RowMapping input -> dict.
@overload
def schema_dump(data: "RowMapping", exclude_unset: bool = True) -> "dict[str, Any]": ...
# Row input -> dict.
@overload
def schema_dump(data: "Row[Any]", exclude_unset: bool = True) -> "dict[str, Any]": ...
# DTO data wrapper input -> dict.
@overload
def schema_dump(data: "DTODataLike[Any]", exclude_unset: bool = True) -> "dict[str, Any]": ...
# Model input -> the same model type.
@overload
def schema_dump(data: "ModelT", exclude_unset: bool = True) -> "ModelT": ... # pyright: ignore[reportOverlappingOverload]
# Catch-all: any other input -> dict.
@overload
def schema_dump(
data: Any,
exclude_unset: bool = True,
) -> "dict[str, Any]": ...
[docs]
def schema_dump(
    data: "Union[dict[str, Any], ModelT, SupportedSchemaModel, DTODataLike[ModelT], RowMapping, Row[Any]]",
    exclude_unset: bool = True,
) -> "Union[dict[str, Any], ModelT]":
    """Convert a data object into a dictionary where a conversion is known.

    The input is matched against the supported kinds in a fixed order: dict,
    row mapping, pydantic model, msgspec struct, attrs instance, DTO data,
    then any object satisfying the ``__dict__`` protocol. Anything else is
    returned unchanged.

    Args:
        data: :type:`dict[str, Any]` | :class:`advanced_alchemy.base.ModelProtocol` | :class:`msgspec.Struct` | :class:`pydantic.BaseModel` | :class:`attrs class` | :class:`litestar.dto.data_structures.DTOData[ModelT]` | :class:`sqlalchemy.RowMapping` | :class:`sqlalchemy.engine.row.Row`
        exclude_unset: :type:`bool` Whether to leave unset values out of the result.

    Returns:
        Union[:type: dict[str, Any], :class:`~advanced_alchemy.base.ModelProtocol`]
    """
    # Dictionaries are handed back as-is (same object, no copy).
    if is_dict(data):
        return data

    if is_row_mapping(data):
        return dict(data)

    if is_pydantic_model(data):
        return data.model_dump(exclude_unset=exclude_unset)

    if is_msgspec_struct(data):
        struct_fields = data.__struct_fields__
        if not exclude_unset:
            # Every declared field is emitted; missing attributes become None.
            return {name: getattr(data, name, None) for name in struct_fields}
        # Skip fields that are missing or still hold the UNSET sentinel.
        dumped = {}
        for name in struct_fields:
            value = getattr(data, name, UNSET)
            if value is not UNSET:
                dumped[name] = value
        return dumped

    if is_attrs_instance(data):
        if not exclude_unset:
            # cattrs is used for unstructuring when installed; otherwise
            # plain attrs.asdict is the fallback.
            if CATTRS_INSTALLED:
                return unstructure(data)  # type: ignore[no-any-return]
            return asdict(data)
        # Drop attrs.NOTHING values so partial updates only carry set fields.
        return asdict(data, filter=lambda _attr, value: value is not attrs_nothing)

    if is_dto_data(data):
        return cast("dict[str, Any]", data.as_builtins())

    # The object's own attribute dictionary is returned directly (not a copy).
    if has_dict_attribute(data):
        return data.__dict__

    # Nothing matched: pass the object through untouched.
    return cast("ModelT", data)  # type: ignore[no-return-any]
# Public names of this module: the helpers defined here plus the names
# re-exported from ``advanced_alchemy.service._typing`` (including the
# aliased ``asdict``/``fields``/``has``/``structure``/``unstructure``).
__all__ = (
"ATTRS_INSTALLED",
"CATTRS_INSTALLED",
"LITESTAR_INSTALLED",
"MSGSPEC_INSTALLED",
"PYDANTIC_INSTALLED",
"PYDANTIC_USE_FAILFAST",
"UNSET",
"AttrsInstance",
"AttrsLike",
"BaseModel",
"BaseModelLike",
"BulkModelDictT",
"DTOData",
"DTODataLike",
"FailFast",
"FilterTypeT",
"ModelDTOT",
"ModelDictListT",
"ModelDictT",
"PydanticOrMsgspecT",
"Struct",
"StructLike",
"SupportedSchemaModel",
"TypeAdapter",
"UnsetType",
"asdict",
"attrs_nothing",
"convert",
"fields",
"get_attrs_fields",
"get_type_adapter",
"has",
"is_attrs_instance",
"is_attrs_instance_with_field",
"is_attrs_instance_without_field",
"is_attrs_schema",
"is_dataclass",
"is_dataclass_with_field",
"is_dataclass_without_field",
"is_dict",
"is_dict_with_field",
"is_dict_without_field",
"is_dto_data",
"is_msgspec_struct",
"is_msgspec_struct_with_field",
"is_msgspec_struct_without_field",
"is_pydantic_model",
"is_pydantic_model_with_field",
"is_pydantic_model_without_field",
"is_row_mapping",
"is_schema",
"is_schema_or_dict",
"is_schema_or_dict_with_field",
"is_schema_or_dict_without_field",
"is_schema_with_field",
"is_schema_without_field",
"schema_dump",
"structure",
"unstructure",
)