From 6616d8483e1f550a353b8e9256a298b094c32c69 Mon Sep 17 00:00:00 2001 From: "Mark G." Date: Thu, 21 Nov 2024 11:33:21 -0800 Subject: [PATCH 1/3] ENG-2689 - Add retry with backoff * This adds optional retry policies and logic to `Consume()` and `ConsumeOnce()` --- README.md | 26 +++++++++++++++++++ go.mod | 3 +-- go.sum | 14 ---------- rabbit.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++-------- retry.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 161 insertions(+), 27 deletions(-) create mode 100644 retry.go diff --git a/README.md b/README.md index 16494c4..82019de 100644 --- a/README.md +++ b/README.md @@ -81,3 +81,29 @@ func main() { cancel() } ``` + +### Retry Policies + +You can specify a retry policy for the consumer. +A pre-made ACK retry policy is available in the library at `rp := rabbit.DefaultAckPolicy()`. This policy will retry +acknowledgement an unlimited number of times. + +You can also create a new policy using the `rabbit.NewRetryPolicy(maxAttempts, time.Millisecond * 200, time.Second, ...)` function. 
+ +The retry policy can then be passed to consume functions as an argument: + +```go +consumeFunc := func(msg amqp.Delivery) error { + fmt.Printf("Received new message: %+v\n", msg) + + numReceived++ + + if numReceived > 1 { + r.Stop() + } + } + +rp := rabbit.DefaultAckPolicy() + +r.Consume(ctx, nil, consumeFunc, rp) +``` \ No newline at end of file diff --git a/go.mod b/go.mod index de96be8..e931b18 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,8 @@ require ( github.com/onsi/gomega v1.10.2 github.com/pkg/errors v0.9.1 github.com/rabbitmq/amqp091-go v1.10.0 - github.com/relistan/go-director v0.0.0-20240410125439-78829fce487d github.com/satori/go.uuid v1.2.0 - github.com/sirupsen/logrus v1.9.3 // indirect + github.com/sirupsen/logrus v1.9.3 google.golang.org/protobuf v1.24.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/go.sum b/go.sum index 03ddf47..fb4a497 100644 --- a/go.sum +++ b/go.sum @@ -27,11 +27,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= -github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty 
v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -55,18 +51,10 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d h1:NWE6gufaNLgqs6VUzsqXkogQkMEcZxQjdRTSbf79NCA= -github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d/go.mod h1:zxI04y3OTmbrx/ef0ahmkEy9/eBLLseHAjy6M5iKsws= -github.com/relistan/go-director v0.0.0-20240410125439-78829fce487d h1:nNavMkv4sC6aJIvu1K6sq3YcQZLuA2C7VH3Nb/2IaC8= -github.com/relistan/go-director v0.0.0-20240410125439-78829fce487d/go.mod h1:zxI04y3OTmbrx/ef0ahmkEy9/eBLLseHAjy6M5iKsws= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= -github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -98,7 +86,6 
@@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -109,7 +96,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/rabbit.go b/rabbit.go index 3ffb9a7..f8dd3e6 100644 --- a/rabbit.go +++ b/rabbit.go @@ -366,7 +366,12 @@ func validMode(mode Mode) error { // If the server goes away, `Consume` will automatically attempt to reconnect. 
// Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` // between attempts. -func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error) { +func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error, rp ...*RetryPolicy) { + var retry *RetryPolicy + if len(rp) > 0 { + retry = rp[0] + } + if r.shutdown { r.log.Error(ErrShutdown) return @@ -386,6 +391,8 @@ func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func r.log.Debug("waiting for messages from rabbit ...") + var retries int + MAIN: for { select { @@ -403,11 +410,34 @@ MAIN: continue } - if err := f(msg); err != nil { - r.writeError(errChan, &ConsumeError{ - Message: &msg, - Error: fmt.Errorf("error during consume: %s", err), - }) + RETRY: + for { + if err := f(msg); err != nil { + if retry != nil && retry.ShouldRetry() { + dur := retry.Duration(retries) + + r.writeError(errChan, &ConsumeError{ + Message: &msg, + Error: fmt.Errorf("[Retry %s] error during consume: %s", retry.AttemptCount(), err), + }) + + time.Sleep(dur) + retries++ + continue RETRY + } + + r.writeError(errChan, &ConsumeError{ + Message: &msg, + Error: fmt.Errorf("error during consume: %s", err), + }) + + // We're not retrying here, break out of retry loop and return + // control flow to MAIN's loop + break + } + + // Exit retry loop on success + break } case <-ctx.Done(): r.log.Warn("stopped via context") @@ -450,7 +480,12 @@ func (r *Rabbit) writeError(errChan chan *ConsumeError, err *ConsumeError) { // // Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` // or run `Stop()`. 
-func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error { +func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error, rp ...*RetryPolicy) error { + var retry *RetryPolicy + if len(rp) > 0 { + retry = rp[0] + } + if r.shutdown { return ErrShutdown } @@ -465,6 +500,8 @@ func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery r.log.Debug("waiting for a single message from rabbit ...") + var retries int + select { case msg := <-r.delivery(): if msg.Acknowledger == nil { @@ -475,14 +512,31 @@ func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery return errors.New("detected nil acknowledger - sent signal to reconnect to RabbitMQ") } - if err := runFunc(msg); err != nil { - return err + RETRY: + for { + if err := runFunc(msg); err != nil { + if retry != nil && retry.ShouldRetry() { + dur := retry.Duration(retries) + + r.log.Warnf("[Retry %s] error during consume: %s", retry.AttemptCount(), err) + + time.Sleep(dur) + retries++ + continue RETRY + } + + r.log.Debug("ConsumeOnce finished - exiting") + return err + } + + break } case <-ctx.Done(): - r.log.Warn("stopped via context") + r.log.Warn("ConsumeOnce stopped via context") + return nil case <-r.ctx.Done(): - r.log.Warn("stopped via Stop()") + r.log.Warn("ConsumeOnce stopped via Stop()") return nil } diff --git a/retry.go b/retry.go new file mode 100644 index 0000000..de38425 --- /dev/null +++ b/retry.go @@ -0,0 +1,69 @@ +package rabbit + +import ( + "fmt" + "time" +) + +const RetryUnlimited = -1 + +type RetryPolicy struct { + DelayMS []time.Duration + MaxAttempts int + RetryCount int +} + +// DefaultAckPolicy is the default backoff policy for acknowledging messages. 
+func DefaultAckPolicy() *RetryPolicy { + return &RetryPolicy{ + DelayMS: []time.Duration{ + 50 * time.Millisecond, + 100 * time.Millisecond, + 500 * time.Millisecond, + }, + MaxAttempts: RetryUnlimited, + } +} + +// NewRetryPolicy returns a new backoff policy with the given delays. +func NewRetryPolicy(maxAttempts int, t ...time.Duration) *RetryPolicy { + times := make([]time.Duration, 0) + for _, d := range t { + times = append(times, d) + } + return &RetryPolicy{ + DelayMS: times, + MaxAttempts: maxAttempts, + } +} + +// Duration returns the duration for the given attempt number +// If the attempt number exceeds the number of delays, the last delay is returned +func (b *RetryPolicy) Duration(n int) time.Duration { + b.RetryCount++ + if n >= len(b.DelayMS) { + n = len(b.DelayMS) - 1 + } + + return b.DelayMS[n] +} + +// ShouldRetry returns true if the current retry count is less than the max attempts +func (b *RetryPolicy) ShouldRetry() bool { + return b.MaxAttempts == RetryUnlimited || b.RetryCount < b.MaxAttempts +} + +// Reset resets the current retry count to 0 +func (b *RetryPolicy) Reset() { + b.RetryCount = 0 +} + +// AttemptCount returns the current attempt count as a string, for use with log messages +func (b *RetryPolicy) AttemptCount() string { + maxAttempts := fmt.Sprintf("%d", b.MaxAttempts) + if b.MaxAttempts == RetryUnlimited { + maxAttempts = "Unlimited" + } + + return fmt.Sprintf("%d/%s", b.RetryCount+1, maxAttempts) +} From 03195901c7faa2a6046005daa4e3f67c89f39bbc Mon Sep 17 00:00:00 2001 From: "Mark G." 
Date: Thu, 21 Nov 2024 11:40:21 -0800 Subject: [PATCH 2/3] Fix docker compose call in action workflows --- .github/workflows/main-test.yaml | 2 +- .github/workflows/pr-test.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main-test.yaml b/.github/workflows/main-test.yaml index 00cfd2d..30948e6 100644 --- a/.github/workflows/main-test.yaml +++ b/.github/workflows/main-test.yaml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/checkout@main - name: Start up dependencies - run: docker-compose up -d + run: docker compose up -d - name: Wait for dependencies to start up uses: jakejarvis/wait-action@master with: diff --git a/.github/workflows/pr-test.yaml b/.github/workflows/pr-test.yaml index 20c302d..391409c 100644 --- a/.github/workflows/pr-test.yaml +++ b/.github/workflows/pr-test.yaml @@ -7,7 +7,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Start up dependencies - run: docker-compose up -d + run: docker compose up -d - name: Wait for dependencies to start up uses: jakejarvis/wait-action@master with: From 71d6f0eb7c0e40a3b33309934dcbe337630885dc Mon Sep 17 00:00:00 2001 From: "Mark G." Date: Thu, 21 Nov 2024 13:39:26 -0800 Subject: [PATCH 3/3] Adding comment and fixing logs --- rabbit.go | 12 ++++++------ retry.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rabbit.go b/rabbit.go index f8dd3e6..de67d5b 100644 --- a/rabbit.go +++ b/rabbit.go @@ -57,8 +57,8 @@ var ( // IRabbit is the interface that the `rabbit` library implements. It's here as // convenience. 
type IRabbit interface { - Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error) - ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error + Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error, rp ...*RetryPolicy) + ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error, rp ...*RetryPolicy) error Publish(ctx context.Context, routingKey string, payload []byte, headers ...amqp.Table) error Stop(timeout ...time.Duration) error Close() error @@ -440,10 +440,10 @@ MAIN: break } case <-ctx.Done(): - r.log.Warn("stopped via context") + r.log.Warn("Consume stopped via local context") break MAIN case <-r.ctx.Done(): - r.log.Warn("stopped via Stop()") + r.log.Warn("Consume stopped via global context") break MAIN } } @@ -532,11 +532,11 @@ func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery break } case <-ctx.Done(): - r.log.Warn("ConsumeOnce stopped via context") + r.log.Warn("ConsumeOnce stopped via local context") return nil case <-r.ctx.Done(): - r.log.Warn("ConsumeOnce stopped via Stop()") + r.log.Warn("ConsumeOnce stopped via global context") return nil } diff --git a/retry.go b/retry.go index de38425..6404949 100644 --- a/retry.go +++ b/retry.go @@ -10,7 +10,7 @@ const RetryUnlimited = -1 type RetryPolicy struct { DelayMS []time.Duration MaxAttempts int - RetryCount int + RetryCount int // Retries performed so far; the unlimited sentinel (-1) applies to MaxAttempts, not this field } // DefaultAckPolicy is the default backoff policy for acknowledging messages.