-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.go
More file actions
198 lines (168 loc) · 4.35 KB
/
pool.go
File metadata and controls
198 lines (168 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Package gopool provides a strand-based goroutine pool.
//
// The core idea is simple:
//
//   - A Pool owns a fixed number of strands.
//   - Each strand executes tasks sequentially.
//   - A Handle is a stable reference to a single strand.
//   - Tasks scheduled through the same Handle never overlap
//     and preserve submission order.
//   - Tasks scheduled through different Handles may run concurrently.
//
// This model is useful for stateful, order-sensitive workloads where
// per-actor or per-key serialization is required without global locking.
package gopool
import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
)
// ErrPoolClosed is returned when scheduling or shutdown is attempted
// after the pool has entered draining mode.
//
// Handle.Schedule returns it once the pool's draining flag is set, and
// Pool.Shutdown returns it on every call after the first one.
var ErrPoolClosed = errors.New("pool is shut down")
// Task represents a unit of work executed by the pool.
//
// A Task runs on the goroutine of the strand it was queued on. If it
// panics, the pool recovers the panic and, when a PanicHandler is
// installed, reports it to that handler.
type Task func()
// PanicHandler is invoked when a task panics.
//
// strand is the index of the strand whose task panicked, panicValue is the
// value obtained from recover, and stack is the runtime.Stack output for the
// goroutine that ran the task (captured into a 64 KiB buffer, so very deep
// stacks are truncated).
//
// The handler is called synchronously on the strand's goroutine, so later
// tasks on that strand do not start until it returns.
//
// The pool itself does not log, crash, or restart.
// If no PanicHandler is provided, panics are silently recovered.
type PanicHandler func(strand int, panicValue any, stack []byte)
// Pool is a fixed-size collection of execution strands.
//
// A Pool creates all goroutines eagerly at construction time.
// It does not spawn goroutines per task.
//
// A Pool contains synchronization primitives by value and must not be
// copied; use the *Pool returned by New.
type Pool struct {
// strands holds one entry per worker goroutine; its length is fixed by New.
strands []strand
// rr is used to distribute handles across strands
// in a round-robin fashion.
rr atomic.Uint64
// mu orders enqueueing against shutdown: Schedule holds the read lock
// while sending on a queue, and Shutdown takes the write lock to close
// the queues, so a send never races with a close.
mu sync.RWMutex
// wg tracks the strand goroutines started by New; Shutdown waits on it.
wg sync.WaitGroup
// draining is flipped to true by the first Shutdown call; Schedule
// rejects tasks once it is set.
draining atomic.Bool
// onPanic is the optional handler installed by WithPanicHandler; when it
// is nil, task panics are recovered without being reported.
onPanic PanicHandler
}
// strand owns a task queue and executes tasks sequentially.
//
// New starts one goroutine per strand; it receives from queue and runs
// each task in turn until the channel is closed.
type strand struct {
// queue carries the pending tasks for this strand; its capacity is the
// queue argument passed to New.
queue chan Task
}
// Handle represents a stable execution context.
//
// Tasks scheduled through the same Handle are guaranteed to execute
// sequentially and in submission order.
//
// Handles are created by Pool.NewHandle. Because handles are assigned to
// strands round-robin, several handles may share one strand; their tasks
// are then serialized with each other as well. The zero Handle has no pool
// and must not be used with Schedule.
type Handle struct {
// pool is the Pool that owns the strand this Handle is bound to.
pool *Pool
// idx is the index of that strand in pool.strands.
idx uint64
}
// Option configures a Pool at construction time.
//
// New applies options in the order they are passed, before any strand
// goroutine is started.
type Option func(*Pool)
// New builds a Pool and eagerly starts one worker goroutine per strand.
//
// size is the number of independent strands and queue is the buffer
// capacity of each strand's task channel (0 means unbuffered). Options are
// applied in order before any worker is started.
//
// Panics if size <= 0 or queue < 0.
func New(size, queue int, opts ...Option) *Pool {
	switch {
	case size <= 0:
		panic("pool size must be > 0")
	case queue < 0:
		panic("queue size must be >= 0")
	}

	pool := &Pool{strands: make([]strand, size)}
	for _, apply := range opts {
		apply(pool)
	}

	for idx := range pool.strands {
		// Per-iteration copies so each worker closure sees its own strand,
		// independent of the loop-variable semantics in effect.
		id := idx
		worker := &pool.strands[idx]
		worker.queue = make(chan Task, queue)

		// The worker drains its queue until Shutdown closes the channel.
		pool.wg.Go(func() {
			for job := range worker.queue {
				pool.runTaskSafely(id, job)
			}
		})
	}
	return pool
}
// WithPanicHandler returns an Option that stores h as the pool's panic
// handler. The handler is called whenever a task panics; passing nil leaves
// panics recovered but unreported.
func WithPanicHandler(h PanicHandler) Option {
	install := func(target *Pool) {
		target.onPanic = h
	}
	return install
}
// NewHandle returns a Handle bound to a specific strand.
//
// Each call atomically takes the next ticket from the pool's counter and
// maps it onto a strand by modulo, so successive handles are spread across
// strands in round-robin order. The returned Handle is a small value and is
// cheap to copy.
func (p *Pool) NewHandle() Handle {
	ticket := p.rr.Add(1) - 1
	slot := ticket % uint64(len(p.strands))
	return Handle{pool: p, idx: slot}
}
// Schedule submits a task for execution through the Handle.
//
// The task runs sequentially with respect to every other task scheduled
// through the same Handle. Schedule blocks while the strand's queue is full
// and returns as soon as the task has been enqueued; it does not wait for
// the task to run.
//
// It returns the context error if ctx is already done on entry or becomes
// done before the task is enqueued, and ErrPoolClosed once the pool is
// draining.
func (h Handle) Schedule(ctx context.Context, task Task) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	owner := h.pool

	// The read lock is held for the whole send so that Shutdown, which needs
	// the write lock, cannot close the queue underneath us.
	owner.mu.RLock()
	defer owner.mu.RUnlock()

	if owner.draining.Load() {
		return ErrPoolClosed
	}

	target := owner.strands[h.idx].queue
	select {
	case <-ctx.Done():
		return ctx.Err()
	case target <- task:
		return nil
	}
}
// Shutdown transitions the pool into draining mode and waits
// for all queued tasks to complete.
//
// New tasks are rejected once shutdown begins. Only the first call performs
// the shutdown; every later call returns ErrPoolClosed immediately, even if
// draining is still in progress.
//
// Shutdown may be canceled via context: if ctx is done before every strand
// has finished, Shutdown returns ctx.Err(). Cancellation only abandons the
// wait. The pool stays in draining mode, and the queues are still closed and
// drained in the background.
func (p *Pool) Shutdown(ctx context.Context) error {
	if !p.draining.CompareAndSwap(false, true) {
		return ErrPoolClosed
	}

	done := make(chan struct{})
	go func() {
		defer close(done)

		// The write lock waits for every Schedule call that still holds the
		// read lock (possibly blocked on a full queue), which is what makes
		// closing the queues safe. Acquiring it here instead of on the
		// caller's goroutine keeps that wait cancelable through ctx.
		p.mu.Lock()
		for i := range p.strands {
			close(p.strands[i].queue)
		}
		p.mu.Unlock()

		// Closed queues let each strand finish its remaining tasks and exit.
		p.wg.Wait()
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// runTaskSafely runs task on the calling goroutine and recovers any panic it
// raises so the strand's worker loop keeps going.
//
// When a panic occurs and a handler is installed, the handler receives the
// strand index, the recovered value, and the current goroutine's stack
// trace (limited to a 64 KiB buffer). Without a handler the panic is
// dropped silently.
func (p *Pool) runTaskSafely(strand int, task Task) {
	defer func() {
		recovered := recover()
		if recovered == nil || p.onPanic == nil {
			return
		}
		trace := make([]byte, 64<<10)
		trace = trace[:runtime.Stack(trace, false)]
		p.onPanic(strand, recovered, trace)
	}()

	task()
}