Skip to content

Commit 57df71f

Browse files
committed
refactor: replace Handler with Proc/Func pattern
Upgrade API to support both fire-and-forget (Proc) and request-response (Func) handler patterns with explicit Replier interface for response handling. - Rename Handler[T] to Proc[T] with Run method - Add Func[T, R] interface with Call method for request-response - Rename Parsed to Message with Replier instead of Complete callback - Add Replier interface with Reply/Fail methods - Update registration: RegisterProc, RegisterFunc, RegisterProcFunc, RegisterFuncFunc - Use sourceRef struct for more efficient adaptive ordering - Update all documentation to reflect new API
1 parent 63dccb9 commit 57df71f

4 files changed

Lines changed: 415 additions & 217 deletions

File tree

README.md

Lines changed: 91 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ A flexible message routing framework for event-driven Go applications.
1212
- **Multi-Source Routing** — Route messages from webhooks, message queues, or custom formats through a single processor
1313
- **Discriminator Pattern** — Cheap detection before expensive parsing for O(1) hot-path matching
1414
- **Typed Handlers** — Automatic JSON unmarshaling and validation with generics
15+
- **Proc/Func Pattern** — Fire-and-forget procedures or request-response functions
16+
- **Replier Interface** — Built-in support for request-response transports (Step Functions, etc.)
1517
- **Pluggable Hooks** — Observability without coupling to specific logging or metrics systems
16-
- **Completion Callbacks** — Built-in support for async acknowledgment patterns
1718
- **Format Agnostic** — Inspector/View abstraction supports JSON, protobuf, or custom formats
1819
- **Zero Allocation Matching** — Uses gjson for efficient JSON field lookups
1920

@@ -44,11 +45,11 @@ type UserCreatedPayload struct {
4445
Email string `json:"email"`
4546
}
4647

47-
// Define your handler
48-
type UserCreatedHandler struct{}
48+
// Define a procedure (fire-and-forget)
49+
type UserCreatedProc struct{}
4950

50-
func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
51-
log.Printf("User created: %s (%s)", p.UserID, p.Email)
51+
func (p *UserCreatedProc) Run(ctx context.Context, payload UserCreatedPayload) error {
52+
log.Printf("User created: %s (%s)", payload.UserID, payload.Email)
5253
return nil
5354
}
5455

@@ -61,15 +62,15 @@ func (s *mySource) Discriminator() dispatch.Discriminator {
6162
return dispatch.HasFields("type", "payload")
6263
}
6364

