diff --git a/CLAUDE.md b/CLAUDE.md index 2e190d6..a560d24 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,7 +30,7 @@ Commit messages are enforced by a `commit-msg` pre-commit hook using Conventiona This is a **library** (`src/seedwork/`), not an application. It ships DDD and Hexagonal Architecture building blocks. Consuming projects import from `seedwork` (everything is re-exported from the top-level `__init__.py`). -The library is split into three layers that mirror DDD's dependency rule — domain has no outward imports, application depends only on domain, infrastructure depends on both: +The library is split into three layers that enforce the dependency rule of Hexagonal Architecture — domain has no outward imports, application depends only on domain, infrastructure depends on both: ```text seedwork.domain → pure Python, no framework @@ -54,6 +54,8 @@ seedwork.infrastructure → concrete bus/repository implementations **`DomainEventPublishingRepository` is a repository decorator.** Do not publish events inside command handlers. Wrap the concrete repository at composition time; it reads `aggregate.domain_events` and calls `publisher.publish` after every `save`. +**Domain events use a `create()` factory.** Domain event classes expose a `create()` classmethod that accepts plain data and constructs the payload internally. Aggregate methods call `EventClass.create(...)` rather than the constructor directly — this decouples the aggregate from the payload structure. + ## Type-checking constraints - `pyright` runs in `typeCheckingMode = "strict"` — no `# type: ignore` in `src/` or `tests/`. @@ -63,6 +65,7 @@ seedwork.infrastructure → concrete bus/repository implementations ## Testing patterns - Domain: unit-test aggregates and value objects directly — no mocks needed. -- Command handlers: use `InMemoryRepository[TId, TAggregate]` (ships with the library) + a spy `DomainEventPublisher`. +- Command handlers: use `InMemoryRepository[TId, TAggregate]` from `seedwork.testing` — assert on `repo.find_by_id()` and `aggregate.domain_events` directly. - Query handlers: use an inline in-memory read repository (a plain class satisfying the read `Protocol`). +- Integration and task side-effects: use `InMemoryIntegrationEventPublisher` and `InMemoryTaskScheduler` from `seedwork.testing` — both expose spy attributes (`published`, `scheduled`) and a `reset()` method. - Coverage gate is 90% on `src/seedwork/` — running `make test` will fail if it drops below. diff --git a/docs/coding-standards.md b/docs/coding-standards.md index 5a658cd..fd677a3 100644 --- a/docs/coding-standards.md +++ b/docs/coding-standards.md @@ -1,226 +1,454 @@ # Coding Standards -These standards apply to projects built on top of this package. +> These standards apply to projects **built on top of this package** (consuming bounded contexts). They describe how consumer code should use the seedwork building blocks, not how the seedwork library itself is implemented. ## Python baseline -- `pyright` with `typeCheckingMode = "strict"` always — no `# type: ignore` in domain or application layers except where unavoidable at generic boundaries (documented in this package itself). -- All domain classes are immutable frozen dataclasses. Use `__post_init__` for validation. -- No `Any` in domain or application layers. Infrastructure may use it only at adapter boundaries (e.g. ORM row dicts). -- `ruff` with `select = ["E", "F", "I", "UP", "B", "SIM", "ANN"]` for linting and formatting. +- Python 3.12+. Use `from __future__ import annotations` in all files. +- Immutable value types: frozen dataclasses (`@dataclass(frozen=True)`). +- Async/await throughout application and infrastructure layers. +- Type hints everywhere. No `Any` in domain or application code. +- `Protocol` for ports/contracts. `@dataclass` for data-carrying types. +- `__post_init__` for validation in all dataclasses (Commands, Queries, Value Objects, and others). --- -## Do and Don't — Overview +## Do / Don't overview | Do | Don't | |---|---| -| Keep domain free of framework and infrastructure imports | Import framework types, ORMs, or HTTP in the domain layer | -| One use case = one command/query + one handler | Put multiple use cases in a single handler | -| Return new instances from behavior methods via `_evolve`/`_record` | Mutate aggregate state in place | -| Raise a `DomainError` subclass for domain violations | Raise generic `Exception` or `ValueError` from domain code | -| Name events in past tense (`MoneyDeposited`) | Name events like commands (`DepositMoney`) | -| Reference other aggregates by their ID value object only | Hold object references to other aggregate roots | -| Use primitives in command/query constructors | Leak domain types through port interfaces when avoidable | -| Stack buses: `with_transaction → registry` | Open a transaction without a wrapping bus | -| One public class per file; file name matches class in `snake_case` | Put multiple unrelated classes in one file | +| `Protocol` for ports | `ABC` for ports (use only for base classes with shared logic) | +| Frozen dataclass for immutability (VOs, Commands, Queries, Aggregates, Entities, ...) | `@dataclass` without `frozen=True` | +| `__post_init__` for validation | Dedicated validator classes or external validation layers | +| `T_co` / `T_contra` variance suffixes | Plain `T` for covariant/contravariant TypeVars | +| Publish integration events from `DomainEventHandler` | Publish integration events from aggregate | --- ## Domain layer -### Entities +### Entity -- Subclass `Entity[ID]` as `@dataclass(frozen=True, eq=False, kw_only=True)`. The `ID` type parameter can be a `ValueObject` subclass (for IDs with structural rules) or a `NewType` alias over a primitive (for simple typed wrappers with no validation). Prefer `NewType` for IDs that only need nominal typing — it avoids the `.value` accessor and carries zero runtime overhead. -- Equality is by identity only — never compare entities by attributes. -- Use `_evolve(**changes)` to produce a new instance with updated fields. Do not expose public setters. +An `Entity` is a domain object with a durable identity — two instances with the same `id` are the same entity regardless of their other fields. `eq=False` on the dataclass delegates equality to the inherited `Entity.__eq__`, which compares by `id` only. -### Value objects +```python +from __future__ import annotations +from dataclasses import dataclass +from typing import NewType +from uuid import UUID + +from seedwork.domain import Entity + +AccountId = NewType("AccountId", UUID) +UserId = NewType("UserId", UUID) + +@dataclass(frozen=True, eq=False, kw_only=True) +class Account(Entity[AccountId]): + owner_id: UserId + balance: int + + def validate(self) -> None: + pass # add domain invariants here if needed +``` + +#### Key points + +- `id` is inherited from `Entity[TId]` — do not re-declare it in the subclass. +- `eq=False` is required — equality is by identity, not by field values. +- Domain logic (invariants, rules) lives in `validate()`, not in handlers. +- Use `NewType` for typed identifiers — catches mix-ups between `AccountId` and `UserId` at the type-checker level with zero runtime overhead. + +#### Don't + +- Re-declare `id` in the subclass. +- Put orchestration, I/O, or repository calls inside an entity. + +--- + +### Value Object + +A Value Object models a domain concept with no identity — two instances with the same data are interchangeable. Immutability is enforced by `frozen=True`; validity is enforced at construction via `validate()`. + +```python +from __future__ import annotations +from dataclasses import dataclass + +from seedwork.domain import DomainError, ValueObject + + +class NegativeAmountError(DomainError): + def __init__(self) -> None: + super().__init__("Amount cannot be negative", "NEGATIVE_AMOUNT") -- Subclass `ValueObject` as `@dataclass(frozen=True, kw_only=True)`. Declare fields directly on the subclass. Use `__post_init__` for validation; raise a `DomainError` subclass on invalid input (co-located in the same file as the value object). -- Equality and hashing are structural and delegated to the dataclass — all fields participate automatically. -- All constructors are keyword-only: `Money(amount=10.0, currency="EUR")`. -### Aggregates +@dataclass(frozen=True, kw_only=True) +class Money(ValueObject): + amount: float + currency: str -- Subclass `AggregateRoot[ID]` as `@dataclass(frozen=True, eq=False, kw_only=True)`. Use class methods (`open`, `create`, `reconstitute`) as named factories. -- Behavior methods return a new instance — chain `_evolve(state_changes)._record(event)`. -- `reconstitute` class methods pass no `domain_events` — those have already been published. -- Expose only the root to the outside; internal entities and value objects are not shared directly. + def validate(self) -> None: + if self.amount < 0: + raise NegativeAmountError() +``` -### Domain events +#### Key points -- Subclass `DomainEventRecord[TPayload]` as a `@dataclass(frozen=True)`. -- Define `TPayload` as a separate `@dataclass(frozen=True)` with primitive-only fields. -- Name events in past tense (`AccountOpened`, `MoneyDeposited`). -- Pass the payload as `DomainEventRecord(payload=...)` — `id` and `occurred_at` are auto-generated. +- Always `frozen=True` and `kw_only=True`. +- `validate()` is called by `ValueObject.__post_init__`; subclasses override it to enforce invariants. +- Return new instances from transformation methods — never mutate in place. -### Repositories +#### Do -- Define repository interfaces in the domain layer, extending `Repository[ID, T]`. -- Implement in infrastructure. -- Do not add query methods that return DTOs or expose persistence details in the interface. +- Raise `DomainError` subclasses in `validate()`. -### Errors +#### Don't -- `DomainError` is a base class — always subclass with a named class: `class InsufficientFundsError(DomainError): ...`. Do not raise `DomainError` directly. -- The class name carries the ubiquitous language; `code` carries the external contract identifier for API mapping. -- Do not catch infrastructure exceptions in the domain layer. +- Raise `ValueError` or generic exceptions — always raise `DomainError` subclasses. +- Use a Value Object for a concept that needs to be tracked individually over time — that is an Entity. --- -## Application layer +### Aggregate + +`AggregateRoot` extends `Entity` and acts as the consistency boundary of the aggregate cluster — all external writes must go through it. It is a frozen dataclass: every state-change method must return a new instance via `_evolve(**kwargs)._record(*events)`. + +```python +from __future__ import annotations +from dataclasses import dataclass +from typing import NewType +from uuid import UUID + +from seedwork.domain import AggregateRoot + +UserId = NewType("UserId", UUID) + +@dataclass(frozen=True, eq=False, kw_only=True) +class BankAccount(AggregateRoot[BankAccountId]): + owner_id: UserId + balance: Money + + def validate(self) -> None: + pass + + @classmethod + def open(cls, id: BankAccountId, owner_id: UserId, initial_balance: Money) -> BankAccount: + return cls(id=id, owner_id=owner_id, balance=initial_balance)._record( + AccountOpened.create( + initial_balance=initial_balance.amount, + currency=initial_balance.currency, + aggregate_id=str(id), + ) + ) + + def deposit(self, amount: Money) -> BankAccount: + new_balance = Money(amount=self.balance.amount + amount.amount, currency=self.balance.currency) + return self._evolve(balance=new_balance)._record( + MoneyDeposited.create( + amount=amount.amount, + aggregate_id=str(self.id), + ) + ) +``` -### Commands and handlers +#### Key points -- One command class per write use case, subclassing `Command` as `@dataclass(frozen=True, kw_only=True)`. The decorator is required — omitting it means fields are not part of `__init__` and construction will fail at runtime. -- One handler implementing `CommandHandler[TCommand]` (Protocol — no inheritance required). -- Handler pattern: load aggregate → call domain method → save the **returned** instance. Event publishing is handled transparently by `DomainEventPublishingRepository` — do not publish events inside handlers. -- Use primitives in commands when the handler constructs domain objects internally. This keeps the port boundary free of domain type coupling. -- Do not put business logic in the handler — only orchestration. +- `domain_events` is `tuple[DomainEvent, ...]` (immutable). `_record(*events)` returns a new instance with the events appended. +- `id` is inherited from `Entity[TId]` — do not re-declare it in the subclass. +- The `DomainEventPublishingRepository` reads `domain_events` after each `save()`. The `DeferredDomainEventBus` deduplicates by `event.id` — no manual clearing needed. -### Queries and handlers +#### Do -- One query class per read use case, subclassing `Query[TResult]` as `@dataclass(frozen=True, kw_only=True)`. The result type parameter is mandatory — it makes `QueryBus.ask` fully typed with no `Any` at the call site. -- One handler implementing `QueryHandler[TQuery, TResult]` (Protocol — no inheritance required), returning a plain dataclass — never a domain entity or aggregate. -- **Never inject a domain `Repository[TId, TAggregate]` into a query handler.** Define an ad-hoc read repository as a Protocol in the application layer (alongside the query) and implement it in infrastructure. This decouples the read model from the write model and allows read-side optimizations. -- Handlers are read-only: no commands dispatched, no state changed. +- Use `_evolve(**kwargs)._record(*events)` for all state changes. +- Use class factory methods (`open`, `reconstitute`) — never expose the constructor directly to application code. -### Domain event handlers +#### Don't -- Implement `DomainEventHandler[TEvent]` (Protocol — no inheritance required). -- One concern per handler (e.g. update projection, send notification — these are separate handlers). -- Design for idempotency when the bus may redeliver events. -- Wiring (routing event types to handlers) is the responsibility of the consuming project's composition root. +- Mutate state in place — `AggregateRoot` is frozen. +- Put orchestration, I/O, or repository calls inside the aggregate. +- Re-declare `id` in the subclass. --- -## Infrastructure layer +### Domain Events + +A Domain Event represents something that happened in the domain — a meaningful business fact, not a technical operation. Events are always recorded by the aggregate root itself via `_record(*events)`; no external code creates or injects them. + +Use a frozen payload dataclass for the event fields. `BaseDomainEvent[TPayload]` provides `id` and `occurred_at` with defaults; `aggregate_id` is a **required** constructor argument — pass it explicitly to identify the emitting aggregate. + +```python +from __future__ import annotations +from dataclasses import dataclass + +from seedwork.domain.domain_event import BaseDomainEvent + + +@dataclass(frozen=True) +class AccountOpenedPayload: + initial_balance: float + currency: str + + +@dataclass(frozen=True) +class AccountOpened(BaseDomainEvent[AccountOpenedPayload]): + @classmethod + def create(cls, initial_balance: float, currency: str, aggregate_id: str) -> AccountOpened: + return cls( + payload=AccountOpenedPayload(initial_balance=initial_balance, currency=currency), + aggregate_id=aggregate_id, + ) +``` + +Usage from the aggregate: + +```python +AccountOpened.create( + initial_balance=initial_balance.amount, + currency=initial_balance.currency, + aggregate_id=str(self.id), +) +``` + +#### Key points -- Implement `Repository` and `UnitOfWork` here, not in domain. -- Wire `RegistryCommandBus` and `RegistryQueryBus` with handlers via `.register(...)`. -- Compose command buses using `CommandBusBuilder`: `.with_transaction()` wraps the `RegistryCommandBus`. -- Wrap the repository with `DomainEventPublishingRepository` to publish events transparently after `save`. -- Do not put domain or application use-case logic in infrastructure. +- Named in past tense: `AccountOpened`, `MoneyDeposited`. +- Keep payloads minimal — use a frozen payload dataclass with primitive or value object fields. +- Routing is done with `isinstance` / `type()` — no `type` or `version` fields needed. + +#### Do + +- Use a `create()` classmethod as factory — callers pass plain data, the factory constructs the payload internally. This decouples the aggregate from the payload structure. + +#### Don't + +- Call the constructor directly from the aggregate — always go through `create()`. +- Add `type` or `version` fields — domain events are internal to the bounded context. +- Create domain events outside the aggregate — handlers and repositories must never instantiate events directly. +- Use domain events to communicate to other services — use Integration Events for that. + +> **Deletion note.** Deletion is not automatically a domain event. Use `delete_by_id` for technical removal with no business significance. If deletion is a meaningful domain occurrence (e.g. closing an account), model it as an aggregate behaviour (`account.close()`) that records the event — the handler then calls `save()` or `delete_by_id()` as appropriate. Never use `delete_by_id` when a domain event must be raised. --- -## Naming conventions +### Repository -| Artifact | Convention | Example | -|---|---|---| -| Aggregate / Entity | `PascalCase` noun | `BankAccount`, `Transaction` | -| ID (`NewType` or VO) | `Id` | `BankAccountId = NewType("BankAccountId", str)` | -| Value object | `PascalCase` noun | `Money`, `EmailAddress` | -| Domain event | `PascalCase` past tense | `MoneyDeposited`, `AccountOpened` | -| Domain event payload | `Payload` | `MoneyDepositedPayload` | -| Command | `PascalCase` verb phrase + `Command` | `DepositMoneyCommand`, `OpenAccountCommand` | -| Query | `GetQuery` or `FindQuery` | `GetBalanceQuery`, `FindActiveAccountsQuery` | -| Command / query handler | `Handler` | `DepositMoneyHandler`, `GetBalanceHandler` | -| Write repository interface | `Repository` | `BankAccountRepository` | -| Read repository interface | `ReadRepository` | `BankAccountReadRepository` | -| Domain error class | `Error` | `InsufficientFundsError` | -| Domain event handler | `On` or by concern | `OnMoneyDeposited`, `UpdateBalanceProjection` | -| File names | `snake_case`, one class per file | `bank_account.py`, `deposit_money_handler.py` | - ---- - -## File and folder structure - -```text -src/ -└── / - ├── domain/ - │ ├── .py - │ ├── _id.py - │ ├── value_objects/ - │ │ └── .py - │ ├── events/ - │ │ └── .py - │ └── _repository.py - ├── application/ - │ └── / - │ ├── _command.py # or _query.py - │ ├── _handler.py - │ ├── _response.py # plain dataclass (queries only) - │ └── _read_repository.py # read port Protocol (queries only) - └── infrastructure/ - ├── __repository.py # write repository - └── __read_repository.py # read repository -``` - -One public class per file. File name matches the exported class name in `snake_case`. +A Repository is a domain port — defined in the domain layer as a `Protocol`, implemented in infrastructure. Its sole concern is the persistence of aggregates: it abstracts storage behind a collection-like interface so the domain never depends on a specific technology. Only aggregate roots have repositories; child entities and value objects are persisted as part of their aggregate, never independently. + +```python +from __future__ import annotations +from typing import Protocol +from .account import BankAccount + +class BankAccountRepository(Repository[BankAccountId, BankAccount], Protocol): + pass +``` + +#### Key points + +- The interface is defined in the domain layer; the concrete implementation lives in infrastructure. +- `Protocol` with `Repository[TId, TAgg]` as base — structural typing, no coupling between port and adapter. +- Method signatures (`find_by_id`, `save`, `delete_by_id`) are inherited from `Repository`; no need to redeclare them. +- `pass` body is intentional — redeclaring methods is redundant and couples the port to the base class's naming. + +#### Don't + +- Add query methods (`find_by_email`, `find_by_status`) to the domain repository — define a read port in the application layer. +- Return ORM models from the repository — return domain types only. --- -## Design decisions +### Domain Errors -### Protocols over ABCs (PEP 544) +Domain errors represent business rule violations — named, typed, and defined in the domain layer. Each distinct invariant gets its own class extending `DomainError`. -All contracts with no shared implementation use `Protocol` rather than abstract base classes. Implementations satisfy them structurally — no inheritance from seedwork is required. This keeps the dependency direction clean: consuming code never inherits from library internals, and swapping implementations requires no base class changes. +```python +from seedwork.domain import DomainError -The exceptions are `Command`, `Query`, `ValueObject`, `Entity`, and `AggregateRoot` — these use nominal (inheritance-based) typing because inheritance here communicates DDD intent, not just interface conformance. +class InsufficientFundsError(DomainError): + def __init__(self) -> None: + super().__init__("Insufficient funds", "INSUFFICIENT_FUNDS") -### TypeVar variance naming (PEP 484) +class AccountNotFoundError(DomainError): + def __init__(self, account_id: str) -> None: + super().__init__(f"Account {account_id} not found", "ACCOUNT_NOT_FOUND") +``` -TypeVars with declared variance carry `_co` (covariant) or `_contra` (contravariant) suffixes as specified by PEP 484. This makes the variance constraint visible at the point of use without navigating to the TypeVar definition. +#### Key points -| TypeVar | Variance | Why | -|---|---|---| -| `TCommand_contra`, `TQuery_contra`, `TEvent_contra` | Contravariant | Handler input parameters: a handler of a supertype satisfies a handler of a subtype | -| `TId_contra` | Contravariant | Repository ID parameter | -| `TResult_co` | Covariant | Query handler result: a handler returning a subtype satisfies one returning a supertype | +- The `code` string is the machine-readable identifier — bake it into `__init__` so call sites only supply domain-specific arguments. +- The `RegistryCommandBus` catches `DomainError` and wraps it in a failed `Result` automatically — handlers never catch them. +- Define one subclass per distinct business rule violation. -### Protocol method stubs use `...` +#### Don't -Protocol method bodies use `...` as the stub, following PEP 544 convention for `.py` files. Python requires a syntactic body for all function definitions; `...` is the minimal idiomatic form for abstract protocol methods. +- Raise `ValueError` or generic exceptions for business rule violations. +- Catch `DomainError` in the handler — let the bus convert it to `Result.failed`. -### `correlation_id` propagation via `ContextVar` +--- + +## Application layer -`correlation_id` is a request-scoped value that must flow from the entry point (HTTP handler, CLI, consumer) through the entire call stack without threading it through every function signature. The seedwork does not provide this abstraction; each bounded context owns it. +### Command and handler -**Pattern:** +A `Command` expresses an intent to change state — it carries the input data and validates it in `__post_init__`. A `CommandHandler` receives a guaranteed-valid command, loads or creates an aggregate, calls the domain method, and saves. No business logic lives in the handler. ```python -# application/request_context.py -from contextvars import ContextVar +from __future__ import annotations +from dataclasses import dataclass +from seedwork.application import Command, CommandHandler +from seedwork.domain import DomainError + + +class InvalidInitialBalanceError(DomainError): + def __init__(self) -> None: + super().__init__("initial_balance must be non-negative", "INVALID_INITIAL_BALANCE") + -correlation_id: ContextVar[str] = ContextVar("correlation_id") +@dataclass(frozen=True, kw_only=True) +class OpenAccountCommand(Command): + account_id: str + owner_id: str + initial_balance: float + currency: str + + def __post_init__(self) -> None: + if self.initial_balance < 0: + raise InvalidInitialBalanceError() + +class OpenAccountCommandHandler(CommandHandler[OpenAccountCommand]): + def __init__(self, repository: BankAccountRepository) -> None: + self._repository = repository + + async def handle(self, command: OpenAccountCommand) -> None: + account = BankAccount.open( + id=BankAccountId(command.account_id), + owner_id=UserId(command.owner_id), + initial_balance=Money(amount=command.initial_balance, currency=command.currency), + ) + await self._repository.save(account) ``` -The entry point sets it at the boundary: +#### Key points + +- The handler's sole responsibility: obtain aggregate (or create new) → call domain method → save. No business logic. +- `__post_init__` validates the command before the handler runs — the handler receives a guaranteed-valid command. +- `handle()` returns `None`. The bus wraps the outcome in `Result.ok()` on success or `Result.failed(errors)` on `DomainError`. + +#### Don't + +- Put business logic or domain conditions in the handler. +- Call `publish(aggregate.domain_events)` — `DomainEventPublishingRepository` does this automatically after `save()`. +- Return values from `handle()`. + +--- + +### Query and handler + +A `Query` expresses an intent to read data — always read-only. A `QueryHandler` returns `T | None` and never loads full aggregates nor triggers side effects. ```python -# infrastructure/http/middleware.py -token = correlation_id.set(request.headers.get("X-Correlation-Id", str(uuid4()))) -try: - response = await call_next(request) -finally: - correlation_id.reset(token) +from __future__ import annotations +from dataclasses import dataclass +from seedwork.application import Query, QueryHandler + +@dataclass(frozen=True, kw_only=True) +class GetAccountQuery(Query[AccountView]): + account_id: str + +@dataclass(frozen=True, kw_only=True) +class AccountView: + id: str + balance: float + +class GetAccountQueryHandler(QueryHandler[GetAccountQuery, AccountView]): + def __init__(self, repository: AccountReadRepository) -> None: + self._repository = repository + + async def handle(self, query: GetAccountQuery) -> AccountView | None: + return await self._repository.find_view(query.account_id) ``` -`DomainEventHandler` implementations read it when building integration events: +#### Key points + +- `Query[TResult]` is generic — declare the response type on each query subclass. +- `handle()` returns `T | None` — return `None` for not-found cases. +- Use a **read repository** (application-layer port) that returns projections, not the domain repository. + +#### Don't + +- Load full aggregates just to extract a few fields. +- Trigger side effects from a query handler — no `save()`, no domain events. +- Return domain aggregates as query results. + +--- + +### Result + +`Result` is the return type of every command dispatch: `ok` on success, `failed` with typed errors on a domain rule violation. It eliminates unchecked exceptions from the application boundary. ```python -correlation_id=correlation_id.get(str(uuid4())), # fallback generates a fresh id -causation_id=event.id, # the domain event that triggered this +# Command bus returns Result — inspect at the entry point +result = await command_bus.dispatch(command) + +if result.is_failed: # @property bool + return error_response(result.errors) + +# also: result.is_ok ``` -**`correlation_id` vs `causation_id`:** +#### Key points -| Field | Value | Meaning | -|---|---|---| -| `correlation_id` | request/trace id | Groups all events belonging to the same logical operation | -| `causation_id` | domain event `id` | The immediate cause of this integration event | +- `Result.ok()` — success. `Result.failed(errors)` — domain failure. Both are class methods. +- `result.is_ok` and `result.is_failed` are `@property` on the dataclass (no parentheses). -Never assign `event.id` to `correlation_id` — that conflates causation with correlation and breaks distributed tracing. +--- -### `BaseIntegrationEvent` subclass convention +### CommandBus wiring -Every concrete integration event declares `TYPE` and `VERSION` as `ClassVar[str]` and uses them in its factory method. This prevents scattered string literals and makes the event type searchable: +```python +from seedwork.infrastructure import CommandBusBuilder, RegistryCommandBus + +registry = RegistryCommandBus() +command_bus = ( + CommandBusBuilder(registry) + .register(OpenAccountCommand, OpenAccountCommandHandler(repository)) + .with_transaction(uow) # optional — TransactionalCommandBus + .with_domain_event_coordination(event_bus) # DomainEventCoordinatorCommandBus + .build() +) +``` + +#### Key points + +- Order: `with_transaction` (outermost) → `with_domain_event_coordination` → registry (innermost). +- `with_domain_event_coordination(event_bus)` calls `dispatch()` on success and `discard()` on failure. + +#### Do + +- Always use `CommandBusBuilder` — never instantiate bus decorators manually. + +#### Don't + +- Place `with_domain_event_coordination()` outside `with_transaction()` — events must be dispatched within the open transaction. + +--- + +### Integration Events + +An Integration Event communicates a meaningful business fact from this bounded context to the outside world. Unlike domain events (internal, synchronous), integration events cross service boundaries and are delivered asynchronously. They carry a stable, versioned contract: once published, their schema must not break consumers. + +**Concrete event — use `BaseIntegrationEvent` and a `from_domain_event()` factory:** ```python -from typing import ClassVar +from __future__ import annotations +from typing import TYPE_CHECKING, ClassVar +from uuid import uuid4 + +from seedwork.application.integration_events import BaseIntegrationEvent +from .request_context import correlation_id as _correlation_id + +if TYPE_CHECKING: + from .events import AccountOpened + class AccountOpenedIntegrationEvent(BaseIntegrationEvent): TYPE: ClassVar[str] = "bank_account.account_opened" @@ -231,8 +459,323 @@ class AccountOpenedIntegrationEvent(BaseIntegrationEvent): return cls( type=cls.TYPE, version=cls.VERSION, - ... + aggregate_id=event.aggregate_id, + payload={ + "account_id": event.aggregate_id, + "initial_balance": event.payload.initial_balance, + "currency": event.payload.currency, + }, + correlation_id=_correlation_id.get(str(uuid4())), # from execution context + causation_id=event.id, # domain event that caused this + ) +``` + +**Consumer side (Subscriber entrypoint):** + +```python +from seedwork.application import IntegrationEventHandler + +class AccountOpenedIntegrationEventHandler(IntegrationEventHandler[AccountOpenedIntegrationEvent]): + async def handle(self, event: AccountOpenedIntegrationEvent) -> None: + # React to an integration event from another bounded context + ... +``` + +#### Key points + +- `TYPE` and `VERSION` are class-level constants passed to `BaseIntegrationEvent`. +- `correlation_id` from execution context (`ContextVar`) — not from the domain event. +- `causation_id` = `event.id` (the domain event that triggered this). +- `publish()` takes a `Sequence[IntegrationEvent]` — pass `[event]` even for a single event. + +#### Do + +- Version from day 1; bump on breaking payload changes. +- Use a `from_domain_event()` factory classmethod on the integration event class. + +#### Don't + +- Publish from the aggregate or the command handler — always publish from `DomainEventHandler`. +- Omit `correlation_id`. +- Mutate an existing integration event schema — introduce a new version instead. + +--- + +### Background Tasks + +A Background Task defers work that must happen eventually but does not need to complete within the current request — sending emails, triggering webhooks, calling external APIs. Tasks are written to an outbox before the transaction commits, guaranteeing at-least-once execution by a worker even if the process crashes mid-flight. + +```python +from __future__ import annotations +from typing import TYPE_CHECKING, ClassVar +from uuid import uuid4 + +from seedwork.application.background_tasks import BaseBackgroundTask +from .request_context import correlation_id as _correlation_id + +if TYPE_CHECKING: + from .events import AccountOpened + + +class SendWelcomeEmailTask(BaseBackgroundTask): + TYPE: ClassVar[str] = "send_welcome_email" + + @classmethod + def from_domain_event(cls, event: AccountOpened) -> SendWelcomeEmailTask: + return cls( + type=cls.TYPE, + payload={"account_id": event.aggregate_id}, + correlation_id=_correlation_id.get(str(uuid4())), + causation_id=event.id, ) + + +class SendWelcomeEmailTaskHandler(TaskHandler[SendWelcomeEmailTask]): + async def handle(self, task: SendWelcomeEmailTask) -> None: + # idempotent — may be retried + ... +``` + +#### Key points + +- `TYPE` class attribute as discriminator for routing. +- `correlation_id` from execution context — not from the domain event. +- `causation_id` = `event.id` (the domain event that triggered this). + +#### Do + +- Design handlers **idempotent** — the task runner may deliver more than once. +- Schedule from a `DomainEventHandler` via `await task_scheduler.schedule(task)`. + +#### Don't + +- Schedule tasks from the aggregate or the command handler. +- Omit `correlation_id`. +- Use background tasks to communicate with other services — use integration events for that. + +--- + +### Execution context — correlationId propagation + +`correlation_id` is a cross-cutting tracing concern set at the entry point. The `Command` does **not** carry it. + +```python +# shared module, e.g. request_context.py +from contextvars import ContextVar +from uuid import uuid4 + +correlation_id: ContextVar[str] = ContextVar("correlation_id", default="") + +# Entry point (API controller, subscriber) +def set_correlation_id(incoming_id: str | None = None) -> None: + correlation_id.set(incoming_id or str(uuid4())) +``` + +#### Key points + +- `contextvars.ContextVar` propagates automatically through asyncio tasks — no explicit parameter threading needed. +- Set once per request at the entry point; read anywhere via `correlation_id.get()`. + +#### Don't + +- Thread `correlation_id` through function or handler signatures — use the `ContextVar` directly. +- Read `correlation_id` from the domain event — domain events do not carry it. + +--- + +### DomainEventHandler + +A `DomainEventHandler` reacts to a domain event inside the same transaction. It is the only place where integration events are published and background tasks are scheduled — never from the aggregate or the command handler. + +```python +from seedwork.application import DomainEventHandler +from .events import AccountOpened +from .request_context import correlation_id as _correlation_id + +class AccountOpenedDomainEventHandler(DomainEventHandler[AccountOpened]): + def __init__( + self, + publisher: IntegrationEventPublisher, + task_scheduler: TaskScheduler, + ) -> None: + self._publisher = publisher + self._task_scheduler = task_scheduler + + async def handle(self, event: AccountOpened) -> None: + ie = AccountOpenedIntegrationEvent.from_domain_event(event) + await self._publisher.publish([ie]) + task = SendWelcomeEmailTask.from_domain_event(event) + await self._task_scheduler.schedule(task) +``` + +#### Key points + +- Runs synchronously inside the same transaction (dispatched by `DomainEventBus.dispatch()`). +- Both `publish()` and `schedule()` write to the outbox inside the current transaction — actual delivery is eventual. +- `_correlation_id.get()` reads from the ambient ContextVar — set once per request at the entry point. + +#### Don't + +- Inject a repository into a `DomainEventHandler` — it must not load or save aggregates. +- Read `correlation_id` from the domain event — it does not carry it. + +--- + +## Infrastructure layer + +### DomainEventPublishingRepository + +Decorates a repository to publish domain events after each save: + +```python +from seedwork.infrastructure import DomainEventPublishingRepository + +publishing_repo = DomainEventPublishingRepository( + SqlAlchemyBankAccountRepository(session), + event_bus, # satisfies DomainEventBusPublisher +) +``` + +- Takes `DomainEventBusPublisher` (the segregated interface). +- After `save()`, reads `aggregate.domain_events` and calls `event_bus.publish(events)`. +- The command handler remains unaware of event publishing. + +### Full wiring example + +```python +from seedwork.infrastructure import ( + CommandBusBuilder, + DeferredDomainEventBus, + DomainEventPublishingRepository, + RegistryCommandBus, +) +from seedwork.testing import InMemoryIntegrationEventPublisher, InMemoryTaskScheduler + +event_bus = DeferredDomainEventBus() + +publisher = InMemoryIntegrationEventPublisher() # or OutboxIntegrationEventPublisher +scheduler = InMemoryTaskScheduler() # or OutboxTaskScheduler +scheduler.register(SendWelcomeEmailTask.TYPE, SendWelcomeEmailTaskHandler()) + +event_bus.subscribe(AccountOpened, AccountOpenedDomainEventHandler(publisher, scheduler)) + +account_repo = DomainEventPublishingRepository( + SqlAlchemyBankAccountRepository(session), + event_bus, +) + +registry = RegistryCommandBus() +command_bus = ( + CommandBusBuilder(registry) + .register(OpenAccountCommand, OpenAccountCommandHandler(account_repo)) + .with_transaction(uow) + .with_domain_event_coordination(event_bus) + .build() +) +``` + +`dispatch()` processes the buffered events; `discard()` clears without processing. The buffer is a `dict[str, DomainEvent]` keyed by `event.id` — saving the same aggregate twice does not duplicate events. + +### InMemory implementations (tests) + +```python +from seedwork.testing import ( + InMemoryRepository, # + RepositorySpy (all, reset) + InMemoryIntegrationEventPublisher, # + IntegrationEventPublisherSpy (published, reset) + InMemoryTaskScheduler, # + TaskSchedulerSpy (scheduled, register, execute_scheduled, reset) + InMemoryIntegrationEventOutboxRepository, + InMemoryTaskOutboxRepository, +) +``` + +```python +# spy usage in tests +publisher = InMemoryIntegrationEventPublisher() +scheduler = InMemoryTaskScheduler() +scheduler.register(SendWelcomeEmailTask.TYPE, SendWelcomeEmailTaskHandler()) + +await command_bus.dispatch(OpenAccountCommand(...)) + +assert len(publisher.published) == 1 +assert publisher.published[0].type == "bank_account.account_opened" + +await scheduler.execute_scheduled() # executes all scheduled tasks in-process +assert len(scheduler.scheduled) == 0 # tasks were consumed +``` + +- `InMemoryIntegrationEventPublisher` is spy-only — do not execute, integration events target other bounded contexts. +- `InMemoryTaskScheduler` supports both spy inspection (`scheduled`) and execution (`execute_scheduled()`). +- `reset()` clears state between tests. + +--- + +## Naming conventions + +| Concept | Convention | Example | +|---|---|---| +| Aggregate / Entity | `PascalCase` | `BankAccount`, `Transaction` | +| Value Object | `PascalCase` | `Money`, `BankAccountId` | +| Domain Event | Past tense `PascalCase` | `AccountOpened`, `MoneyDeposited` | +| Domain Event Payload | Past tense + `Payload` suffix | `AccountOpenedPayload` | +| Integration Event | Past tense + `IntegrationEvent` suffix | `AccountOpenedIntegrationEvent` | +| Command | Imperative + `Command` | `OpenAccountCommand` | +| Query | Noun phrase + `Query` | `GetBalanceQuery` | +| Command Handler | Imperative + `Handler` | `OpenAccountHandler` | +| Query Handler | Noun phrase + `Handler` | `GetBalanceHandler` | +| Domain Event Handler | Noun phrase + `DomainEventHandler` | `AccountOpenedDomainEventHandler` | +| Background Task | Imperative + `Task` | `SendWelcomeEmailTask` | +| Task Handler | Imperative + `TaskHandler` | `SendWelcomeEmailTaskHandler` | +| Repository (port) | `{Aggregate}Repository` | `BankAccountRepository` | +| Repository (impl) | `{ORM}{Aggregate}Repository` | `SqlAlchemyBankAccountRepository` | +| Module file | `snake_case.py` | `bank_account_repository.py` | + +--- + +## Design decisions + +### Protocols over ABCs + +Python ABCs enforce implementation via `isinstance` checks. `Protocol` achieves structural subtyping — the implementor never imports the port. This is the correct dependency-inversion direction for hexagonal architecture. + +Use ABCs only when a base class carries shared, non-trivial logic (e.g., `BaseBackgroundTask`, `BaseDomainEvent`, `AggregateRoot`). + +```python +# Port — Protocol, zero coupling to domain +# 'pass' body: Repository[BankAccountId, BankAccount] already carries the contract +class BankAccountRepository(Repository[BankAccountId, BankAccount], Protocol): + pass + +# Adapter — no import of the Protocol required +class SqlAlchemyBankAccountRepository: + async def find_by_id(self, id: BankAccountId) -> BankAccount | None: + ... +``` + +### TypeVar variance naming + +When a generic `Protocol` is covariant or contravariant, name the TypeVar explicitly: + +```python +from typing import TypeVar + +T_co = TypeVar("T_co", covariant=True) +T_contra = TypeVar("T_contra", contravariant=True) + +class QueryHandler(Protocol[T_contra, T_co]): + async def handle(self, query: T_contra) -> T_co: ... +``` + +### Spy interfaces for testing + +```python +from typing import Protocol, runtime_checkable +from seedwork.testing import RepositorySpy + +# RepositorySpy is @runtime_checkable — isinstance() works at runtime +assert isinstance(repo, RepositorySpy) +assert len(repo.all) == 1 + +repo.reset() # between tests ``` -`BaseIntegrationEvent` does not validate that `type`/`version` are non-empty — that is a programming error the `ClassVar` convention prevents at authoring time. No runtime validation is added. +Keep spy interfaces out of production code paths — import them only in tests and InMemory implementations. diff --git a/docs/examples/bank_account/application/open_account/open_account_command.py b/docs/examples/bank_account/application/open_account/open_account_command.py index 7951203..2ead01a 100644 --- a/docs/examples/bank_account/application/open_account/open_account_command.py +++ b/docs/examples/bank_account/application/open_account/open_account_command.py @@ -6,5 +6,6 @@ @dataclass(frozen=True, kw_only=True) class OpenAccountCommand(Command): account_id: str + owner_id: str initial_balance: float currency: str diff --git a/docs/examples/bank_account/application/open_account/open_account_handler.py b/docs/examples/bank_account/application/open_account/open_account_handler.py index 0669163..0119de9 100644 --- a/docs/examples/bank_account/application/open_account/open_account_handler.py +++ b/docs/examples/bank_account/application/open_account/open_account_handler.py @@ -3,6 +3,7 @@ from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.bank_account_repository import BankAccountRepository from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from seedwork.application.commands import CommandHandler @@ -14,6 +15,7 @@ def __init__(self, repository: BankAccountRepository) -> None: async def handle(self, command: OpenAccountCommand) -> None: account = BankAccount.open( id=BankAccountId(command.account_id), + owner_id=UserId(command.owner_id), initial_balance=Money(amount=command.initial_balance, currency=command.currency), ) await self._repository.save(account) diff --git a/docs/examples/bank_account/domain/bank_account.py b/docs/examples/bank_account/domain/bank_account.py index 7e1c953..91ad0a1 100644 --- a/docs/examples/bank_account/domain/bank_account.py +++ b/docs/examples/bank_account/domain/bank_account.py @@ -1,72 +1,69 @@ +from __future__ import annotations + from dataclasses import dataclass from typing import Self from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.errors import CurrencyMismatchError, InsufficientFundsError -from bank_account.domain.events.account_credited import ( - AccountCredited, - AccountCreditedPayload, -) -from bank_account.domain.events.account_debited import ( - AccountDebited, - AccountDebitedPayload, -) -from bank_account.domain.events.account_opened import AccountOpened, AccountOpenedPayload +from bank_account.domain.events.account_credited import AccountCredited +from bank_account.domain.events.account_debited import AccountDebited +from bank_account.domain.events.account_opened import AccountOpened from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from seedwork.domain.aggregate_root import AggregateRoot @dataclass(frozen=True, eq=False, kw_only=True) class BankAccount(AggregateRoot[BankAccountId]): + owner_id: UserId balance: Money @classmethod - def reconstitute(cls, id: BankAccountId, balance: Money) -> Self: - return cls(id=id, balance=balance) + def reconstitute(cls, id: BankAccountId, owner_id: UserId, balance: Money) -> Self: + return cls(id=id, owner_id=owner_id, balance=balance) @classmethod - def open(cls, id: BankAccountId, initial_balance: Money) -> Self: - event = AccountOpened( - aggregate_id=id, - payload=AccountOpenedPayload( + def open(cls, id: BankAccountId, owner_id: UserId, initial_balance: Money) -> Self: + return cls(id=id, owner_id=owner_id, balance=initial_balance)._record( + AccountOpened.create( initial_balance=initial_balance.amount, currency=initial_balance.currency, - ), + aggregate_id=str(id), + ) ) - return cls(id=id, balance=initial_balance, domain_events=(event,)) def credit(self, amount: Money) -> Self: if amount.currency != self.balance.currency: raise CurrencyMismatchError(self.balance.currency, amount.currency) - new_amount = self.balance.amount + amount.amount - new_balance = Money(amount=new_amount, currency=self.balance.currency) + new_balance = Money( + amount=self.balance.amount + amount.amount, + currency=self.balance.currency, + ) return self._evolve(balance=new_balance)._record( - AccountCredited( - aggregate_id=self.id, - payload=AccountCreditedPayload( - amount=amount.amount, - currency=amount.currency, - ), + AccountCredited.create( + amount=amount.amount, + currency=amount.currency, + aggregate_id=str(self.id), ) ) - def validate(self) -> None: - pass - def debit(self, amount: Money) -> Self: if amount.currency != self.balance.currency: raise CurrencyMismatchError(self.balance.currency, amount.currency) if amount.amount > self.balance.amount: raise InsufficientFundsError() - new_amount = self.balance.amount - amount.amount - new_balance = Money(amount=new_amount, currency=self.balance.currency) + new_balance = Money( + amount=self.balance.amount - amount.amount, + currency=self.balance.currency, + ) return self._evolve(balance=new_balance)._record( - AccountDebited( - aggregate_id=self.id, - payload=AccountDebitedPayload( - amount=amount.amount, - currency=amount.currency, - ), + AccountDebited.create( + amount=amount.amount, + currency=amount.currency, + aggregate_id=str(self.id), ) ) + + def validate(self) -> None: + pass diff --git a/docs/examples/bank_account/domain/events/account_credited.py b/docs/examples/bank_account/domain/events/account_credited.py index 0f5faa7..d33d6de 100644 --- a/docs/examples/bank_account/domain/events/account_credited.py +++ b/docs/examples/bank_account/domain/events/account_credited.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from seedwork.domain.domain_event import BaseDomainEvent @@ -11,4 +13,9 @@ class AccountCreditedPayload: @dataclass(frozen=True) class AccountCredited(BaseDomainEvent[AccountCreditedPayload]): - pass + @classmethod + def create(cls, amount: float, currency: str, aggregate_id: str) -> AccountCredited: + return cls( + payload=AccountCreditedPayload(amount=amount, currency=currency), + aggregate_id=aggregate_id, + ) diff --git a/docs/examples/bank_account/domain/events/account_debited.py b/docs/examples/bank_account/domain/events/account_debited.py index 56a97a9..b8d7399 100644 --- a/docs/examples/bank_account/domain/events/account_debited.py +++ b/docs/examples/bank_account/domain/events/account_debited.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from seedwork.domain.domain_event import BaseDomainEvent @@ -11,4 +13,9 @@ class AccountDebitedPayload: @dataclass(frozen=True) class AccountDebited(BaseDomainEvent[AccountDebitedPayload]): - pass + @classmethod + def create(cls, amount: float, currency: str, aggregate_id: str) -> AccountDebited: + return cls( + payload=AccountDebitedPayload(amount=amount, currency=currency), + aggregate_id=aggregate_id, + ) diff --git a/docs/examples/bank_account/domain/events/account_opened.py b/docs/examples/bank_account/domain/events/account_opened.py index 801277c..7800b4e 100644 --- a/docs/examples/bank_account/domain/events/account_opened.py +++ b/docs/examples/bank_account/domain/events/account_opened.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from seedwork.domain.domain_event import BaseDomainEvent @@ -11,4 +13,9 @@ class AccountOpenedPayload: @dataclass(frozen=True) class AccountOpened(BaseDomainEvent[AccountOpenedPayload]): - pass + @classmethod + def create(cls, initial_balance: float, currency: str, aggregate_id: str) -> AccountOpened: + return cls( + payload=AccountOpenedPayload(initial_balance=initial_balance, currency=currency), + aggregate_id=aggregate_id, + ) diff --git a/docs/examples/bank_account/domain/user_id.py b/docs/examples/bank_account/domain/user_id.py new file mode 100644 index 0000000..53892f4 --- /dev/null +++ b/docs/examples/bank_account/domain/user_id.py @@ -0,0 +1,3 @@ +from typing import NewType + +UserId = NewType("UserId", str) diff --git a/docs/examples/bank_account/tests/test_bank_account_domain.py b/docs/examples/bank_account/tests/test_bank_account_domain.py index 813dc0e..9f62ed9 100644 --- a/docs/examples/bank_account/tests/test_bank_account_domain.py +++ b/docs/examples/bank_account/tests/test_bank_account_domain.py @@ -6,15 +6,18 @@ from bank_account.domain.events.account_debited import AccountDebited from bank_account.domain.events.account_opened import AccountOpened from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId def make_account( account_id: str = "acc-1", + owner_id: str = "user-1", amount: float = 100.0, currency: str = "EUR", ) -> BankAccount: return BankAccount.open( id=BankAccountId(account_id), + owner_id=UserId(owner_id), initial_balance=Money(amount=amount, currency=currency), ) diff --git a/docs/examples/bank_account/tests/test_composition_root.py b/docs/examples/bank_account/tests/test_composition_root.py index 0c2eec2..83f0466 100644 --- a/docs/examples/bank_account/tests/test_composition_root.py +++ b/docs/examples/bank_account/tests/test_composition_root.py @@ -11,7 +11,9 @@ async def test_compose_open_account_publishes_integration_event() -> None: command_bus, _ = compose(integration_publisher=publisher) await command_bus.dispatch( - OpenAccountCommand(account_id="acc-1", initial_balance=100.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=100.0, currency="EUR" + ) ) assert len(publisher.published) == 1 @@ -27,7 +29,9 @@ async def test_compose_deposit_money() -> None: command_bus, _ = compose() await command_bus.dispatch( - OpenAccountCommand(account_id="acc-1", initial_balance=100.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=100.0, currency="EUR" + ) ) result = await command_bus.dispatch( DepositMoneyCommand(account_id="acc-1", amount=50.0, currency="EUR") @@ -40,7 +44,9 @@ async def test_compose_withdraw_money() -> None: command_bus, _ = compose() await command_bus.dispatch( - OpenAccountCommand(account_id="acc-1", initial_balance=200.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=200.0, currency="EUR" + ) ) result = await command_bus.dispatch( WithdrawMoneyCommand(account_id="acc-1", amount=80.0, currency="EUR") diff --git a/docs/examples/bank_account/tests/test_deposit_money_handler.py b/docs/examples/bank_account/tests/test_deposit_money_handler.py index ae408f7..2965100 100644 --- a/docs/examples/bank_account/tests/test_deposit_money_handler.py +++ b/docs/examples/bank_account/tests/test_deposit_money_handler.py @@ -5,6 +5,7 @@ from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.errors import AccountNotFoundError from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from bank_account.infrastructure.in_memory_bank_account_repository import ( InMemoryBankAccountRepository, ) @@ -14,6 +15,7 @@ async def test_deposit_increases_balance() -> None: repo = InMemoryBankAccountRepository() account = BankAccount.open( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), initial_balance=Money(amount=100.0, currency="EUR"), ) await repo.save(account) diff --git a/docs/examples/bank_account/tests/test_open_account_handler.py b/docs/examples/bank_account/tests/test_open_account_handler.py index f0e76d9..9e95478 100644 --- a/docs/examples/bank_account/tests/test_open_account_handler.py +++ b/docs/examples/bank_account/tests/test_open_account_handler.py @@ -18,7 +18,9 @@ async def test_open_account_saves_account_with_initial_balance() -> None: handler = OpenAccountHandler(repo) await handler.handle( - OpenAccountCommand(account_id="acc-1", initial_balance=100.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=100.0, currency="EUR" + ) ) account = await inner_repo.find_by_id(BankAccountId("acc-1")) @@ -42,7 +44,9 @@ async def handle(self, event: AccountOpened) -> None: handler = OpenAccountHandler(repo) await handler.handle( - OpenAccountCommand(account_id="acc-3", initial_balance=200.0, currency="EUR") + OpenAccountCommand( + account_id="acc-3", owner_id="user-1", initial_balance=200.0, currency="EUR" + ) ) await event_bus.dispatch() diff --git a/docs/examples/bank_account/tests/test_send_welcome_email_task_handler.py b/docs/examples/bank_account/tests/test_send_welcome_email_task_handler.py index 8420d41..da04d63 100644 --- a/docs/examples/bank_account/tests/test_send_welcome_email_task_handler.py +++ b/docs/examples/bank_account/tests/test_send_welcome_email_task_handler.py @@ -52,7 +52,9 @@ async def test_compose_open_account_schedules_welcome_email_task() -> None: command_bus, _ = compose(task_scheduler=scheduler) await command_bus.dispatch( - OpenAccountCommand(account_id="acc-1", initial_balance=100.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=100.0, currency="EUR" + ) ) assert len(scheduler.scheduled) == 1 diff --git a/docs/examples/bank_account/tests/test_withdraw_money_handler.py b/docs/examples/bank_account/tests/test_withdraw_money_handler.py index c9e38a5..8c742fb 100644 --- a/docs/examples/bank_account/tests/test_withdraw_money_handler.py +++ b/docs/examples/bank_account/tests/test_withdraw_money_handler.py @@ -5,6 +5,7 @@ from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.errors import AccountNotFoundError, InsufficientFundsError from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from bank_account.infrastructure.in_memory_bank_account_repository import ( InMemoryBankAccountRepository, ) @@ -14,6 +15,7 @@ async def test_withdraw_decreases_balance() -> None: repo = InMemoryBankAccountRepository() account = BankAccount.open( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), initial_balance=Money(amount=200.0, currency="EUR"), ) await repo.save(account) @@ -38,6 +40,7 @@ async def test_withdraw_insufficient_funds_raises() -> None: repo = InMemoryBankAccountRepository() account = BankAccount.open( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), initial_balance=Money(amount=30.0, currency="EUR"), ) await repo.save(account) diff --git a/tests/application/test_deposit_money_handler.py b/tests/application/test_deposit_money_handler.py index 4adf179..2c6478e 100644 --- a/tests/application/test_deposit_money_handler.py +++ b/tests/application/test_deposit_money_handler.py @@ -5,6 +5,7 @@ from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.errors import AccountNotFoundError from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from seedwork.testing import InMemoryRepository @@ -17,6 +18,7 @@ async def test_deposit_increases_balance() -> None: repo = BankAccountInMemoryRepository() account = BankAccount.open( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), initial_balance=Money(amount=100.0, currency="EUR"), ) await repo.save(account) diff --git a/tests/application/test_open_account_handler.py b/tests/application/test_open_account_handler.py index cefff4f..d892602 100644 --- a/tests/application/test_open_account_handler.py +++ b/tests/application/test_open_account_handler.py @@ -16,7 +16,9 @@ async def test_open_account_saves_account_with_initial_balance() -> None: handler = OpenAccountHandler(repo) await handler.handle( - OpenAccountCommand(account_id="acc-1", initial_balance=100.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=100.0, currency="EUR" + ) ) account = await repo.find_by_id(BankAccountId("acc-1")) @@ -29,7 +31,9 @@ async def test_open_account_records_domain_event() -> None: handler = OpenAccountHandler(repo) await handler.handle( - OpenAccountCommand(account_id="acc-2", initial_balance=50.0, currency="USD") + OpenAccountCommand( + account_id="acc-2", owner_id="user-1", initial_balance=50.0, currency="USD" + ) ) account = await repo.find_by_id(BankAccountId("acc-2")) diff --git a/tests/domain/test_aggregate_root.py b/tests/domain/test_aggregate_root.py index 8b0ac23..431acfc 100644 --- a/tests/domain/test_aggregate_root.py +++ b/tests/domain/test_aggregate_root.py @@ -6,10 +6,13 @@ from bank_account.domain.errors import CurrencyMismatchError, InsufficientFundsError from bank_account.domain.events.account_opened import AccountOpened from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId def make_account(balance: float = 100.0, currency: str = "EUR") -> BankAccount: - return BankAccount.open(BankAccountId("acc-1"), Money(amount=balance, currency=currency)) + return BankAccount.open( + BankAccountId("acc-1"), UserId("user-1"), Money(amount=balance, currency=currency) + ) def test_open_records_domain_event() -> None: @@ -79,6 +82,7 @@ def test_debit_raises_on_currency_mismatch() -> None: def test_reconstitute_restores_state_without_events() -> None: account = BankAccount.reconstitute( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), balance=Money(amount=250.0, currency="EUR"), ) assert account.id == BankAccountId("acc-1") @@ -90,6 +94,7 @@ def test_reconstitute_instance_is_equal_to_open_instance() -> None: opened = make_account(balance=100.0) reconstituted = BankAccount.reconstitute( id=BankAccountId("acc-1"), + owner_id=UserId("user-1"), balance=Money(amount=100.0, currency="EUR"), ) assert opened == reconstituted diff --git a/tests/infrastructure/test_bus_stack_composition.py b/tests/infrastructure/test_bus_stack_composition.py index 1924954..4dad243 100644 --- a/tests/infrastructure/test_bus_stack_composition.py +++ b/tests/infrastructure/test_bus_stack_composition.py @@ -43,7 +43,9 @@ async def test_full_bus_stack_dispatches_command() -> None: ) result = await bus.dispatch( - OpenAccountCommand(account_id="acc-1", initial_balance=200.0, currency="EUR") + OpenAccountCommand( + account_id="acc-1", owner_id="user-1", initial_balance=200.0, currency="EUR" + ) ) assert result.is_ok diff --git a/tests/infrastructure/test_domain_event_publishing_repository.py b/tests/infrastructure/test_domain_event_publishing_repository.py index 7cbce42..96aae1e 100644 --- a/tests/infrastructure/test_domain_event_publishing_repository.py +++ b/tests/infrastructure/test_domain_event_publishing_repository.py @@ -4,6 +4,7 @@ from bank_account.domain.bank_account import BankAccount from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from seedwork.application.domain_event_bus import DomainEventBusPublisher from seedwork.domain.domain_event import DomainEvent @@ -40,7 +41,9 @@ async def test_save_publishes_domain_events() -> None: publisher = SpyPublisher() repo = DomainEventPublishingRepository(inner, publisher) - account = BankAccount.open(BankAccountId("acc-1"), Money(amount=100.0, currency="EUR")) + account = BankAccount.open( + BankAccountId("acc-1"), UserId("user-1"), Money(amount=100.0, currency="EUR") + ) await repo.save(account) assert len(publisher.published) == 1 @@ -51,7 +54,9 @@ async def test_save_with_no_events_does_not_publish() -> None: publisher = SpyPublisher() repo = DomainEventPublishingRepository(inner, publisher) - account = BankAccount.open(BankAccountId("acc-4"), Money(amount=10.0, currency="EUR")) + account = BankAccount.open( + BankAccountId("acc-4"), UserId("user-1"), Money(amount=10.0, currency="EUR") + ) account_no_events = dataclass_replace(account, domain_events=()) await repo.save(account_no_events) @@ -64,7 +69,9 @@ async def test_find_by_id_delegates_to_inner() -> None: publisher = SpyPublisher() repo = DomainEventPublishingRepository(inner, publisher) - account = BankAccount.open(BankAccountId("acc-2"), Money(amount=50.0, currency="EUR")) + account = BankAccount.open( + BankAccountId("acc-2"), UserId("user-1"), Money(amount=50.0, currency="EUR") + ) await inner.save(account) found = await repo.find_by_id(BankAccountId("acc-2")) @@ -85,7 +92,9 @@ async def test_delete_does_not_publish_events() -> None: publisher = SpyPublisher() repo = DomainEventPublishingRepository(inner, publisher) - account = BankAccount.open(BankAccountId("acc-3"), Money(amount=50.0, currency="EUR")) + account = BankAccount.open( + BankAccountId("acc-3"), UserId("user-1"), Money(amount=50.0, currency="EUR") + ) await inner.save(account) await repo.delete_by_id(BankAccountId("acc-3")) diff --git a/tests/infrastructure/test_in_memory_repository.py b/tests/infrastructure/test_in_memory_repository.py index 4743a9c..6d7b1b2 100644 --- a/tests/infrastructure/test_in_memory_repository.py +++ b/tests/infrastructure/test_in_memory_repository.py @@ -1,12 +1,15 @@ from bank_account.domain.bank_account import BankAccount from bank_account.domain.bank_account_id import BankAccountId from bank_account.domain.money import Money +from bank_account.domain.user_id import UserId from seedwork.testing import InMemoryRepository, RepositorySpy def make_account(account_id: str = "acc-1", balance: float = 100.0) -> BankAccount: - return BankAccount.open(BankAccountId(account_id), Money(amount=balance, currency="EUR")) + return BankAccount.open( + BankAccountId(account_id), UserId("user-1"), Money(amount=balance, currency="EUR") + ) async def test_save_and_find_by_id() -> None: