-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsyncgroup.go
More file actions
208 lines (172 loc) · 6.25 KB
/
syncgroup.go
File metadata and controls
208 lines (172 loc) · 6.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Package syncgroup contains an implementation of an abstract
// synchronisation mechanism - synchronisation group.
// The main idea is to have an ability to run independent tasks in separate goroutines which may return errors.
// A user can wait until all goroutines finish running and collect all occurred errors.
//
// The design is similar to errgroup (https://godoc.org/golang.org/x/sync/errgroup),
// but it does not cancel the context of the goroutines if any of them returns an error.
package syncgroup
import (
"errors"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
)
// ErrPanicRecovered is a sentinel error that marks errors produced when a panic is recovered from a goroutine.
// The error reported for a panic wraps ErrPanicRecovered together with the panic value and a stack trace.
// You can use errors.Is(err, ErrPanicRecovered) to check if the error was caused by a panic.
// If the panic value was an error, it is wrapped as well, so errors.Is and errors.As can match it.
// Note that errors.Unwrap(err) returns nil for such an error: it wraps more than one error and
// therefore exposes Unwrap() []error rather than Unwrap() error.
var ErrPanicRecovered = errors.New("recovered from panic")
// SyncGroup is the main abstraction for working with syncgroups.
// A Sync Group is a collection of goroutines that can be waited for.
//
// Additionally, SyncGroup collects all errors returned by goroutines,
// handles panics and provides a way to limit the number of concurrent goroutines.
// It has four main methods: SetLimit(), Go(), TryGo() and Wait()
//
// SetLimit() sets a limit for the number of concurrent goroutines.
// Using SetLimit() is optional, by default there is no limit.
// Inside it uses a semaphore pattern to limit the number of concurrent goroutines.
//
// Go() spawns a new goroutine, which may return an error.
// When using SetLimit(), the spawned goroutine waits until a slot in the semaphore is available
// before running the given function; Go() itself does not block.
// The returned error will be saved and returned by Wait() method.
//
// TryGo() is similar to Go(), but it runs a goroutine only if there is a slot in the semaphore.
// When using SetLimit(), TryGo() will return false and not block, if there are no available slots.
// If there are available slots, it will run the goroutine and return true.
// If goroutine is run, the returned error will be saved and returned by Wait() method.
//
// Wait() waits until all spawned goroutines finish and returns a wrapper for a slice of errors.
// If there was no error, Wait() returns nil,
// otherwise a non-nil error, which can be unwrapped to access all errors.
type SyncGroup struct {
// wg counts the goroutines started by Go() and TryGo(); Wait() blocks on it.
wg sync.WaitGroup
// semaphore holds one token per running goroutine when a limit is set; nil means no limit.
semaphore chan semaphoreToken
// finishedChan carries the accumulated errors from listenToErrors() to Wait().
finishedChan chan []error
// errorChan receives every error produced by a goroutine; it is closed by Wait().
errorChan chan error
// listeningStarted is set to true when the error-listening goroutine is started.
// Wait() and SetLimit() read it to find out whether any goroutine has been started.
listeningStarted atomic.Bool
// listeningRoutineStarter makes sure the error-listening goroutine is started only once.
listeningRoutineStarter *sync.Once
}
// semaphoreToken is the empty value stored in the semaphore channel; each token in the channel occupies one slot.
type semaphoreToken struct{}
// New creates an empty SyncGroup that is ready to use.
// The returned group has no concurrency limit; use SetLimit() before starting
// any goroutine to configure one.
func New() *SyncGroup {
	// Fields that are not listed (wg, semaphore, listeningStarted) keep their zero values,
	// which means: no goroutines, no limit, listener not started.
	return &SyncGroup{
		finishedChan:            make(chan []error),
		errorChan:               make(chan error),
		listeningRoutineStarter: new(sync.Once),
	}
}
// Go runs fnc in a new goroutine and returns immediately.
// If the group has a limit of concurrent goroutines, the new goroutine blocks until it
// obtains a semaphore slot and only then calls fnc.
// A non-nil error returned by fnc is saved and returned wrapped by the Wait() method.
func (g *SyncGroup) Go(fnc func() error) {
	g.startListening()
	g.wg.Add(1)

	go func() {
		// done() recovers panics, frees the semaphore slot and decrements the WaitGroup counter.
		defer g.done()

		if slots := g.semaphore; slots != nil {
			// Blocks until a semaphore slot is acquired.
			slots <- semaphoreToken{}
		}

		if err := fnc(); err != nil {
			g.errorChan <- err
		}
	}()
}
// TryGo starts fnc in a new goroutine only if that can be done without blocking.
// When a limit is set, it tries to take a semaphore slot without waiting and returns false,
// without running fnc, if no slot is free. When no limit is set, fnc is always started.
// It returns true if fnc was started; a non-nil error returned by fnc is then saved
// and returned by the Wait() method.
func (g *SyncGroup) TryGo(fnc func() error) bool {
	if g.semaphore != nil {
		select {
		case g.semaphore <- semaphoreToken{}:
			// Slot taken here; done() gives it back when the goroutine exits.
		default:
			return false
		}
	}

	g.startListening()
	g.wg.Add(1)

	go func() {
		defer g.done()

		if err := fnc(); err != nil {
			g.errorChan <- err
		}
	}()

	return true
}
// done is deferred by every goroutine spawned by SyncGroup.
// It converts a panic into an error wrapping ErrPanicRecovered (with the stack trace appended)
// and sends it to the error channel, then releases the semaphore slot if a limit is set
// and decrements the WaitGroup counter.
func (g *SyncGroup) done() {
	// recover() takes effect here because done itself is the deferred function.
	if recovered := recover(); recovered != nil {
		trace := string(debug.Stack())

		var panicErr error
		if cause, isErr := recovered.(error); isErr {
			// Keep the panic error in the chain so errors.Is / errors.As can match it.
			panicErr = fmt.Errorf("%w: %w\n%s", ErrPanicRecovered, cause, trace)
		} else {
			panicErr = fmt.Errorf("%w: %v\n%s", ErrPanicRecovered, recovered, trace)
		}

		g.errorChan <- panicErr
	}

	if g.semaphore != nil {
		// Give the slot back so another goroutine can proceed.
		<-g.semaphore
	}

	g.wg.Done()
}
// startListening launches the goroutine that collects errors, at most once per group.
// The first call marks the group as started and spawns listenToErrors(); later calls do nothing.
func (g *SyncGroup) startListening() {
	launch := func() {
		g.listeningStarted.Store(true)
		go g.listenToErrors()
	}

	g.listeningRoutineStarter.Do(launch)
}
// listenToErrors runs in its own goroutine and collects every error sent to the error channel.
// Once the error channel is closed, it sends the collected errors to finishedChan
// and then closes finishedChan.
func (g *SyncGroup) listenToErrors() {
	defer close(g.finishedChan)

	var collected []error //nolint:prealloc // false positive
	for {
		err, open := <-g.errorChan
		if !open {
			// The error channel was closed: no more errors will arrive.
			break
		}
		collected = append(collected, err)
	}

	g.finishedChan <- collected
}
// Wait blocks until all spawned goroutines are finished and returns a single error that joins
// all collected errors. The result is nil if none of the spawned goroutines produced an error,
// or if no goroutine was ever started.
//
// If the error is not nil, it implements the `Unwrap() []error` method to access all errors,
// and it works with errors.Is() and errors.As(). errors.Unwrap() returns nil for it,
// because it wraps more than one error.
//
// Wait closes the error channel, so it must be called at most once and no goroutines may be
// started afterwards: closing the channel a second time, or sending to it, panics.
func (g *SyncGroup) Wait() error {
	if started := g.listeningStarted.Load(); !started {
		return nil
	}

	g.wg.Wait()

	// All goroutines are done, so nothing sends anymore; closing lets the listener finish.
	close(g.errorChan)
	collected := <-g.finishedChan

	// errors.Join returns nil when there are no errors to join.
	return errors.Join(collected...)
}
// SetLimit sets a limit for the number of concurrent goroutines.
// Using SetLimit() is optional, by default there is no limit.
// Pass 0 (or a negative value) to disable the limit.
//
// SetLimit panics if it is called after a goroutine has been started by Go() or TryGo().
func (g *SyncGroup) SetLimit(limit int) {
	if g.listeningStarted.Load() {
		panic("cannot set limit after starting goroutines")
	}

	// A nil channel means "no limit"; a positive limit gets one buffered slot per goroutine.
	var slots chan semaphoreToken
	if limit > 0 {
		slots = make(chan semaphoreToken, limit)
	}

	g.semaphore = slots
}