Services

Services in Advanced Alchemy build upon repositories to provide higher-level business logic, data transformation, and schema validation. While repositories handle raw database operations, services handle the application’s business rules and data transformation needs.

Understanding Services

Services provide:

  • Business logic abstraction

  • Data transformation using Pydantic or Msgspec models

  • Input validation

  • Complex operations involving multiple repositories

  • Consistent error handling

  • Automatic schema validation and transformation

Basic Service Usage

Let’s build upon our blog example by creating services for posts and tags:

import datetime
from typing import Optional, List

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: List[str]

class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: Optional[datetime.datetime]
    created_at: datetime.datetime
    updated_at: datetime.datetime
    tags: List["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository
import datetime
from typing import Optional

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: list[str]

class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    published: Optional[bool] = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: Optional[datetime.datetime]
    created_at: datetime.datetime
    updated_at: datetime.datetime
    tags: list["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository
import datetime

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel

# Pydantic schemas for validation
class PostCreate(BaseModel):
    title: str
    content: str
    tag_names: list[str]

class PostUpdate(BaseModel):
    title: str | None = None
    content: str | None = None
    published: bool | None = None

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    published_at: datetime.datetime | None
    created_at: datetime.datetime
    updated_at: datetime.datetime
    tags: list["TagResponse"]

    model_config = {"from_attributes": True}

class PostService(SQLAlchemyAsyncRepositoryService[Post]):
    """Service for managing blog posts with automatic schema validation."""

    repository_type = PostRepository

Service Operations

Services provide high-level methods for common operations:

async def create_post(
    post_service: PostService,
    data: PostCreate,
) -> PostResponse:
    """Create a post with associated tags."""
    post = await post_service.create(
        data,
        auto_commit=True,
    )
    return post_service.to_schema(post, schema_type=PostResponse)

async def update_post(
    post_service: PostService,
    post_id: int,
    data: PostUpdate,
) -> PostResponse:
    """Update a post."""
    post = await post_service.update(
        item_id=post_id,
        data=data,
        auto_commit=True,
    )
    return post_service.to_schema(post, schema_type=PostResponse)

Complex Operations

Services can handle complex business logic involving multiple models. The code below shows a service coordinating posts and tags.

Note

The following example assumes the existence of the Post model defined in Many-to-Many Relationships and the Tag model defined in Using UniqueMixin.

from typing import List

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from advanced_alchemy.service.typing import ModelDictT

from .models import Post, Tag

class PostService(SQLAlchemyAsyncRepositoryService[Post, PostRepository]):

    default_load_options = [Post.tags]
    repository_type = PostRepository
    match_fields = ["name"]

    # Override creation behavior to handle tags
    async def create(self, data: ModelDictT[Post], **kwargs) -> Post:
        """Create a new post with tags, if provided."""
        tags_added: list[str] = []
        if isinstance(data, dict):
            data["id"] = data.get("id", uuid4())
            tags_added = data.pop("tags", [])
        data = await self.to_model(data, "create")
        if tags_added:
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_added
                ],
            )
        return await super().create(data=data, **kwargs)

    # Override update behavior to handle tags
    async def update(
        self,
        data: ModelDictT[Post],
        item_id: Any | None = None,
        **kwargs,
    ) -> Post:
        """Update a post with tags, if provided."""
        tags_updated: list[str] = []
        if isinstance(data, dict):
            tags_updated.extend(data.pop("tags", None) or [])
            data["id"] = item_id
            data = await self.to_model(data, "update")
            existing_tags = [tag.name for tag in data.tags]
            tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
            tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
            for tag_rm in tags_to_remove:
                data.tags.remove(tag_rm)
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_to_add
                ],
            )
        return await super().update(
            data=data,
            item_id=item_id,
            **kwargs,
        )

    # A custom write operation
    async def publish_post(
        self,
        post_id: int,
        publish: bool = True,
    ) -> PostResponse:
        """Publish or unpublish a post with timestamp."""
        data = PostUpdate(
            published=publish,
            published_at=datetime.datetime.utcnow() if publish else None,
        )
        post = await self.repository.update(
            item_id=post_id,
            data=data,
            auto_commit=True,
        )
        return self.to_schema(post, schema_type=PostResponse)

    # A custom read operation
    async def get_trending_posts(
        self,
        days: int = 7,
        min_views: int = 100,
    ) -> List[PostResponse]:
        """Get trending posts based on view count and recency."""
        posts = await self.post_service.list(
            Post.published == True,
            Post.created_at > (datetime.datetime.utcnow() - timedelta(days=days)),
            Post.view_count >= min_views,
            order_by=[Post.view_count.desc()],
        )
        return self.post_service.to_schema(posts, schema_type=PostResponse)

    # Override the default `to_model` to handle slugs
    async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
        """Convert a dictionary, msgspec Struct, or Pydantic model to a Post model. """
        if (is_msgspec_struct(data) or is_pydantic_model(data)) and operation in {"create", "update"} and data.slug is None:
            data.slug = await self.repository.get_available_slug(data.name)
        if is_dict(data) and "slug" not in data and operation == "create":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return await super().to_model(data, operation)
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from advanced_alchemy.service.typing import ModelDictT

from .models import Post, Tag

class PostService(SQLAlchemyAsyncRepositoryService[Post, PostRepository]):

    default_load_options = [Post.tags]
    repository_type = PostRepository
    match_fields = ["name"]

    # Override creation behavior to handle tags
    async def create(self, data: ModelDictT[Post], **kwargs) -> Post:
        """Create a new post with tags, if provided."""
        tags_added: list[str] = []
        if isinstance(data, dict):
            data["id"] = data.get("id", uuid4())
            tags_added = data.pop("tags", [])
        data = await self.to_model(data, "create")
        if tags_added:
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_added
                ],
            )
        return await super().create(data=data, **kwargs)

    # Override update behavior to handle tags
    async def update(
        self,
        data: ModelDictT[Post],
        item_id: Any | None = None,
        **kwargs,
    ) -> Post:
        """Update a post with tags, if provided."""
        tags_updated: list[str] = []
        if isinstance(data, dict):
            tags_updated.extend(data.pop("tags", None) or [])
            data["id"] = item_id
            data = await self.to_model(data, "update")
            existing_tags = [tag.name for tag in data.tags]
            tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
            tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
            for tag_rm in tags_to_remove:
                data.tags.remove(tag_rm)
            data.tags.extend(
                [
                    await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
                    for tag_text in tags_to_add
                ],
            )
        return await super().update(
            data=data,
            item_id=item_id,
            **kwargs,
        )

    # A custom write operation
    async def publish_post(
        self,
        post_id: int,
        publish: bool = True,
    ) -> PostResponse:
        """Publish or unpublish a post with timestamp."""
        data = PostUpdate(
            published=publish,
            published_at=datetime.datetime.utcnow() if publish else None,
        )
        post = await self.repository.update(
            item_id=post_id,
            data=data,
            auto_commit=True,
        )
        return self.to_schema(post, schema_type=PostResponse)

    # A custom read operation
    async def get_trending_posts(
        self,
        days: int = 7,
        min_views: int = 100,
    ) -> list[PostResponse]:
        """Get trending posts based on view count and recency."""
        posts = await self.post_service.list(
            Post.published == True,
            Post.created_at > (datetime.datetime.utcnow() - timedelta(days=days)),
            Post.view_count >= min_views,
            order_by=[Post.view_count.desc()],
        )
        return self.post_service.to_schema(posts, schema_type=PostResponse)

    # Override the default `to_model` to handle slugs
    async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
        """Convert a dictionary, msgspec Struct, or Pydantic model to a Post model. """
        if (is_msgspec_struct(data) or is_pydantic_model(data)) and operation in {"create", "update"} and data.slug is None:
            data.slug = await self.repository.get_available_slug(data.name)
        if is_dict(data) and "slug" not in data and operation == "create":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
            data["slug"] = await self.repository.get_available_slug(data["name"])
        return await super().to_model(data, operation)

Framework Integration

Services integrate seamlessly with both Litestar and FastAPI.