Services

Services in Advanced Alchemy build upon repositories to provide higher-level business logic, data transformation, and schema validation. While repositories handle raw database operations, services handle the application’s business rules and data transformation needs.

Understanding Services

Services provide:

  • Business logic abstraction

  • Data transformation using Pydantic or Msgspec models

  • Input validation

  • Complex operations involving multiple repositories

  • Consistent error handling

  • Automatic schema validation and transformation

Basic Service Usage

Let’s build upon our blog example by creating services for posts and tags:

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: List[str]

class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: Optional[datetime]
    created_at: datetime
    updated_at: datetime
    tags: List["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
from datetime import datetime
from typing import Optional

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: list[str]

class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: Optional[datetime]
    created_at: datetime
    updated_at: datetime
    tags: list["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
from datetime import datetime

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: list[str]

class PostUpdate(BaseModel):
    title: str | None = None
    content: str | None = None
    published: bool | None = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: datetime | None
    created_at: datetime
    updated_at: datetime
    tags: list["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository

Service Operations

Services provide high-level methods for common operations:

async def create_post_with_tags(
    post_service: PostService,
    data: PostCreate,
) -> PostResponse:
    """Create a post with associated tags."""
    # Service automatically validates input using PostCreate schema
    post = await post_service.create(
        data,
        auto_commit=True,
    )
    return post_service.to_schema(post)

async def update_post(
    post_service: PostService,
    post_id: int,
    data: PostUpdate,
) -> PostResponse:
    """Update a post."""
    post = await post_service.update(
        item_id=post_id,
        data=data,
        auto_commit=True,
    )
    return post_service.to_schema(post)

Complex Operations

Services can handle complex business logic involving multiple repositories:

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Higher-level service coordinating posts and tags."""

    default_load_options = [Post.tags]
    repository_type = PostRepository
    match_fields = ["name"]

    def __init__(self, **repo_kwargs: Any) -> None:
        self.repository: PostRepository = self.repository_type(**repo_kwargs)
        self.model_type = self.repository.model_type


    async def create(
        self,
        data: ModelDictT[Post],
        *,
        auto_commit: bool | None = None,
        auto_expunge: bool | None = None,
        auto_refresh: bool | None = None,
        error_messages: ErrorMessages | None | EmptyType = Empty,
    ) -> Post:
        """Create a new post."""
        tags_added: list[str] = []
        if isinstance(data, dict):
            data["id"] = data.get("id", uuid4())
            tags_added = data.pop("tags", [])
        data = await self.to_model(data, "create")
        if tags_added:
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_added
                ],
            )
        await super().create(
            data=data,
            auto_commit=auto_commit,
            auto_expunge=True,
            auto_refresh=False,
            error_messages=error_messages,
        )
        return data

    async def update(
        self,
        data: ModelDictT[Post],
        item_id: Any | None = None,
        *,
        id_attribute: str | InstrumentedAttribute[Any] | None = None,
        attribute_names: Iterable[str] | None = None,
        with_for_update: bool | None = None,
        auto_commit: bool | None = None,
        auto_expunge: bool | None = None,
        auto_refresh: bool | None = None,
        error_messages: ErrorMessages | None | EmptyType = Empty,
        load: LoadSpec | None = None,
        execution_options: dict[str, Any] | None = None,
    ) -> Post:
        """Wrap repository update operation.

        Returns:
            Updated representation.
        """
        tags_updated: list[str] = []
        if isinstance(data, dict):
            tags_updated.extend(data.pop("tags", None) or [])
            data["id"] = item_id
            data = await self.to_model(data, "update")
            existing_tags = [tag.name for tag in data.tags]
            tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
            tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
            for tag_rm in tags_to_remove:
                data.tags.remove(tag_rm)
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_to_add
                ],
            )
        return await super().update(
            data=data,
            item_id=item_id,
            attribute_names=attribute_names,
            id_attribute=id_attribute,
            load=load,
            execution_options=execution_options,
            with_for_update=with_for_update,
            auto_commit=auto_commit,
            auto_expunge=auto_expunge,
            auto_refresh=auto_refresh,
            error_messages=error_messages,
        )


    async def publish_post(
        self,
        post_id: int,
        publish: bool = True,
    ) -> PostResponse:
        """Publish or unpublish a post with timestamp."""
        data = PostUpdate(
            published=publish,
            published_at=datetime.utcnow() if publish else None,
        )
        post = await self.post_service.update(
            item_id=post_id,
            data=data,
            auto_commit=True,
        )
        return self.post_service.to_schema(post)

    async def get_trending_posts(
        self,
        days: int = 7,
        min_views: int = 100,
    ) -> List[PostResponse]:
        """Get trending posts based on view count and recency."""
        posts = await self.post_service.list(
            Post.published == True,
            Post.created_at > (datetime.utcnow() - timedelta(days=days)),
            Post.view_count >= min_views,
            order_by=[Post.view_count.desc()],
        )
        return self.post_service.to_schema(posts)

    async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
        """Convert a dictionary, Msgspec model, or Pydantic model to a Post model."""
        if (is_msgspec_model(data) or is_pydantic_model(data)) and operation == "create" and data.slug is None:
            data.slug = await self.repository.get_available_slug(data.name)
        if (is_msgspec_model(data) or is_pydantic_model(data)) and operation == "update" and data.slug is None:
            data.slug = await self.repository.get_available_slug(data.name)
        if is_dict(data) and "slug" not in data and operation == "create":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return await super().to_model(data, operation)

Framework Integration

Services integrate seamlessly with both Litestar and FastAPI. For Litestar integration: