Services¶
Services in Advanced Alchemy build upon repositories to provide higher-level business logic, data transformation, and schema validation. While repositories handle raw database operations, services handle the application’s business rules and data transformation needs.
Understanding Services¶
Services provide:
Business logic abstraction
Data transformation using Pydantic or Msgspec models
Input validation
Complex operations involving multiple repositories
Consistent error handling
Automatic schema validation and transformation
Basic Service Usage¶
Let’s build upon our blog example by creating services for posts and tags:
import datetime
from typing import Optional, List
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
# Pydantic schemas for validation
class PostCreate(BaseModel):
title: str
content: str
tag_names: List[str]
class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
published: Optional[bool] = None
class PostResponse(BaseModel):
id: int
title: str
content: str
published: bool
published_at: Optional[datetime.datetime]
created_at: datetime.datetime
updated_at: datetime.datetime
tags: List["TagResponse"]
model_config = {"from_attributes": True}
class PostService(SQLAlchemyAsyncRepositoryService[Post]):
"""Service for managing blog posts with automatic schema validation."""
repository_type = PostRepository
import datetime
from typing import Optional
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
# Pydantic schemas for validation
class PostCreate(BaseModel):
title: str
content: str
tag_names: list[str]
class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
published: Optional[bool] = None
class PostResponse(BaseModel):
id: int
title: str
content: str
published: bool
published_at: Optional[datetime.datetime]
created_at: datetime.datetime
updated_at: datetime.datetime
tags: list["TagResponse"]
model_config = {"from_attributes": True}
class PostService(SQLAlchemyAsyncRepositoryService[Post]):
"""Service for managing blog posts with automatic schema validation."""
repository_type = PostRepository
import datetime
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from pydantic import BaseModel
# Pydantic schemas for validation
class PostCreate(BaseModel):
title: str
content: str
tag_names: list[str]
class PostUpdate(BaseModel):
title: str | None = None
content: str | None = None
published: bool | None = None
class PostResponse(BaseModel):
id: int
title: str
content: str
published: bool
published_at: datetime.datetime | None
created_at: datetime.datetime
updated_at: datetime.datetime
tags: list["TagResponse"]
model_config = {"from_attributes": True}
class PostService(SQLAlchemyAsyncRepositoryService[Post]):
"""Service for managing blog posts with automatic schema validation."""
repository_type = PostRepository
Service Operations¶
Services provide high-level methods for common operations:
async def create_post(
post_service: PostService,
data: PostCreate,
) -> PostResponse:
"""Create a post with associated tags."""
post = await post_service.create(
data,
auto_commit=True,
)
return post_service.to_schema(post, schema_type=PostResponse)
async def update_post(
post_service: PostService,
post_id: int,
data: PostUpdate,
) -> PostResponse:
"""Update a post."""
post = await post_service.update(
item_id=post_id,
data=data,
auto_commit=True,
)
return post_service.to_schema(post, schema_type=PostResponse)
Complex Operations¶
Services can handle complex business logic involving multiple models. The code below shows a service coordinating posts and tags.
Note
The following example assumes the existence of the
Post
model defined in Many-to-Many Relationships and the
Tag
model defined in Using UniqueMixin.
from typing import List
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from advanced_alchemy.service.typing import ModelDictT
from .models import Post, Tag
class PostService(SQLAlchemyAsyncRepositoryService[Post, PostRepository]):
default_load_options = [Post.tags]
repository_type = PostRepository
match_fields = ["name"]
# Override creation behavior to handle tags
async def create(self, data: ModelDictT[Post], **kwargs) -> Post:
"""Create a new post with tags, if provided."""
tags_added: list[str] = []
if isinstance(data, dict):
data["id"] = data.get("id", uuid4())
tags_added = data.pop("tags", [])
data = await self.to_model(data, "create")
if tags_added:
data.tags.extend(
[
await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
for tag_text in tags_added
],
)
return await super().create(data=data, **kwargs)
# Override update behavior to handle tags
async def update(
self,
data: ModelDictT[Post],
item_id: Any | None = None,
**kwargs,
) -> Post:
"""Update a post with tags, if provided."""
tags_updated: list[str] = []
if isinstance(data, dict):
tags_updated.extend(data.pop("tags", None) or [])
data["id"] = item_id
data = await self.to_model(data, "update")
existing_tags = [tag.name for tag in data.tags]
tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
for tag_rm in tags_to_remove:
data.tags.remove(tag_rm)
data.tags.extend(
[
await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
for tag_text in tags_to_add
],
)
return await super().update(
data=data,
item_id=item_id,
**kwargs,
)
# A custom write operation
async def publish_post(
self,
post_id: int,
publish: bool = True,
) -> PostResponse:
"""Publish or unpublish a post with timestamp."""
data = PostUpdate(
published=publish,
published_at=datetime.datetime.utcnow() if publish else None,
)
post = await self.repository.update(
item_id=post_id,
data=data,
auto_commit=True,
)
return self.to_schema(post, schema_type=PostResponse)
# A custom read operation
async def get_trending_posts(
self,
days: int = 7,
min_views: int = 100,
) -> List[PostResponse]:
"""Get trending posts based on view count and recency."""
posts = await self.post_service.list(
Post.published == True,
Post.created_at > (datetime.datetime.utcnow() - timedelta(days=days)),
Post.view_count >= min_views,
order_by=[Post.view_count.desc()],
)
return self.post_service.to_schema(posts, schema_type=PostResponse)
# Override the default `to_model` to handle slugs
async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
"""Convert a dictionary, msgspec Struct, or Pydantic model to a Post model. """
if (is_msgspec_struct(data) or is_pydantic_model(data)) and operation in {"create", "update"} and data.slug is None:
data.slug = await self.repository.get_available_slug(data.name)
if is_dict(data) and "slug" not in data and operation == "create":
data["slug"] = await self.repository.get_available_slug(data["name"])
if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
data["slug"] = await self.repository.get_available_slug(data["name"])
return await super().to_model(data, operation)
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from advanced_alchemy.service.typing import ModelDictT
from .models import Post, Tag
class PostService(SQLAlchemyAsyncRepositoryService[Post, PostRepository]):
default_load_options = [Post.tags]
repository_type = PostRepository
match_fields = ["name"]
# Override creation behavior to handle tags
async def create(self, data: ModelDictT[Post], **kwargs) -> Post:
"""Create a new post with tags, if provided."""
tags_added: list[str] = []
if isinstance(data, dict):
data["id"] = data.get("id", uuid4())
tags_added = data.pop("tags", [])
data = await self.to_model(data, "create")
if tags_added:
data.tags.extend(
[
await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
for tag_text in tags_added
],
)
return await super().create(data=data, **kwargs)
# Override update behavior to handle tags
async def update(
self,
data: ModelDictT[Post],
item_id: Any | None = None,
**kwargs,
) -> Post:
"""Update a post with tags, if provided."""
tags_updated: list[str] = []
if isinstance(data, dict):
tags_updated.extend(data.pop("tags", None) or [])
data["id"] = item_id
data = await self.to_model(data, "update")
existing_tags = [tag.name for tag in data.tags]
tags_to_remove = [tag for tag in data.tags if tag.name not in tags_updated]
tags_to_add = [tag for tag in tags_updated if tag not in existing_tags]
for tag_rm in tags_to_remove:
data.tags.remove(tag_rm)
data.tags.extend(
[
await Tag.as_unique_async(self.repository.session, name=tag_text, slug=slugify(tag_text))
for tag_text in tags_to_add
],
)
return await super().update(
data=data,
item_id=item_id,
**kwargs,
)
# A custom write operation
async def publish_post(
self,
post_id: int,
publish: bool = True,
) -> PostResponse:
"""Publish or unpublish a post with timestamp."""
data = PostUpdate(
published=publish,
published_at=datetime.datetime.utcnow() if publish else None,
)
post = await self.repository.update(
item_id=post_id,
data=data,
auto_commit=True,
)
return self.to_schema(post, schema_type=PostResponse)
# A custom read operation
async def get_trending_posts(
self,
days: int = 7,
min_views: int = 100,
) -> list[PostResponse]:
"""Get trending posts based on view count and recency."""
posts = await self.post_service.list(
Post.published == True,
Post.created_at > (datetime.datetime.utcnow() - timedelta(days=days)),
Post.view_count >= min_views,
order_by=[Post.view_count.desc()],
)
return self.post_service.to_schema(posts, schema_type=PostResponse)
# Override the default `to_model` to handle slugs
async def to_model(self, data: ModelDictT[Post], operation: str | None = None) -> Post:
"""Convert a dictionary, msgspec Struct, or Pydantic model to a Post model. """
if (is_msgspec_struct(data) or is_pydantic_model(data)) and operation in {"create", "update"} and data.slug is None:
data.slug = await self.repository.get_available_slug(data.name)
if is_dict(data) and "slug" not in data and operation == "create":
data["slug"] = await self.repository.get_available_slug(data["name"])
if is_dict(data) and "slug" not in data and "name" in data and operation == "update":
data["slug"] = await self.repository.get_available_slug(data["name"])
return await super().to_model(data, operation)
Framework Integration¶
Services integrate seamlessly with both Litestar and FastAPI.