A sandbox for exploring how to derive properties from immutable event sequences. Not intended for production use.
Events are processed through composable aggregators that incrementally build up state. Each aggregator declares the fields it reads (deps) and writes (outputs), and the framework handles dependency ordering, field isolation, and validation.
from highstep import Event, Pipeline, SetClosed, SetOpen, emit, problems, aggregator
from highstep.event import Problem
from highstep.field import Field
from highstep.state import StateView
from highstep.aggregators import AggregatorResult
from enum import StrEnum
class Status(StrEnum):
OPEN = "open"
CLOSED = "closed"
STATUS = Field[Status]("status", default=Status.OPEN)
@aggregator(outputs=(STATUS,))
def track_status(state: StateView, event: Event) -> AggregatorResult:
match (event.payload, state[STATUS]):
case (SetClosed(), Status.CLOSED):
return problems(Problem(event.id, "already closed"))
case (SetClosed(), Status.OPEN):
return emit(STATUS.value(Status.CLOSED))
case (SetOpen(), Status.OPEN):
return problems(Problem(event.id, "already open"))
case (SetOpen(), Status.CLOSED):
return emit(STATUS.value(Status.OPEN))
return None
pipeline = Pipeline([track_status])- Couple Fields to their Aggregator directly, so that a Field knows how to
produce itself and dependency resolution is implicit (e.g.
Pipeline([SPICY])automatically pulls inTRICKY,STATUS, etc.) - Event serialization/deserialization via Payload base class