A high-performance, in-memory, lock-free event bus for Go — built on a cache-line-padded atomic ring buffer with fan-out dispatch, middleware, lifecycle hooks, and back-pressure policies.
- Features
- Installation
- Quick Start
- Fan-out
- Batch Handlers
- Middleware
- Lifecycle Hooks
- Back-pressure Policies
- Async Mode
- Dead Letter Queue
- Transactions
- Scheduling
- API Reference
- Benchmarks
- Contributing
- License
- Lock-free ring buffer — atomic operations with cache-line padding keep dispatch sub-microsecond
- Fan-out dispatch — register multiple independent handlers per projection; each runs its own middleware chain
- Batch handlers — collect up to N events per projection and deliver them as a slice; ideal for bulk DB writes and HTTP batching
- Sync or async — flip
store.Async = trueto hand work off to a fixed worker pool - Middleware — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour
- Lifecycle hooks —
OnBefore,OnAfter,OnErrorfor observability without touching handler logic - Back-pressure — choose
DropOldest,Block, orReturnErrorper store - Dead letter queue — failed and panicking events are routed to an inspectable, replayable secondary store instead of being silently counted
- Transactions — batch events and commit or roll back as a unit
- Scheduling — fire events at a future
time.Timeor after atime.Duration - Metrics — published, processed, and error counters via a single call
go get github.com/Protocol-Lattice/GoEventBusRequires Go 1.21+.
package main
import (
"context"
"fmt"
"log"
"github.com/Protocol-Lattice/GoEventBus"
)
type UserCreatedPayload struct{ ID string }
type HouseWasSold struct{}
type HouseSoldPayload struct {
Address string
Price int
}
func main() {
disp := GoEventBus.Dispatcher{}
disp.Register("user_created", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
p := ev.Data.(UserCreatedPayload)
fmt.Println("User created:", p.ID)
return GoEventBus.Result{Message: "ok"}, nil
})
disp.Register(HouseWasSold{}, func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
p := ev.Data.(HouseSoldPayload)
fmt.Printf("House sold at %s for $%d\n", p.Address, p.Price)
return GoEventBus.Result{Message: "ok"}, nil
})
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.Async = true
_ = store.Subscribe(context.Background(), GoEventBus.Event{
ID: "evt1",
Projection: "user_created",
Data: UserCreatedPayload{ID: "u-42"},
})
_ = store.Subscribe(context.Background(), GoEventBus.Event{
ID: "evt2",
Projection: HouseWasSold{},
Data: HouseSoldPayload{Address: "1 Main St", Price: 500_000},
})
store.Publish()
if err := store.Drain(context.Background()); err != nil {
log.Fatal(err)
}
published, processed, errors := store.Metrics()
fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors)
}Key points:
Projectioncan be any comparable value — a string, a struct, an integer.Datacarries the type-safe payload; assert it inside the handler.- Pass the context your handler should respect to
Subscribe; it is forwarded to every handler.
Register accumulates handlers. Every handler registered for a projection is called in order for each dispatched event. Handlers are fully independent — an error in one does not prevent the others from running.
disp := GoEventBus.Dispatcher{}
// Register as many handlers as you need for the same projection.
disp.Register("order.placed",
auditLogger,
inventoryReducer,
notificationSender,
)
// Register can also be called incrementally across different packages.
disp.Register("order.placed", analyticsTracker)In async mode each handler invocation becomes its own work item, so the four handlers above may run in parallel across the worker pool.
RegisterBatch collects all events for a projection that accumulate between Publish calls and delivers them to the handler as a slice — one call per chunk of up to size events. This eliminates per-event overhead for bulk operations like database inserts or HTTP batch APIs.
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.RegisterBatch("metrics.recorded", 50, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
rows := make([]MetricRow, len(evs))
for i, ev := range evs {
rows[i] = ev.Data.(MetricRow)
}
return nil, db.BulkInsert(ctx, rows)
})If more events arrive than the configured size, the handler is called multiple times — once per full chunk, then once for the remainder.
// 7 events, size=3 → called with [3 events], [3 events], [1 event]
store.RegisterBatch("tick", 3, handler)Return a []Result aligned with the input slice to pass per-event results to OnAfter hooks. A nil or shorter slice is fine — missing positions are treated as zero Result values.
store.RegisterBatch("order.placed", 100, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
results := make([]GoEventBus.Result, len(evs))
for i, ev := range evs {
results[i] = GoEventBus.Result{Message: "processed"}
}
return results, nil
})Multiple batch handlers can be registered for the same projection. Each receives the full chunk independently.
store.RegisterBatch("order.placed", 50, writeToDatabase)
store.RegisterBatch("order.placed", 50, pushToAnalytics)Regular per-event handlers and batch handlers can coexist on the same projection. Both fire on each Publish call.
disp.Register("order.placed", auditLogger) // called once per event
store.RegisterBatch("order.placed", 50, bulkWriter) // called once per chunkA batch handler returns a single error for the whole chunk. On error, every event in the failing chunk is sent to the DLQ and OnError fires once per event. Chunks that succeeded in the same Publish cycle are unaffected.
store.DLQ = GoEventBus.NewDeadLetterQueue()
store.RegisterBatch("write", 25, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
if err := db.BulkInsert(ctx, evs); err != nil {
return nil, err // all 25 events land in the DLQ
}
return nil, nil
})Panics in a batch handler are recovered and treated identically to a returned error — the chunk lands in the DLQ, OnError fires, and the worker pool continues running.
OnBefore, OnAfter, and OnError fire once per event even for batch handlers, keeping observability consistent regardless of handler type. Middleware is not applied to batch handlers (the signatures are incompatible); use hooks for cross-cutting concerns instead.
Batch handlers participate in the async worker pool when store.Async = true. Each chunk is dispatched as one work item.
store.Async = true
store.RegisterBatch("events", 100, bulkWriter)
for i := 0; i < 300; i++ {
_ = store.Subscribe(ctx, GoEventBus.Event{Projection: "events", Data: rows[i]})
}
store.Publish()
_ = store.Drain(context.Background()) // wait for all chunks to completeMiddleware wraps the handler chain and is applied in the order it is registered. Use it for logging, tracing, timeout injection, or retries.
// Logging middleware
store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc {
return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
log.Printf("handling %v", ev.Projection)
res, err := next(ctx, ev)
log.Printf("done %v err=%v", ev.Projection, err)
return res, err
}
})
// Timeout middleware
store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc {
return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()
return next(ctx, ev)
}
})Each handler in a fan-out gets its own independent copy of the middleware chain.
Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. For batch handlers, hooks fire once per event in the chunk rather than once per chunk.
store.OnBefore(func(ctx context.Context, ev GoEventBus.Event) {
// fires before each handler invocation
})
store.OnAfter(func(ctx context.Context, ev GoEventBus.Event, res GoEventBus.Result, err error) {
// fires after each handler invocation, whether it succeeded or failed
})
store.OnError(func(ctx context.Context, ev GoEventBus.Event, err error) {
// fires only when a handler returns a non-nil error
log.Printf("handler error for %v: %v", ev.Projection, err)
})Choose how Subscribe behaves when the ring buffer is full.
| Policy | Behaviour | Best for |
|---|---|---|
DropOldest |
Discards the oldest event to make room | Low-latency pipelines where fresh data matters more than completeness |
Block |
Blocks the caller (respecting its context) until space is available | Workloads that cannot afford to lose events |
ReturnError |
Returns ErrBufferFull immediately |
Callers that manage their own retry or back-pressure logic |
store := GoEventBus.NewEventStore(&disp, 1<<14, GoEventBus.Block)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
if err := store.Subscribe(ctx, ev); errors.Is(err, context.DeadlineExceeded) {
// buffer stayed full for 50 ms — handle accordingly
}Set store.Async = true before the first Publish call. GoEventBus starts a worker pool sized to runtime.NumCPU() and routes every handler invocation through it.
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.Async = true
store.Publish() // dispatches handlers to the worker pool
// Wait for all in-flight handlers before shutting down.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := store.Drain(ctx); err != nil {
log.Printf("drain timeout: %v", err)
}Drain (or equivalently Close) must be called once when the store is no longer needed so worker goroutines are cleaned up.
Attach a DeadLetterQueue to an EventStore and every event whose handler returns an error or panics is captured there instead of being silently swallowed into the error counter. Panics are wrapped as errors with full errors.Is / errors.As support when the original panic value was itself an error.
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.DLQ = GoEventBus.NewDeadLetterQueue()store.Publish()
for _, dl := range store.DLQ.Entries() {
fmt.Printf("event=%s err=%v attempts=%d failed=%s\n",
dl.Event.ID, dl.Err, dl.Attempts, dl.FailedAt.Format(time.RFC3339))
}Entries() returns a snapshot copy — mutations to the slice do not affect the queue.
failed := store.DLQ.Drain() // empties the queue and returns all entriesReplay re-subscribes all entries and calls Publish once after. Each entry has its Attempts counter incremented. Entries that fail to re-enqueue (e.g. buffer full) are kept in the queue and the first Subscribe error is returned.
if err := store.DLQ.Replay(ctx, store); err != nil {
log.Printf("replay partially failed: %v", err)
}A common pattern is to gate replays on Attempts to avoid infinite retry loops:
const maxAttempts = 3
for _, dl := range store.DLQ.Drain() {
if dl.Attempts >= maxAttempts {
log.Printf("dropping %s after %d attempts: %v", dl.Event.ID, dl.Attempts, dl.Err)
continue
}
_ = store.Subscribe(ctx, dl.Event)
}
store.Publish()Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. The same rule applies to batch handlers: only the failing chunk's events land in the DLQ.
disp.Register("order.placed",
auditLogger, // fails -> one dead letter per event
invoiceHandler, // succeeds -> no dead letters
)The DLQ also catches handler panics. In sync and async modes alike, the panic is converted to an error, routed to the DLQ and error hooks, and execution continues. The worker pool is never killed by a misbehaving handler.
disp.Register("risky", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
panic("something went wrong") // caught — not fatal
})
store.DLQ = GoEventBus.NewDeadLetterQueue()
store.Publish()
dl := store.DLQ.Entries()[0]
fmt.Println(dl.Err) // "handler panic: something went wrong"Group multiple events into a single commit. If any handler returns an error, Commit stops and returns that error. Call Rollback to discard buffered events without publishing.
type OrderPayload struct{ OrderID string }
type InvoicePayload struct{ OrderID string }
disp := GoEventBus.Dispatcher{}
disp.Register("order.created", handleOrder)
disp.Register("invoice.issued", handleInvoice)
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
tx := store.BeginTransaction()
tx.Publish(GoEventBus.Event{
ID: "ev-1",
Projection: "order.created",
Data: OrderPayload{OrderID: "ord-99"},
})
tx.Publish(GoEventBus.Event{
ID: "ev-2",
Projection: "invoice.issued",
Data: InvoicePayload{OrderID: "ord-99"},
})
if err := tx.Commit(context.Background()); err != nil {
tx.Rollback()
log.Fatal("transaction failed:", err)
}Fire an event at a specific point in time or after a duration. The returned *time.Timer can be stopped to cancel the event before it fires.
// Fire at an absolute time.
timer := store.Schedule(ctx, time.Now().Add(10*time.Second), ev)
if timer != nil {
timer.Stop() // cancel before it fires
}
// Fire after a duration.
timer = store.ScheduleAfter(ctx, 30*time.Second, ev)If the target time is in the past, or the duration is zero or negative, the event is executed synchronously and immediately without entering the ring buffer, and nil is returned.
type Dispatcher map[interface{}][]HandlerFunc
func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc)Register appends one or more handlers for the given projection. Calling it multiple times on the same key accumulates handlers in call order.
type Event struct {
ID string
Projection interface{} // key used to look up handlers
Data any // type-safe payload (preferred)
Args map[string]any // legacy payload (deprecated)
}type HandlerFunc func(context.Context, Event) (Result, error)
type Middleware func(HandlerFunc) HandlerFunctype BatchHandlerFunc func(context.Context, []Event) ([]Result, error)Receives a slice of events collected during a Publish cycle. Returns one Result per input event (may be nil or shorter — missing entries default to zero Result) and a single error covering the whole batch. Middleware is not applied; use lifecycle hooks for cross-cutting concerns.
type Result struct {
Message string
}const (
DropOldest OverrunPolicy = iota
Block
ReturnError
)
var ErrBufferFull = errors.New("goeventbus: buffer is full")func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStorePanics if dispatcher is nil or bufferSize is not a non-zero power of two.
Fields
| Field | Type | Description |
|---|---|---|
Async |
bool |
Enable async worker pool dispatch |
DLQ |
*DeadLetterQueue |
Optional dead letter queue; nil by default |
| Method | Description |
|---|---|
Subscribe(ctx, Event) error |
Enqueue an event; applies back-pressure per OverrunPolicy |
Publish() |
Dispatch all pending events to their handlers |
RegisterBatch(projection, size, BatchHandlerFunc) |
Register a batch handler; events are delivered in chunks of up to size |
Use(Middleware) |
Append middleware to the chain (applied to per-event handlers only) |
OnBefore(BeforeHook) |
Register a hook that runs before each handler invocation |
OnAfter(AfterHook) |
Register a hook that runs after each handler invocation |
OnError(ErrorHook) |
Register a hook that runs when a handler errors |
Drain(ctx) error |
Wait for all async handlers; shuts down the worker pool |
Close(ctx) error |
Alias for Drain |
Metrics() (published, processed, errors uint64) |
Snapshot event counters; processed counts individual events, not batch calls |
Schedule(ctx, time.Time, Event) *time.Timer |
Fire an event at a time |
ScheduleAfter(ctx, time.Duration, Event) *time.Timer |
Fire an event after a duration |
BeginTransaction() *Transaction |
Returns a *Transaction scoped to this store |
type DeadLetter struct {
Event Event
Err error // handler error, or a wrapped panic value
FailedAt time.Time
Attempts int // 1 on first failure; incremented on each Replay call
}func NewDeadLetterQueue() *DeadLetterQueue| Method | Description |
|---|---|
Len() int |
Number of entries currently in the queue |
Entries() []DeadLetter |
Snapshot copy — safe to iterate without holding a lock |
Drain() []DeadLetter |
Remove and return all entries, leaving the queue empty |
Replay(ctx, store) error |
Re-subscribe all entries, call Publish, keep failures in queue |
Attach to a store via store.DLQ = GoEventBus.NewDeadLetterQueue(). When DLQ is nil (default) behaviour is unchanged.
| Method | Description |
|---|---|
Publish(Event) |
Buffer an event within the transaction |
Commit(ctx) error |
Enqueue and process all buffered events; stops on first error |
Rollback() |
Discard all buffered events |
Run on Apple M-series with go test -bench . -benchtime=3s:
| Benchmark | Iterations | ns/op |
|---|---|---|
BenchmarkSubscribe |
27,080,376 | 40.37 |
BenchmarkSubscribeParallel |
26,418,999 | 38.42 |
BenchmarkPublish |
295,661,464 | 3.91 |
BenchmarkPublishAfterPrefill |
252,943,526 | 4.59 |
BenchmarkSubscribeLargePayload |
1,613,017 | 771.5 |
BenchmarkPublishLargePayload |
296,434,225 | 3.91 |
BenchmarkEventStore_Async |
2,816,988 | 436.5 |
BenchmarkEventStore_Sync |
2,638,519 | 428.5 |
BenchmarkFastHTTPSync |
6,275,112 | 163.8 |
BenchmarkFastHTTPAsync |
1,954,884 | 662.0 |
BenchmarkFastHTTPParallel |
4,489,274 | 262.3 |
Contributions, issues, and feature requests are welcome. See the issues page to get started.
Distributed under the MIT License. See LICENSE for details.
