These standards apply to projects built on top of this package (consuming bounded contexts). They describe how consumer code should use the seedwork building blocks, not how the seedwork library itself is implemented.
- Python 3.12+. Use `from __future__ import annotations` in all files.
- Immutable value types: frozen dataclasses (`@dataclass(frozen=True)`).
- Async/await throughout application and infrastructure layers.
- Type hints everywhere. No `Any` in domain or application code.
- `Protocol` for ports/contracts.
- `@dataclass` for data-carrying types.
- `__post_init__` for validation in all dataclasses (Commands, Queries, Value Objects, and others).
| Do | Don't |
|---|---|
| `Protocol` for ports | ABC for ports (use only for base classes with shared logic) |
| Frozen dataclass for immutability (VOs, Commands, Queries, Aggregates, Entities, ...) | `@dataclass` without `frozen=True` |
| `__post_init__` for validation | Dedicated validator classes or external validation layers |
| `T_co` / `T_contra` variance suffixes | Plain `T` for covariant/contravariant TypeVars |
| Publish integration events from `DomainEventHandler` | Publish integration events from aggregate |
An Entity is a domain object with a durable identity — two instances with the same id are the same entity regardless of their other fields. eq=False on the dataclass delegates equality to the inherited Entity.__eq__, which compares by id only.
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import NewType
from uuid import UUID

from seedwork.domain import Entity

AccountId = NewType("AccountId", UUID)
UserId = NewType("UserId", UUID)


@dataclass(frozen=True, eq=False, kw_only=True)
class Account(Entity[AccountId]):
    owner_id: UserId
    balance: int

    def validate(self) -> None:
        pass  # add domain invariants here if needed
```

- `id` is inherited from `Entity[TId]`; do not re-declare it in the subclass.
- `eq=False` is required: equality is by identity, not by field values.
- Domain logic (invariants, rules) lives in `validate()`, not in handlers.
- Use `NewType` for typed identifiers: it catches mix-ups between `AccountId` and `UserId` at the type-checker level with negligible runtime overhead.

Don't:

- Re-declare `id` in the subclass.
- Put orchestration, I/O, or repository calls inside an entity.
A Value Object models a domain concept with no identity — two instances with the same data are interchangeable. Immutability is enforced by frozen=True; validity is enforced at construction via validate().
```python
from __future__ import annotations

from dataclasses import dataclass

from seedwork.domain import DomainError, ValueObject


class NegativeAmountError(DomainError):
    def __init__(self) -> None:
        super().__init__("Amount cannot be negative", "NEGATIVE_AMOUNT")


@dataclass(frozen=True, kw_only=True)
class Money(ValueObject):
    amount: float
    currency: str

    def validate(self) -> None:
        if self.amount < 0:
            raise NegativeAmountError()
```

- Always `frozen=True` and `kw_only=True`.
- `validate()` is called by `ValueObject.__post_init__`; subclasses override it to enforce invariants.
- Return new instances from transformation methods; never mutate in place.

Do:

- Raise `DomainError` subclasses in `validate()`.

Don't:

- Raise `ValueError` or generic exceptions; always raise `DomainError` subclasses.
- Use a Value Object for a concept that needs to be tracked individually over time; that is an Entity.
AggregateRoot extends Entity and acts as the consistency boundary of the aggregate cluster — all external writes must go through it. It is a frozen dataclass: every state-change method must return a new instance via _evolve(**kwargs)._record(*events).
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import NewType
from uuid import UUID

from seedwork.domain import AggregateRoot

# Local module paths are assumptions; adjust to your package layout.
from .events import AccountOpened, MoneyDeposited
from .money import Money

BankAccountId = NewType("BankAccountId", UUID)
UserId = NewType("UserId", UUID)


@dataclass(frozen=True, eq=False, kw_only=True)
class BankAccount(AggregateRoot[BankAccountId]):
    owner_id: UserId
    balance: Money

    def validate(self) -> None:
        pass

    @classmethod
    def open(cls, id: BankAccountId, owner_id: UserId, initial_balance: Money) -> BankAccount:
        return cls(id=id, owner_id=owner_id, balance=initial_balance)._record(
            AccountOpened.create(
                initial_balance=initial_balance.amount,
                currency=initial_balance.currency,
                aggregate_id=str(id),
            )
        )

    def deposit(self, amount: Money) -> BankAccount:
        new_balance = Money(amount=self.balance.amount + amount.amount, currency=self.balance.currency)
        return self._evolve(balance=new_balance)._record(
            MoneyDeposited.create(
                amount=amount.amount,
                aggregate_id=str(self.id),
            )
        )
```

- `domain_events` is `tuple[DomainEvent, ...]` (immutable).
- `_record(*events)` returns a new instance with the events appended.
- `id` is inherited from `Entity[TId]`; do not re-declare it in the subclass.
- The `DomainEventPublishingRepository` reads `domain_events` after each `save()`. The `DeferredDomainEventBus` deduplicates by `event.id`, so no manual clearing is needed.

Do:

- Use `_evolve(**kwargs)._record(*events)` for all state changes.
- Use class factory methods (`open`, `reconstitute`); never expose the constructor directly to application code.

Don't:

- Mutate state in place; `AggregateRoot` is frozen.
- Put orchestration, I/O, or repository calls inside the aggregate.
- Re-declare `id` in the subclass.
A Domain Event represents something that happened in the domain — a meaningful business fact, not a technical operation. Events are always recorded by the aggregate root itself via _record(*events); no external code creates or injects them.
Use a frozen payload dataclass for the event fields. BaseDomainEvent[TPayload] provides id and occurred_at with defaults; aggregate_id is a required constructor argument — pass it explicitly to identify the emitting aggregate.
```python
from __future__ import annotations

from dataclasses import dataclass

from seedwork.domain.domain_event import BaseDomainEvent


@dataclass(frozen=True)
class AccountOpenedPayload:
    initial_balance: float
    currency: str


@dataclass(frozen=True)
class AccountOpened(BaseDomainEvent[AccountOpenedPayload]):
    @classmethod
    def create(cls, initial_balance: float, currency: str, aggregate_id: str) -> AccountOpened:
        return cls(
            payload=AccountOpenedPayload(initial_balance=initial_balance, currency=currency),
            aggregate_id=aggregate_id,
        )
```

Usage from the aggregate:

```python
AccountOpened.create(
    initial_balance=initial_balance.amount,
    currency=initial_balance.currency,
    aggregate_id=str(self.id),
)
```

- Named in past tense: `AccountOpened`, `MoneyDeposited`.
- Keep payloads minimal: use a frozen payload dataclass with primitive or value object fields.
- Routing is done with `isinstance`/`type()`; no `type` or `version` fields are needed.

Do:

- Use a `create()` classmethod as factory: callers pass plain data, and the factory constructs the payload internally. This decouples the aggregate from the payload structure.

Don't:

- Call the constructor directly from the aggregate; always go through `create()`.
- Add `type` or `version` fields; domain events are internal to the bounded context.
- Create domain events outside the aggregate; handlers and repositories must never instantiate events directly.
- Use domain events to communicate to other services; use Integration Events for that.

Deletion note. Deletion is not automatically a domain event. Use `delete_by_id` for technical removal with no business significance. If deletion is a meaningful domain occurrence (e.g. closing an account), model it as an aggregate behaviour (`account.close()`) that records the event; the handler then calls `save()` or `delete_by_id()` as appropriate. Never use `delete_by_id` when a domain event must be raised.
A Repository is a domain port — defined in the domain layer as a Protocol, implemented in infrastructure. Its sole concern is the persistence of aggregates: it abstracts storage behind a collection-like interface so the domain never depends on a specific technology. Only aggregate roots have repositories; child entities and value objects are persisted as part of their aggregate, never independently.
```python
from __future__ import annotations

from typing import Protocol

from seedwork.domain import Repository  # export location assumed

from .account import BankAccount, BankAccountId


class BankAccountRepository(Repository[BankAccountId, BankAccount], Protocol):
    pass
```

- The interface is defined in the domain layer; the concrete implementation lives in infrastructure.
- `Protocol` with `Repository[TId, TAgg]` as base: structural typing, no coupling between port and adapter.
- Method signatures (`find_by_id`, `save`, `delete_by_id`) are inherited from `Repository`; no need to redeclare them.
- The `pass` body is intentional: redeclaring methods is redundant and couples the port to the base class's naming.

Don't:

- Add query methods (`find_by_email`, `find_by_status`) to the domain repository; define a read port in the application layer.
- Return ORM models from the repository; return domain types only.
Domain errors represent business rule violations — named, typed, and defined in the domain layer. Each distinct invariant gets its own class extending DomainError.
```python
from seedwork.domain import DomainError


class InsufficientFundsError(DomainError):
    def __init__(self) -> None:
        super().__init__("Insufficient funds", "INSUFFICIENT_FUNDS")


class AccountNotFoundError(DomainError):
    def __init__(self, account_id: str) -> None:
        super().__init__(f"Account {account_id} not found", "ACCOUNT_NOT_FOUND")
```

- The `code` string is the machine-readable identifier; bake it into `__init__` so call sites only supply domain-specific arguments.
- The `RegistryCommandBus` catches `DomainError` and wraps it in a failed `Result` automatically, so handlers never catch them.
- Define one subclass per distinct business rule violation.

Don't:

- Raise `ValueError` or generic exceptions for business rule violations.
- Catch `DomainError` in the handler; let the bus convert it to `Result.failed`.
A Command expresses an intent to change state — it carries the input data and validates it in __post_init__. A CommandHandler receives a guaranteed-valid command, loads or creates an aggregate, calls the domain method, and saves. No business logic lives in the handler.
```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID

from seedwork.application import Command, CommandHandler
from seedwork.domain import DomainError

# Local module paths are assumptions; adjust to your package layout.
from .account import BankAccount, BankAccountId, UserId
from .bank_account_repository import BankAccountRepository
from .money import Money


class InvalidInitialBalanceError(DomainError):
    def __init__(self) -> None:
        super().__init__("initial_balance must be non-negative", "INVALID_INITIAL_BALANCE")


@dataclass(frozen=True, kw_only=True)
class OpenAccountCommand(Command):
    account_id: str
    owner_id: str
    initial_balance: float
    currency: str

    def __post_init__(self) -> None:
        if self.initial_balance < 0:
            raise InvalidInitialBalanceError()


class OpenAccountCommandHandler(CommandHandler[OpenAccountCommand]):
    def __init__(self, repository: BankAccountRepository) -> None:
        self._repository = repository

    async def handle(self, command: OpenAccountCommand) -> None:
        # The command carries plain strings; convert them to typed identifiers here.
        account = BankAccount.open(
            id=BankAccountId(UUID(command.account_id)),
            owner_id=UserId(UUID(command.owner_id)),
            initial_balance=Money(amount=command.initial_balance, currency=command.currency),
        )
        await self._repository.save(account)
```

- The handler's sole responsibility: obtain aggregate (or create new) → call domain method → save. No business logic.
- `__post_init__` validates the command before the handler runs, so the handler receives a guaranteed-valid command.
- `handle()` returns `None`. The bus wraps the outcome in `Result.ok()` on success or `Result.failed(errors)` on `DomainError`.

Don't:

- Put business logic or domain conditions in the handler.
- Call `publish(aggregate.domain_events)`; `DomainEventPublishingRepository` does this automatically after `save()`.
- Return values from `handle()`.
A Query expresses an intent to read data — always read-only. A QueryHandler returns T | None and never loads full aggregates nor triggers side effects.
```python
from __future__ import annotations

from dataclasses import dataclass

from seedwork.application import Query, QueryHandler


# AccountView is declared first: Query[AccountView] is evaluated when the class is created.
@dataclass(frozen=True, kw_only=True)
class AccountView:
    id: str
    balance: float


@dataclass(frozen=True, kw_only=True)
class GetAccountQuery(Query[AccountView]):
    account_id: str


class GetAccountQueryHandler(QueryHandler[GetAccountQuery, AccountView]):
    # AccountReadRepository is an application-layer read port defined elsewhere.
    def __init__(self, repository: AccountReadRepository) -> None:
        self._repository = repository

    async def handle(self, query: GetAccountQuery) -> AccountView | None:
        return await self._repository.find_view(query.account_id)
```

- `Query[TResult]` is generic; declare the response type on each query subclass.
- `handle()` returns `T | None`; return `None` for not-found cases.
- Use a read repository (application-layer port) that returns projections, not the domain repository.

Don't:

- Load full aggregates just to extract a few fields.
- Trigger side effects from a query handler: no `save()`, no domain events.
- Return domain aggregates as query results.
Result is the return type of every command dispatch: ok on success, failed with typed errors on a domain rule violation. It eliminates unchecked exceptions from the application boundary.
```python
# Command bus returns Result; inspect at the entry point
result = await command_bus.dispatch(command)
if result.is_failed:  # @property bool
    return error_response(result.errors)
# also: result.is_ok
```

- `Result.ok()`: success. `Result.failed(errors)`: domain failure. Both are class methods.
- `result.is_ok` and `result.is_failed` are `@property` on the dataclass (no parentheses).
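The way a bus might turn a raised domain error into a failed `Result` can be sketched as follows. `ResultSketch`, `DomainErrorSketch`, and the free-standing `dispatch` function are hypothetical stand-ins written for this example; the field layout of the real `Result` is not shown in this document, so the `errors` tuple is an assumption.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Iterable


class DomainErrorSketch(Exception):
    """Hypothetical stand-in for DomainError (message + code)."""

    def __init__(self, message: str, code: str) -> None:
        super().__init__(message)
        self.code = code


@dataclass(frozen=True)
class ResultSketch:
    errors: tuple[DomainErrorSketch, ...] = ()

    @classmethod
    def ok(cls) -> ResultSketch:
        return cls()

    @classmethod
    def failed(cls, errors: Iterable[DomainErrorSketch]) -> ResultSketch:
        return cls(errors=tuple(errors))

    @property
    def is_ok(self) -> bool:
        return not self.errors

    @property
    def is_failed(self) -> bool:
        return bool(self.errors)


async def dispatch(handler: Callable[[int], Awaitable[None]], command: int) -> ResultSketch:
    # The handler never catches domain errors; the dispatching layer does.
    try:
        await handler(command)
    except DomainErrorSketch as error:
        return ResultSketch.failed([error])
    return ResultSketch.ok()


async def open_account(initial_balance: int) -> None:
    if initial_balance < 0:
        raise DomainErrorSketch("initial_balance must be non-negative", "INVALID_INITIAL_BALANCE")


success = asyncio.run(dispatch(open_account, 100))
failure = asyncio.run(dispatch(open_account, -1))
```

The entry point then branches on `is_failed` and never needs a `try`/`except` of its own for domain rule violations.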
```python
from seedwork.infrastructure import CommandBusBuilder, RegistryCommandBus

registry = RegistryCommandBus()
command_bus = (
    CommandBusBuilder(registry)
    .register(OpenAccountCommand, OpenAccountCommandHandler(repository))
    .with_transaction(uow)                      # optional: TransactionalCommandBus
    .with_domain_event_coordination(event_bus)  # DomainEventCoordinatorCommandBus
    .build()
)
```

- Order: `with_transaction` (outermost) → `with_domain_event_coordination` → registry (innermost).
- `with_domain_event_coordination(event_bus)` calls `dispatch()` on success and `discard()` on failure.

Do:

- Always use `CommandBusBuilder`; never instantiate bus decorators manually.

Don't:

- Place `with_domain_event_coordination()` outside `with_transaction()`; events must be dispatched within the open transaction.
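The nesting order can be illustrated with three hand-written decorators that log what they do. All class names below are hypothetical. As a simplification, failures are propagated as plain exceptions instead of being wrapped in a `Result`, so this is a sketch of the ordering only and not of the real bus classes.

```python
from __future__ import annotations

import asyncio

trace: list[str] = []


class RegistryBusSketch:
    """Innermost layer: runs the handler."""

    async def dispatch(self, command: str) -> None:
        trace.append(f"handle {command}")


class EventCoordinationBusSketch:
    """Middle layer: dispatches buffered events on success, discards on failure."""

    def __init__(self, inner: RegistryBusSketch) -> None:
        self._inner = inner

    async def dispatch(self, command: str) -> None:
        try:
            await self._inner.dispatch(command)
        except Exception:
            trace.append("discard events")
            raise
        trace.append("dispatch events")


class TransactionalBusSketch:
    """Outermost layer: the transaction spans everything inside it."""

    def __init__(self, inner: EventCoordinationBusSketch) -> None:
        self._inner = inner

    async def dispatch(self, command: str) -> None:
        trace.append("begin transaction")
        try:
            await self._inner.dispatch(command)
        except Exception:
            trace.append("rollback")
            raise
        trace.append("commit")


bus = TransactionalBusSketch(EventCoordinationBusSketch(RegistryBusSketch()))
asyncio.run(bus.dispatch("OpenAccount"))
```

With this nesting, "dispatch events" is logged before "commit", which is why event handlers can still write to the outbox inside the open transaction.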
An Integration Event communicates a meaningful business fact from this bounded context to the outside world. Unlike domain events (internal, synchronous), integration events cross service boundaries and are delivered asynchronously. They carry a stable, versioned contract: once published, their schema must not break consumers.
Concrete event — use BaseIntegrationEvent and a from_domain_event() factory:
```python
from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar
from uuid import uuid4

from seedwork.application.integration_events import BaseIntegrationEvent

from .request_context import correlation_id as _correlation_id

if TYPE_CHECKING:
    from .events import AccountOpened


class AccountOpenedIntegrationEvent(BaseIntegrationEvent):
    TYPE: ClassVar[str] = "bank_account.account_opened"
    VERSION: ClassVar[str] = "1.0"

    @classmethod
    def from_domain_event(cls, event: AccountOpened) -> AccountOpenedIntegrationEvent:
        return cls(
            type=cls.TYPE,
            version=cls.VERSION,
            aggregate_id=event.aggregate_id,
            payload={
                "account_id": event.aggregate_id,
                "initial_balance": event.payload.initial_balance,
                "currency": event.payload.currency,
            },
            correlation_id=_correlation_id.get(str(uuid4())),  # from execution context
            causation_id=event.id,  # domain event that caused this
        )
```

Consumer side (Subscriber entrypoint):

```python
from seedwork.application import IntegrationEventHandler


class AccountOpenedIntegrationEventHandler(IntegrationEventHandler[AccountOpenedIntegrationEvent]):
    async def handle(self, event: AccountOpenedIntegrationEvent) -> None:
        # React to an integration event from another bounded context
        ...
```

- `TYPE` and `VERSION` are class-level constants passed to `BaseIntegrationEvent`.
- `correlation_id` comes from the execution context (`ContextVar`), not from the domain event.
- `causation_id=event.id` (the domain event that triggered this).
- `publish()` takes a `Sequence[IntegrationEvent]`; pass `[event]` even for a single event.

Do:

- Version from day 1; bump on breaking payload changes.
- Use a `from_domain_event()` factory classmethod on the integration event class.

Don't:

- Publish from the aggregate or the command handler; always publish from `DomainEventHandler`.
- Omit `correlation_id`.
- Mutate an existing integration event schema; introduce a new version instead.
A Background Task defers work that must happen eventually but does not need to complete within the current request — sending emails, triggering webhooks, calling external APIs. Tasks are written to an outbox before the transaction commits, guaranteeing at-least-once execution by a worker even if the process crashes mid-flight.
```python
from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar
from uuid import uuid4

from seedwork.application.background_tasks import BaseBackgroundTask

from .request_context import correlation_id as _correlation_id

if TYPE_CHECKING:
    from .events import AccountOpened


class SendWelcomeEmailTask(BaseBackgroundTask):
    TYPE: ClassVar[str] = "send_welcome_email"

    @classmethod
    def from_domain_event(cls, event: AccountOpened) -> SendWelcomeEmailTask:
        return cls(
            type=cls.TYPE,
            payload={"account_id": event.aggregate_id},
            correlation_id=_correlation_id.get(str(uuid4())),
            causation_id=event.id,
        )


# TaskHandler is provided by the seedwork package (import omitted here).
class SendWelcomeEmailTaskHandler(TaskHandler[SendWelcomeEmailTask]):
    async def handle(self, task: SendWelcomeEmailTask) -> None:
        # idempotent: may be retried
        ...
```

- `TYPE` class attribute as discriminator for routing.
- `correlation_id` comes from the execution context, not from the domain event.
- `causation_id=event.id` (the domain event that triggered this).

Do:

- Design handlers to be idempotent; the task runner may deliver more than once.
- Schedule from a `DomainEventHandler` via `await task_scheduler.schedule(task)`.

Don't:

- Schedule tasks from the aggregate or the command handler.
- Omit `correlation_id`.
- Use background tasks to communicate with other services; use integration events for that.
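One common way to make a handler safe under at-least-once delivery is to remember which task ids have already been processed. The sketch below is a hypothetical, standalone illustration of that idea: the handler signature, the `task_id` parameter, and the in-memory set are all assumptions, and a real handler would keep the processed ids in durable storage.

```python
from __future__ import annotations

import asyncio


class WelcomeEmailHandlerSketch:
    """Hypothetical idempotent handler: duplicates are detected by task id."""

    def __init__(self) -> None:
        self._processed: set[str] = set()  # stand-in for durable storage
        self.sent_to: list[str] = []       # records the side effect for inspection

    async def handle(self, task_id: str, account_id: str) -> None:
        if task_id in self._processed:
            return  # duplicate delivery: the email was already sent
        self.sent_to.append(account_id)
        self._processed.add(task_id)


handler = WelcomeEmailHandlerSketch()


async def deliver_twice() -> None:
    # Simulate the task runner delivering the same task two times.
    await handler.handle("task-1", "acc-42")
    await handler.handle("task-1", "acc-42")


asyncio.run(deliver_twice())
```

After both deliveries the side effect has happened exactly once.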
correlation_id is a cross-cutting tracing concern set at the entry point. The Command does not carry it.
```python
# shared module, e.g. request_context.py
from contextvars import ContextVar
from uuid import uuid4

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


# Entry point (API controller, subscriber)
def set_correlation_id(incoming_id: str | None = None) -> None:
    correlation_id.set(incoming_id or str(uuid4()))
```

- `contextvars.ContextVar` propagates automatically through asyncio tasks; no explicit parameter threading is needed.
- Set once per request at the entry point; read anywhere via `correlation_id.get()`.

Don't:

- Thread `correlation_id` through function or handler signatures; use the `ContextVar` directly.
- Read `correlation_id` from the domain event; domain events do not carry it.
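The propagation behaviour can be checked with the standard library alone. In this sketch, `handle_request` and `deep_in_the_call_stack` are hypothetical names; the point is that each asyncio task works on its own copy of the context, so concurrent requests do not see each other's value.

```python
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from typing import Optional
from uuid import uuid4

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


def set_correlation_id(incoming_id: Optional[str] = None) -> None:
    correlation_id.set(incoming_id or str(uuid4()))


async def deep_in_the_call_stack() -> str:
    # No parameter was passed down; the value is read from the ambient context.
    return correlation_id.get()


async def handle_request(incoming_id: str) -> str:
    set_correlation_id(incoming_id)
    # create_task() copies the current context, including the value just set.
    return await asyncio.create_task(deep_in_the_call_stack())


async def main() -> list[str]:
    # gather() runs each request in its own task, each with its own context copy.
    return await asyncio.gather(handle_request("req-a"), handle_request("req-b"))


results = asyncio.run(main())
```

Each request reads back its own id, and the value set inside the tasks does not leak into the surrounding context.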
A DomainEventHandler reacts to a domain event inside the same transaction. It is the only place where integration events are published and background tasks are scheduled — never from the aggregate or the command handler.
```python
from seedwork.application import DomainEventHandler

from .events import AccountOpened
from .request_context import correlation_id as _correlation_id

# IntegrationEventPublisher, TaskScheduler, AccountOpenedIntegrationEvent and
# SendWelcomeEmailTask are imported from their own modules (omitted here).


class AccountOpenedDomainEventHandler(DomainEventHandler[AccountOpened]):
    def __init__(
        self,
        publisher: IntegrationEventPublisher,
        task_scheduler: TaskScheduler,
    ) -> None:
        self._publisher = publisher
        self._task_scheduler = task_scheduler

    async def handle(self, event: AccountOpened) -> None:
        ie = AccountOpenedIntegrationEvent.from_domain_event(event)
        await self._publisher.publish([ie])

        task = SendWelcomeEmailTask.from_domain_event(event)
        await self._task_scheduler.schedule(task)
```

- Runs synchronously inside the same transaction (dispatched by `DomainEventBus.dispatch()`).
- Both `publish()` and `schedule()` write to the outbox inside the current transaction; actual delivery is eventual.
- `_correlation_id.get()` reads from the ambient `ContextVar`, which is set once per request at the entry point.

Don't:

- Inject a repository into a `DomainEventHandler`; it must not load or save aggregates.
- Read `correlation_id` from the domain event; it does not carry it.
Decorates a repository to publish domain events after each save:
```python
from seedwork.infrastructure import DomainEventPublishingRepository

publishing_repo = DomainEventPublishingRepository(
    SqlAlchemyBankAccountRepository(session),
    event_bus,  # satisfies DomainEventBusPublisher
)
```

- Takes `DomainEventBusPublisher` (the segregated interface).
- After `save()`, reads `aggregate.domain_events` and calls `event_bus.publish(events)`.
- The command handler remains unaware of event publishing.
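The decorator idea can be sketched in a few lines. Every class here is a hypothetical stand-in, and whether the real `publish()` is synchronous or awaitable is not stated in this document; the sketch assumes a synchronous call.

```python
from __future__ import annotations

import asyncio
from types import SimpleNamespace
from typing import Any


class InMemoryRepoSketch:
    """Hypothetical inner repository that only stores what it is given."""

    def __init__(self) -> None:
        self.saved: list[Any] = []

    async def save(self, aggregate: Any) -> None:
        self.saved.append(aggregate)


class RecordingBusSketch:
    """Hypothetical event bus that records what was published to it."""

    def __init__(self) -> None:
        self.published: list[Any] = []

    def publish(self, events: tuple[Any, ...]) -> None:
        self.published.extend(events)


class PublishingRepositorySketch:
    """Wraps a repository and forwards the aggregate's events after save()."""

    def __init__(self, inner: InMemoryRepoSketch, event_bus: RecordingBusSketch) -> None:
        self._inner = inner
        self._event_bus = event_bus

    async def save(self, aggregate: Any) -> None:
        await self._inner.save(aggregate)
        self._event_bus.publish(aggregate.domain_events)


inner = InMemoryRepoSketch()
bus = RecordingBusSketch()
repo = PublishingRepositorySketch(inner, bus)

# A stand-in aggregate carrying one recorded event.
account = SimpleNamespace(id="acc-1", domain_events=("AccountOpened",))
asyncio.run(repo.save(account))
```

The caller only sees `save()`, which is what keeps the command handler unaware of event publishing.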
```python
from seedwork.infrastructure import (
    CommandBusBuilder,
    DeferredDomainEventBus,
    DomainEventPublishingRepository,
    RegistryCommandBus,
)
from seedwork.testing import InMemoryIntegrationEventPublisher, InMemoryTaskScheduler

event_bus = DeferredDomainEventBus()
publisher = InMemoryIntegrationEventPublisher()  # or OutboxIntegrationEventPublisher
scheduler = InMemoryTaskScheduler()              # or OutboxTaskScheduler
scheduler.register(SendWelcomeEmailTask.TYPE, SendWelcomeEmailTaskHandler())

event_bus.subscribe(AccountOpened, AccountOpenedDomainEventHandler(publisher, scheduler))

account_repo = DomainEventPublishingRepository(
    SqlAlchemyBankAccountRepository(session),
    event_bus,
)

registry = RegistryCommandBus()
command_bus = (
    CommandBusBuilder(registry)
    .register(OpenAccountCommand, OpenAccountCommandHandler(account_repo))
    .with_transaction(uow)
    .with_domain_event_coordination(event_bus)
    .build()
)
```

`dispatch()` processes the buffered events; `discard()` clears without processing. The buffer is a `dict[str, DomainEvent]` keyed by `event.id`, so saving the same aggregate twice does not duplicate events.
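The buffering and deduplication behaviour described above can be sketched as follows. `DeferredBusSketch` and `EventSketch` are hypothetical names, and the method signatures are assumptions modelled on the calls that appear in this document (`subscribe`, `publish`, `dispatch`, `discard`).

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
from uuid import uuid4


@dataclass(frozen=True)
class EventSketch:
    name: str
    id: str = field(default_factory=lambda: str(uuid4()))


class DeferredBusSketch:
    """Hypothetical deferred bus: buffers on publish, handles on dispatch."""

    def __init__(self) -> None:
        self._buffer: dict[str, Any] = {}
        self._handlers: dict[type, list[Callable[[Any], Awaitable[None]]]] = {}

    def subscribe(self, event_type: type, handler: Callable[[Any], Awaitable[None]]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, events: list[Any]) -> None:
        for event in events:
            self._buffer[event.id] = event  # same id overwrites, so no duplicates

    async def dispatch(self) -> None:
        pending, self._buffer = list(self._buffer.values()), {}
        for event in pending:
            for handler in self._handlers.get(type(event), []):
                await handler(event)

    def discard(self) -> None:
        self._buffer.clear()


handled: list[str] = []


async def on_event(event: EventSketch) -> None:
    handled.append(event.name)


bus = DeferredBusSketch()
bus.subscribe(EventSketch, on_event)

opened = EventSketch(name="AccountOpened")
bus.publish([opened])
bus.publish([opened])  # the same aggregate saved twice
asyncio.run(bus.dispatch())

bus.publish([EventSketch(name="NeverHandled")])
bus.discard()  # failure path: buffered events are dropped
asyncio.run(bus.dispatch())
```

Routing uses `type(event)` as the lookup key, in line with the rule that domain events need no `type` field.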
```python
from seedwork.testing import (
    InMemoryRepository,                 # + RepositorySpy (all, reset)
    InMemoryIntegrationEventPublisher,  # + IntegrationEventPublisherSpy (published, reset)
    InMemoryTaskScheduler,              # + TaskSchedulerSpy (scheduled, register, execute_scheduled, reset)
    InMemoryIntegrationEventOutboxRepository,
    InMemoryTaskOutboxRepository,
)
```

```python
# spy usage in tests
publisher = InMemoryIntegrationEventPublisher()
scheduler = InMemoryTaskScheduler()
scheduler.register(SendWelcomeEmailTask.TYPE, SendWelcomeEmailTaskHandler())

await command_bus.dispatch(OpenAccountCommand(...))

assert len(publisher.published) == 1
assert publisher.published[0].type == "bank_account.account_opened"

await scheduler.execute_scheduled()  # executes all scheduled tasks in-process
assert len(scheduler.scheduled) == 0  # tasks were consumed
```

- `InMemoryIntegrationEventPublisher` is spy-only; do not execute, because integration events target other bounded contexts.
- `InMemoryTaskScheduler` supports both spy inspection (`scheduled`) and execution (`execute_scheduled()`).
- `reset()` clears state between tests.
| Concept | Convention | Example |
|---|---|---|
| Aggregate / Entity | PascalCase | `BankAccount`, `Transaction` |
| Value Object | PascalCase | `Money`, `BankAccountId` |
| Domain Event | Past tense PascalCase | `AccountOpened`, `MoneyDeposited` |
| Domain Event Payload | Past tense + `Payload` suffix | `AccountOpenedPayload` |
| Integration Event | Past tense + `IntegrationEvent` suffix | `AccountOpenedIntegrationEvent` |
| Command | Imperative + `Command` | `OpenAccountCommand` |
| Query | Noun phrase + `Query` | `GetBalanceQuery` |
| Command Handler | Imperative + `Handler` | `OpenAccountHandler` |
| Query Handler | Noun phrase + `Handler` | `GetBalanceHandler` |
| Domain Event Handler | Noun phrase + `DomainEventHandler` | `AccountOpenedDomainEventHandler` |
| Background Task | Imperative + `Task` | `SendWelcomeEmailTask` |
| Task Handler | Imperative + `TaskHandler` | `SendWelcomeEmailTaskHandler` |
| Repository (port) | `{Aggregate}Repository` | `BankAccountRepository` |
| Repository (impl) | `{ORM}{Aggregate}Repository` | `SqlAlchemyBankAccountRepository` |
| Module file | `snake_case.py` | `bank_account_repository.py` |
Python ABCs rely on nominal subtyping: an adapter must inherit from the abstract base class, and therefore import it. `Protocol` achieves structural subtyping, so the implementor never imports the port. This is the correct dependency-inversion direction for hexagonal architecture.
Use ABCs only when a base class carries shared, non-trivial logic (e.g., BaseBackgroundTask, BaseDomainEvent, AggregateRoot).
```python
# Port: Protocol, zero coupling to domain
# 'pass' body: Repository[BankAccountId, BankAccount] already carries the contract
class BankAccountRepository(Repository[BankAccountId, BankAccount], Protocol):
    pass


# Adapter: no import of the Protocol required
class SqlAlchemyBankAccountRepository:
    async def find_by_id(self, id: BankAccountId) -> BankAccount | None:
        ...
```

When a generic Protocol is covariant or contravariant, name the TypeVar explicitly:

```python
from typing import Protocol, TypeVar

T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)


class QueryHandler(Protocol[T_contra, T_co]):
    async def handle(self, query: T_contra) -> T_co: ...
```

```python
from typing import Protocol, runtime_checkable

from seedwork.testing import RepositorySpy

# RepositorySpy is @runtime_checkable, so isinstance() works at runtime
assert isinstance(repo, RepositorySpy)
assert len(repo.all) == 1
repo.reset()  # between tests
```

Keep spy interfaces out of production code paths: import them only in tests and InMemory implementations.
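Structural typing and `@runtime_checkable` can be demonstrated with the standard library alone. `GreeterPort`, `ConsoleGreeter`, and `NotAGreeter` are hypothetical names chosen for this sketch and are unrelated to the seedwork package.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class GreeterPort(Protocol):
    def greet(self, name: str) -> str: ...


class ConsoleGreeter:
    # The adapter neither imports nor inherits from GreeterPort;
    # having a matching greet() method is enough.
    def greet(self, name: str) -> str:
        return f"Hello, {name}"


class NotAGreeter:
    pass


def welcome(greeter: GreeterPort, name: str) -> str:
    return greeter.greet(name)


message = welcome(ConsoleGreeter(), "Ada")
matches = isinstance(ConsoleGreeter(), GreeterPort)
does_not_match = isinstance(NotAGreeter(), GreeterPort)
```

Note that a runtime `isinstance()` check against a protocol only verifies that the methods exist, not that their signatures match; signature checking remains the type checker's job.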