All components are exported from the package root (`seedwork`).
- Role: Base class for DDD entities. Identity over attributes: two entities are equal when they share the same `id` and the same concrete class, regardless of other fields.
- Usage: Subclass as `@dataclass(frozen=True, eq=False, kw_only=True)` and declare `id` via inheritance. Use `__post_init__` for additional validation. Raises `NullEntityIdError` if `id` is `None`.
- Key methods: `__eq__` compares by `id` when both objects are the same concrete class. `__hash__` is based on `id`. `_evolve(**changes) -> Self` returns a new instance with the given fields replaced.
- Role: Root of an aggregate. Single entry point for state changes. Accumulates domain events without side effects; all behavior methods return new instances.
- Fields: `domain_events: tuple[DomainEvent, ...]` is immutable, keyword-only, and excluded from `repr`, `hash`, and `eq`. Pass it to the constructor when seeding events (e.g. in factory class methods).
- Key methods: `_evolve(**changes) -> Self` is inherited from `Entity` and produces a new instance with updated fields. `_record(*events) -> Self` returns a new instance with the given events appended to `domain_events`.
- Usage pattern: Use two factory patterns: `open`/`create` for new aggregates (pass `domain_events=(event,)` in the constructor) and `reconstitute` for loading from persistence (pass no `domain_events`; those have already been published). Behavior methods chain `_evolve(state_change)._record(event)` and return the new instance. `DomainEventPublishingRepository` reads `domain_events` and publishes after `save`.
- Role: Immutable domain concept defined entirely by its attributes. Subclass as `@dataclass(frozen=True, kw_only=True)`. Equality and hashing are structural, delegated to the dataclass.
- Usage: Declare fields directly on the subclass. Use `__post_init__` for validation; raise a `DomainError` subclass on invalid input (co-located in the same file). All fields are keyword-only.
- `DomainEvent`: Protocol defining the structural interface for domain events: `id: str` and `occurred_at: datetime`.
- `DomainEventRecord[TPayload]`: frozen dataclass; declares `payload: TPayload` first, then `id: str` (default UUID) and `occurred_at: datetime` (default UTC now).
- Pattern: define a frozen dataclass `Payload`, then a frozen dataclass event extending `DomainEventRecord[Payload]`. Name events in past tense. Keep payload fields primitive (serializable).

```python
from dataclasses import dataclass

from seedwork import DomainEventRecord


@dataclass(frozen=True)
class MoneyDepositedPayload:
    account_id: str
    amount: float
    currency: str


@dataclass(frozen=True)
class MoneyDeposited(DomainEventRecord[MoneyDepositedPayload]):
    pass
```

- Methods: `find_by_id(entity_id: TId) -> TAggregate | None`, `save(aggregate: TAggregate) -> None`, `delete_by_id(entity_id: TId) -> None`. All are `async`.
- Define a typed sub-interface in the domain layer; implement in infrastructure.
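The "typed sub-interface" advice could look like the sketch below. The `Repository` Protocol is re-declared from the documented method signatures so the snippet is self-contained; `BankAccount`, `BankAccountRepository`, and `DictBankAccountRepository` are hypothetical names.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Protocol, TypeVar

TId = TypeVar("TId", contravariant=True)
TAggregate = TypeVar("TAggregate")


class Repository(Protocol[TId, TAggregate]):
    """Stand-in built from the documented method signatures."""

    async def find_by_id(self, entity_id: TId) -> TAggregate | None: ...
    async def save(self, aggregate: TAggregate) -> None: ...
    async def delete_by_id(self, entity_id: TId) -> None: ...


@dataclass(frozen=True)
class BankAccount:  # simplified aggregate for the example
    id: str
    balance: float


class BankAccountRepository(Repository[str, BankAccount], Protocol):
    """Domain layer: a typed sub-interface with no implementation."""


class DictBankAccountRepository:
    """Infrastructure layer: satisfies the Protocol structurally."""

    def __init__(self) -> None:
        self._items: dict[str, BankAccount] = {}

    async def find_by_id(self, entity_id: str) -> BankAccount | None:
        return self._items.get(entity_id)

    async def save(self, aggregate: BankAccount) -> None:
        self._items[aggregate.id] = aggregate

    async def delete_by_id(self, entity_id: str) -> None:
        self._items.pop(entity_id, None)


async def demo(repo: BankAccountRepository):
    await repo.save(BankAccount("acc-1", 100.0))
    found = await repo.find_by_id("acc-1")
    await repo.delete_by_id("acc-1")
    gone = await repo.find_by_id("acc-1")
    return found, gone


found, gone = asyncio.run(demo(DictBankAccountRepository()))
```

Application code depends only on `BankAccountRepository`, so the dict-backed class can be swapped for a database-backed one without touching handlers.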
- Protocol (structural; no inheritance required). Implementations must provide `__aenter__(self) -> Self` and `__aexit__(self, exc_type, exc_val, exc_tb) -> None`. `__aexit__` should commit when `exc_type is None` and roll back otherwise; `TransactionalCommandBus` relies on this contract.
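The commit/rollback contract can be sketched with a unit of work that only records what it would do. `RecordingUnitOfWork` is a hypothetical test double; a real implementation would wrap a database session or transaction.

```python
import asyncio


class RecordingUnitOfWork:
    """Hypothetical double that logs instead of touching a database."""

    def __init__(self) -> None:
        self.log = []

    async def __aenter__(self):
        self.log.append("begin")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Commit on a clean exit, roll back when an exception escaped.
        self.log.append("commit" if exc_type is None else "rollback")
        # Returning None lets the exception keep propagating.


async def demo():
    uow = RecordingUnitOfWork()
    async with uow:
        pass  # successful work
    try:
        async with uow:
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return uow.log


uow_log = asyncio.run(demo())
```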
- Base `Exception` subclass. Constructor `(message: str, code: str)`. Exposes `self.code`. Always subclass with a named class; `DomainError` itself is not meant to be raised directly.
- `Result.succeeded()` / `Result.failed(errors: list[ResultError])`. Check with `result.ok: bool`. `.errors: tuple[ResultError, ...]` (immutable).
- Use for expected domain failures at the application boundary; let infrastructure exceptions propagate.
- `Command`: frozen dataclass base. Subclass as `@dataclass(frozen=True, kw_only=True)` and declare fields directly.
- `CommandHandler[TCommand]`: Protocol. `execute(self, command: TCommand) -> None` (async).
- `CommandBus`: Protocol. `dispatch(self, command: Command) -> Result` (async).
- `Query[TResult]`: generic frozen dataclass base. Subclass as `@dataclass(frozen=True, kw_only=True)` and declare the result type as a type parameter: `class MyQuery(Query[MyResponse])`.
- `QueryHandler[TQuery, TResult]`: Protocol. `execute(self, query: TQuery) -> TResult | None` (async). Return `None` to signal absence.
- `QueryBus`: Protocol. `ask(self, query: Query[TResult]) -> TResult | None` (async). The return type is inferred from the query's type parameter, so there is no `Any` and no cast at the call site.
- Protocol. `publish(self, events: Sequence[DomainEvent]) -> None` (async). Accepts any sequence; tuples from `aggregate.domain_events` are passed directly.
- Do not inject into command handlers; use `DomainEventPublishingRepository` instead.
- Protocol. `handle(self, event: TEvent) -> None` (async).
- Routes commands to handlers via an in-process registry keyed by command class. `register(command_type, handler)`, `dispatch(command) -> Result`.
- Catches `DomainError` and converts to `Result.failed`. All other exceptions propagate.

```python
bus = RegistryCommandBus()
bus.register(OpenAccountCommand, OpenAccountHandler(repo))

result = await bus.dispatch(OpenAccountCommand(account_id="acc-1", initial_balance=100.0))
result.ok  # True

# DomainError → Result.failed
result = await bus.dispatch(...)  # handler raises InsufficientFundsError
result.ok  # False
result.errors[0].code  # "INSUFFICIENT_FUNDS"
```

- Same registry pattern for queries. `register(query_type, handler)`, `ask(query) -> TResult | None`. The bus is generically typed, so the return type matches the registered query handler's result type.
- Raises `KeyError` when no handler is registered for the query type.

```python
bus = RegistryQueryBus()
bus.register(GetBalanceQuery, GetBalanceHandler(read_repo))

balance = await bus.ask(GetBalanceQuery(account_id="acc-1"))
# balance: BalanceResponse | None
```

- Decorator. Wraps dispatch in the `UnitOfWork` context manager (`async with unit_of_work`). Commit and rollback are the context manager's responsibility.
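A decorator of this kind can be very small. The sketch below uses hypothetical `TransactionalBusSketch`, `FlakyBus`, and `RecordingUnitOfWork` classes to show the wrapping; it is not the library's implementation.

```python
import asyncio


class RecordingUnitOfWork:
    """Hypothetical double that logs commit and rollback decisions."""

    def __init__(self) -> None:
        self.log = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self.log.append("commit" if exc_type is None else "rollback")


class FlakyBus:
    """Stand-in inner bus: raises for the command string 'fail'."""

    async def dispatch(self, command):
        if command == "fail":
            raise RuntimeError("infrastructure failure")
        return "ok"


class TransactionalBusSketch:
    def __init__(self, inner, unit_of_work) -> None:
        self._inner = inner
        self._unit_of_work = unit_of_work

    async def dispatch(self, command):
        # The context manager alone decides between commit and rollback.
        async with self._unit_of_work:
            return await self._inner.dispatch(command)


async def demo():
    uow = RecordingUnitOfWork()
    bus = TransactionalBusSketch(inner=FlakyBus(), unit_of_work=uow)
    outcome = await bus.dispatch("work")
    try:
        await bus.dispatch("fail")
    except RuntimeError:
        pass
    return outcome, uow.log


outcome, tx_log = asyncio.run(demo())
```

In this sketch a failed `Result` returned by the inner bus would not raise, so `__aexit__` would see no exception; the source does not state how the library treats that case.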

```python
bus = TransactionalCommandBus(inner=registry_bus, unit_of_work=uow)
# Every dispatch runs inside: async with uow: await inner.dispatch(command)
```

- Decorator. Reads `aggregate.domain_events` and calls `publisher.publish(aggregate.domain_events)` after every `save`. `delete_by_id` and `find_by_id` delegate without side effects.
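The save-then-publish ordering can be sketched as follows. `PublishingRepositorySketch`, `DictRepository`, `ListPublisher`, and the string events are hypothetical stand-ins used only to make the call order observable.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Account:  # simplified aggregate; events are plain strings here
    id: str
    domain_events: tuple = ()


class DictRepository:
    def __init__(self, log) -> None:
        self._items = {}
        self._log = log

    async def find_by_id(self, entity_id):
        return self._items.get(entity_id)

    async def save(self, aggregate) -> None:
        self._log.append("inner.save")
        self._items[aggregate.id] = aggregate

    async def delete_by_id(self, entity_id) -> None:
        self._items.pop(entity_id, None)


class ListPublisher:
    def __init__(self, log) -> None:
        self.published = []
        self._log = log

    async def publish(self, events) -> None:
        self._log.append("publish")
        self.published.extend(events)


class PublishingRepositorySketch:
    def __init__(self, inner, publisher) -> None:
        self._inner = inner
        self._publisher = publisher

    async def find_by_id(self, entity_id):
        return await self._inner.find_by_id(entity_id)  # plain delegation

    async def save(self, aggregate) -> None:
        await self._inner.save(aggregate)  # persist first
        await self._publisher.publish(aggregate.domain_events)  # then publish

    async def delete_by_id(self, entity_id) -> None:
        await self._inner.delete_by_id(entity_id)  # plain delegation


async def demo():
    log = []
    publisher = ListPublisher(log)
    repo = PublishingRepositorySketch(DictRepository(log), publisher)
    await repo.save(Account("acc-1", ("AccountOpened",)))
    stored = await repo.find_by_id("acc-1")
    return log, publisher.published, stored


call_log, published, stored = asyncio.run(demo())
```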

```python
repo = DomainEventPublishingRepository(inner=BankAccountRepositoryImpl(), publisher=my_publisher)

account = BankAccount.open(BankAccountId("acc-1"), Money(amount=100.0, currency="EUR"))
await repo.save(account)
# The inner repository's save is called first, then publisher.publish(account.domain_events).
```

- `.register(command_type, handler)`: wire handler (last registration wins).
- `.with_transaction(unit_of_work)`: add `TransactionalCommandBus`.
- `.use(middleware: Callable[[CommandBus], CommandBus])`: add custom middleware.
- `.build() -> CommandBus`: return assembled bus.
- Declaration order = stack order; first declared = outermost.
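One way to get "first declared = outermost" is to wrap the core bus with the middleware list in reverse. The builder below is a hypothetical reduction that only supports `.use(...)`; `tracing` and `TracingBus` are illustrative helpers, and the real builder's internals are not shown in the source.

```python
import asyncio


class CoreBus:
    def __init__(self, log) -> None:
        self._log = log

    async def dispatch(self, command):
        self._log.append("core")
        return "done"


class TracingBus:
    """Illustrative middleware that logs before and after the inner call."""

    def __init__(self, inner, name, log) -> None:
        self._inner = inner
        self._name = name
        self._log = log

    async def dispatch(self, command):
        self._log.append(f"enter {self._name}")
        result = await self._inner.dispatch(command)
        self._log.append(f"exit {self._name}")
        return result


def tracing(name, log):
    # Matches the documented middleware shape: bus in, bus out.
    return lambda inner: TracingBus(inner, name, log)


class BuilderSketch:
    def __init__(self, core) -> None:
        self._core = core
        self._middlewares = []

    def use(self, middleware):
        self._middlewares.append(middleware)
        return self

    def build(self):
        bus = self._core
        # Wrap in reverse so the first declared middleware ends up outermost.
        for middleware in reversed(self._middlewares):
            bus = middleware(bus)
        return bus


trace = []
stacked = (
    BuilderSketch(CoreBus(trace))
    .use(tracing("first", trace))
    .use(tracing("second", trace))
    .build()
)
final = asyncio.run(stacked.dispatch("any-command"))
```

The trace shows `first` entering before `second` and exiting after it, which is the stack order the bullet describes.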

```python
bus = (
    CommandBusBuilder()
    .register(OpenAccountCommand, OpenAccountHandler(repo))
    .register(DepositMoneyCommand, DepositMoneyHandler(repo))
    .with_transaction(uow)
    .build()
)

result = await bus.dispatch(DepositMoneyCommand(account_id="acc-1", amount=50.0, currency="EUR"))
```

- `.register(query_type, handler)`: wire handler.
- `.use(middleware: Callable[[QueryBus], QueryBus])`: add custom middleware.
- `.build() -> QueryBus`: return assembled bus.

```python
bus = (
    QueryBusBuilder()
    .register(GetBalanceQuery, GetBalanceHandler(read_repo))
    .build()
)

balance = await bus.ask(GetBalanceQuery(account_id="acc-1"))
```

- Generic in-memory `Repository` implementation backed by a plain `dict`. Intended for use in tests and as a starting point for proof-of-concept implementations.
- Satisfies the `Repository[TId, TAggregate]` Protocol structurally; no inheritance declaration needed.
- All three methods (`find_by_id`, `save`, `delete_by_id`) are `async` and match the `Repository` contract exactly.

```python
repo: InMemoryRepository[BankAccountId, BankAccount] = InMemoryRepository()

await repo.save(account)
found = await repo.find_by_id(BankAccountId("acc-1"))
```