64-
func (s *mySource) Parse(raw []byte) (dispatch.Parsed, bool) {
65+
func (s *mySource) Parse(raw []byte) (dispatch.Message, error) {
6566
var env struct {
6667
Type string `json:"type"`
6768
Payload json.RawMessage `json:"payload"`
6869
}
69-
if err := json.Unmarshal(raw, &env); err != nil || env.Type == "" {
70-
return dispatch.Parsed{}, false
70+
if err := json.Unmarshal(raw, &env); err != nil {
71+
return dispatch.Message{}, err
7172
}
72-
return dispatch.Parsed{Key: env.Type, Payload: env.Payload}, true
73+
return dispatch.Message{Key: env.Type, Payload: env.Payload}, nil
7374
}
7475

7576
func main() {
@@ -79,8 +80,8 @@ func main() {
7980
// Add source
8081
r.AddSource(&mySource{})
8182

82-
// Register handler
83-
dispatch.Register(r, "user/created", &UserCreatedHandler{})
83+
// Register procedure
84+
dispatch.RegisterProc(r, "user/created", &UserCreatedProc{})
8485

8586
// Process a message
8687
msg := []byte(`{"type": "user/created", "payload": {"user_id": "123", "email": "test@example.com"}}`)
@@ -98,7 +99,25 @@ The package separates concerns into three layers:
9899
|-------|---------------|
99100
| **Sources** | Parse raw bytes, extract routing key + payload |
100101
| **Router** | Match keys to handlers, orchestrate dispatch flow |
101-
| **Handlers** | Pure business logic with typed payloads |
102+
| **Handlers** | Pure business logic with typed payloads (Proc or Func) |
103+
104+
### Proc vs Func
105+
106+
The package provides two handler patterns:
107+
108+
```go
109+
// Proc: Fire-and-forget (returns only error)
110+
type Proc[T any] interface {
111+
Run(ctx context.Context, payload T) error
112+
}
113+
114+
// Func: Request-response (returns result and error)
115+
type Func[T, R any] interface {
116+
Call(ctx context.Context, payload T) (R, error)
117+
}
118+
```
119+
120+
Use `Proc` for event handlers where you don't need to send a response. Use `Func` for request-response patterns like Step Functions tasks.
102121

103122
### Discriminator Pattern
104123

@@ -134,6 +153,66 @@ r.AddSource(apiSource)
134153
r.AddGroup(protoInspector, grpcSource, kafkaSource)
135154
```
136155

156+
## Handler Registration
157+
158+
```go
159+
// Register a procedure (fire-and-forget)
160+
dispatch.RegisterProc(r, "user/created", &UserCreatedProc{})
161+
162+
// Register a function (request-response)
163+
dispatch.RegisterFunc(r, "lookup-user", &LookupUserFunc{})
164+
165+
// Or use function adapters for simple cases
166+
dispatch.RegisterProcFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
167+
return nil
168+
})
169+
170+
dispatch.RegisterFuncFunc(r, "echo", func(ctx context.Context, in Input) (*Output, error) {
171+
return &Output{Value: in.Value}, nil
172+
})
173+
```
174+
175+
## Replier Interface
176+
177+
For transports that require sending responses back (like Step Functions), sources can provide a Replier:
178+
179+
```go
180+
type Replier interface {
181+
Reply(ctx context.Context, result json.RawMessage) error
182+
Fail(ctx context.Context, err error) error
183+
}
184+
```
185+
186+
Example Step Functions source:
187+
188+
```go
189+
type sfnReplier struct {
190+
sfn SFNClient
191+
token string
192+
}
193+
194+
func (r *sfnReplier) Reply(ctx context.Context, result json.RawMessage) error {
195+
return r.sfn.SendTaskSuccess(ctx, r.token, result)
196+
}
197+
198+
func (r *sfnReplier) Fail(ctx context.Context, err error) error {
199+
return r.sfn.SendTaskFailure(ctx, r.token, err)
200+
}
201+
202+
func (s *sfnSource) Parse(raw []byte) (dispatch.Message, error) {
203+
// ... parse envelope ...
204+
return dispatch.Message{
205+
Key: taskType,
206+
Payload: payload,
207+
Replier: &sfnReplier{sfn: s.sfn, token: token},
208+
}, nil
209+
}
210+
```
211+
212+
When a Replier is present:
213+
- On success: router calls `Replier.Reply` with the marshaled result (or `{}` for Procs)
214+
- On error: router calls `Replier.Fail` with the error
215+
137216
## Discriminators
138217

139218
Composable predicates for source matching:
@@ -201,33 +280,6 @@ type OnSuccessHook interface {
201280
}
202281
```
203282

204-
## Completion Callbacks
205-
206-
For transports that require explicit acknowledgment after processing:
207-
208-
```go
209-
func (s *taskSource) Parse(raw []byte) (dispatch.Parsed, bool) {
210-
var env struct {
211-
TaskID string `json:"task_id"`
212-
Type string `json:"type"`
213-
Payload json.RawMessage `json:"payload"`
214-
}
215-
if err := json.Unmarshal(raw, &env); err != nil {
216-
return dispatch.Parsed{}, false
217-
}
218-
return dispatch.Parsed{
219-
Key: env.Type,
220-
Payload: env.Payload,
221-
Complete: func(ctx context.Context, err error) error {
222-
if err != nil {
223-
return s.taskQueue.ReportFailure(ctx, env.TaskID, err)
224-
}
225-
return s.taskQueue.ReportSuccess(ctx, env.TaskID)
226-
},
227-
}, true
228-
}
229-
```
230-
231283
## Validation
232284

233285
Payloads implementing `Validate() error` are automatically validated:

dispatch.go

Lines changed: 88 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,72 @@ import (
55
"encoding/json"
66
)
77

8-
// Handler processes a typed payload. This is the primary interface to implement.
8+
// Proc (procedure) processes a message without returning a result.
9+
// Use this for fire-and-forget patterns like event handlers.
910
//
10-
// The type parameter T is the payload type that the handler expects. The router
11-
// automatically unmarshals the raw JSON payload to this type and validates it
12-
// if T implements validation.Validatable.
11+
// The type parameter T is the payload type. The router automatically
12+
// unmarshals JSON to T and validates it if T implements Validate() error.
1313
//
1414
// Example:
1515
//
16-
// type UserCreatedHandler struct {
16+
// type UserCreatedProc struct {
1717
// db *sql.DB
1818
// }
1919
//
20-
// func (h *UserCreatedHandler) Handle(ctx context.Context, p UserCreatedPayload) error {
21-
// _, err := h.db.ExecContext(ctx, "INSERT INTO users ...", p.UserID, p.Email)
20+
// func (p *UserCreatedProc) Run(ctx context.Context, payload UserCreatedPayload) error {
21+
// _, err := p.db.ExecContext(ctx, "INSERT INTO users ...", payload.UserID)
2222
// return err
2323
// }
24-
type Handler[T any] interface {
25-
Handle(ctx context.Context, payload T) error
24+
type Proc[T any] interface {
25+
Run(ctx context.Context, payload T) error
2626
}
2727

28-
// HandlerFunc is a function adapter for Handler. Use this for simple handlers
28+
// ProcFunc is a function adapter for Proc. Use for simple procedures
2929
// that don't need a struct:
3030
//
31-
// dispatch.RegisterFunc(r, "ping", func(ctx context.Context, p PingPayload) error {
31+
// dispatch.RegisterProc(r, "user/created", func(ctx context.Context, p Payload) error {
3232
// return nil
3333
// })
34-
type HandlerFunc[T any] func(ctx context.Context, payload T) error
34+
type ProcFunc[T any] func(ctx context.Context, payload T) error
3535

36-
// Handle implements the Handler interface.
37-
func (f HandlerFunc[T]) Handle(ctx context.Context, payload T) error {
36+
// Run implements the Proc interface.
37+
func (f ProcFunc[T]) Run(ctx context.Context, payload T) error {
38+
return f(ctx, payload)
39+
}
40+
41+
// Func (function) processes a message and returns a typed result.
42+
// Use this for request-response patterns like Step Functions tasks.
43+
//
44+
// The type parameters are: T for input payload, R for result.
45+
// The router automatically unmarshals T, validates it, and marshals R.
46+
//
47+
// Example:
48+
//
49+
// type LookupUserFunc struct {
50+
// client IdentityClient
51+
// }
52+
//
53+
// func (f *LookupUserFunc) Call(ctx context.Context, in LookupInput) (*LookupResult, error) {
54+
// user, err := f.client.GetUser(ctx, in.UserID)
55+
// if err != nil {
56+
// return nil, err
57+
// }
58+
// return &LookupResult{Email: user.Email}, nil
59+
// }
60+
type Func[T, R any] interface {
61+
Call(ctx context.Context, payload T) (R, error)
62+
}
63+
64+
// FuncFunc is a function adapter for Func. Use for simple functions
65+
// that don't need a struct:
66+
//
67+
// dispatch.RegisterFunc(r, "lookup-user", func(ctx context.Context, in Input) (*Result, error) {
68+
// return &Result{...}, nil
69+
// })
70+
type FuncFunc[T, R any] func(ctx context.Context, payload T) (R, error)
71+
72+
// Call implements the Func interface.
73+
func (f FuncFunc[T, R]) Call(ctx context.Context, payload T) (R, error) {
3874
return f(ctx, payload)
3975
}
4076

@@ -49,6 +85,7 @@ func (f HandlerFunc[T]) Handle(ctx context.Context, payload T) error {
4985
// - SNS notifications
5086
// - Step Functions task tokens
5187
// - Kinesis records
88+
// - SQS messages
5289
// - Custom formats
5390
//
5491
// Example:
@@ -61,18 +98,15 @@ func (f HandlerFunc[T]) Handle(ctx context.Context, payload T) error {
6198
// return dispatch.HasFields("type", "payload")
6299
// }
63100
//
64-
// func (s *mySource) Parse(raw []byte) (dispatch.Parsed, error) {
101+
// func (s *mySource) Parse(raw []byte) (dispatch.Message, error) {
65102
// var env struct {
66103
// Type string `json:"type"`
67104
// Payload json.RawMessage `json:"payload"`
68105
// }
69106
// if err := json.Unmarshal(raw, &env); err != nil {
70-
// return dispatch.Parsed{}, err
107+
// return dispatch.Message{}, err
71108
// }
72-
// if env.Type == "" {
73-
// return dispatch.Parsed{}, errors.New("missing type field")
74-
// }
75-
// return dispatch.Parsed{Key: env.Type, Payload: env.Payload}, nil
109+
// return dispatch.Message{Key: env.Type, Payload: env.Payload}, nil
76110
// }
77111
type Source interface {
78112
// Name returns the source identifier for logging and metrics.
@@ -84,39 +118,39 @@ type Source interface {
84118
Discriminator() Discriminator
85119

86120
// Parse attempts to parse raw bytes as this source's format.
87-
// Returns the parsed result and nil if successful, or an error describing
88-
// why parsing failed.
89-
Parse(raw []byte) (Parsed, error)
121+
// Returns the parsed message and nil if successful, or an error
122+
// describing why parsing failed.
123+
Parse(raw []byte) (Message, error)
90124
}
91125

92126
// SourceFunc creates a Source from a name, discriminator, and parse function.
93-
// Use this for simple sources that don't need a struct:
127+
// Use for simple sources that don't need a struct:
94128
//
95129
// r.AddSource(dispatch.SourceFunc(
96130
// "legacy",
97131
// dispatch.HasFields("type", "payload"),
98-
// func(raw []byte) (dispatch.Parsed, error) {
132+
// func(raw []byte) (dispatch.Message, error) {
99133
// // parse logic
100134
// },
101135
// ))
102-
func SourceFunc(name string, disc Discriminator, parse func([]byte) (Parsed, error)) Source {
136+
func SourceFunc(name string, disc Discriminator, parse func([]byte) (Message, error)) Source {
103137
return &sourceFunc{name: name, disc: disc, parse: parse}
104138
}
105139

106140
type sourceFunc struct {
107141
name string
108142
disc Discriminator
109-
parse func([]byte) (Parsed, error)
143+
parse func([]byte) (Message, error)
110144
}
111145

112-
func (s *sourceFunc) Name() string { return s.name }
113-
func (s *sourceFunc) Discriminator() Discriminator { return s.disc }
114-
func (s *sourceFunc) Parse(raw []byte) (Parsed, error) { return s.parse(raw) }
146+
func (s *sourceFunc) Name() string { return s.name }
147+
func (s *sourceFunc) Discriminator() Discriminator { return s.disc }
148+
func (s *sourceFunc) Parse(raw []byte) (Message, error) { return s.parse(raw) }
115149

116-
// Parsed contains the result of source parsing.
117-
type Parsed struct {
150+
// Message contains the result of source parsing.
151+
type Message struct {
118152
// Key is the routing key used to find the handler.
119-
// This is matched against keys passed to Register.
153+
// This is matched against keys passed to RegisterProc/RegisterFunc.
120154
Key string
121155

122156
// Version is the schema version of the payload, if available.
@@ -126,12 +160,26 @@ type Parsed struct {
126160
// Payload is the raw JSON to unmarshal into the handler's type.
127161
Payload json.RawMessage
128162

129-
// Complete is called after the handler finishes, regardless of success or failure.
130-
// Use this for transport-specific completion semantics like Step Functions
131-
// SendTaskSuccess/SendTaskFailure.
163+
// Replier handles sending responses back to the caller.
164+
// For fire-and-forget sources (EventBridge, SNS), this is nil.
165+
// For request-response sources (Step Functions), this sends results back.
166+
//
167+
// When Replier is set and a Func is registered:
168+
// - On success: router marshals result and calls Replier.Reply
169+
// - On error: router calls Replier.Fail
132170
//
133-
// If Complete is nil, no completion callback is made.
134-
// If Complete returns an error, that error is returned from Process.
135-
// The err parameter is the handler's error (or nil on success).
136-
Complete func(ctx context.Context, err error) error
171+
// When Replier is set and a Proc is registered:
172+
// - On success: router calls Replier.Reply with empty JSON ({})
173+
// - On error: router calls Replier.Fail
174+
Replier Replier
175+
}
176+
177+
// Replier sends responses back to the message originator.
178+
// Implement this for request-response transport patterns.
179+
type Replier interface {
180+
// Reply sends a successful response with the given JSON payload.
181+
Reply(ctx context.Context, result json.RawMessage) error
182+
183+
// Fail sends a failure response with the given error.
184+
Fail(ctx context.Context, err error) error
137185
}

0 commit comments

Comments
 (0)