From 30b14dbb48edcb2cecd04537aa2ca354c98d66af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:27:38 +0200 Subject: [PATCH 01/17] feat: add WithDrain option for graceful Stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Stop now waits for runFunc to return on its own when WithDrain is set, closing the channel returned by Stopping(ctx) instead of cancelling. - Falls back to ctx cancel + wait once the drain timeout elapses, so stuck runFuncs still surface as DeadlineExceeded. - No-op for existing callers — Stop semantics are unchanged when WithDrain is not used. - Strengthen the existing stop-timeout test to assert ctx-cancel behavior when WithDrain is absent. --- README.md | 23 +++++++ runnable.go | 28 +++++++++ runnable_test.go | 20 ++++-- with_drain.go | 39 ++++++++++++ with_drain_test.go | 154 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 259 insertions(+), 5 deletions(-) create mode 100644 with_drain.go create mode 100644 with_drain_test.go diff --git a/README.md b/README.md index 25d4511..0df47c2 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,29 @@ if err != nil { } ``` +### Drain on Stop + +By default, `Stop` cancels the runFunc's context, which aborts in-flight +work. For workers that own external calls that must complete (e.g. an +HTTP request that creates remote state), use `WithDrain` to switch to +"signal-and-wait" semantics: `Stop` closes the channel returned by +`Stopping(ctx)` and waits up to the drain timeout for runFunc to return +on its own. If the timeout elapses, `Stop` falls back to cancelling the +context. + +```go +r := runnable.New(func(ctx context.Context) error { + for { + select { + case <-runnable.Stopping(ctx): + return nil // finish in-flight work, then return + case <-time.After(time.Second): + doWork(ctx) + } + } +}, runnable.WithDrain(10*time.Second)) +``` + ### Runnable Object ```go package main diff --git a/runnable.go b/runnable.go index ca62fef..192f8e7 100644 --- a/runnable.go +++ b/runnable.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" ) var ( @@ -28,6 +29,10 @@ type runnable struct { runCancel context.CancelFunc runStop chan bool + drainEnabled bool + drainTimeout time.Duration + stoppingChan chan struct{} + isRunning bool onStart func() onStop func() @@ -92,6 +97,10 @@ func (r *runnable) Run(ctx context.Context) error { r.runStop = make(chan bool) runCtx := r.runCtx + if r.drainEnabled { + r.stoppingChan = make(chan struct{}) + runCtx = context.WithValue(runCtx, stoppingKey{}, (<-chan struct{})(r.stoppingChan)) + } r.mu.Unlock() defer func() { @@ -134,8 +143,27 @@ func (r *runnable) Stop(ctx context.Context) error { } runStop := r.runStop + drainEnabled := r.drainEnabled + drainTimeout := r.drainTimeout + stoppingChan := r.stoppingChan r.mu.Unlock() + if drainEnabled { + close(stoppingChan) + drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout) + select { + case <-runStop: + drainCancel() + return nil + case <-ctx.Done(): + drainCancel() + return ctx.Err() + case <-drainCtx.Done(): + // Drain timed out; fall through to forced cancel. + } + drainCancel() + } + r.runCancel() select { diff --git a/runnable_test.go b/runnable_test.go index 045be86..1b7cf9d 100644 --- a/runnable_test.go +++ b/runnable_test.go @@ -81,18 +81,20 @@ func TestRunnable(t *testing.T) { assert.Equal(t, false, r.IsRunning()) }) - t.Run("runnable, stop timeout", func(t *testing.T) { + t.Run("runnable, stop timeout proves no drain behavior", func(t *testing.T) { started := make(chan struct{}) + ctxCancelObserved := make(chan struct{}) r := New(func(ctx context.Context) error { started <- struct{}{} + <-ctx.Done() + close(ctxCancelObserved) time.Sleep(2 * time.Second) - return nil + return ctx.Err() }) go func() { - err := r.Run(context.Background()) - require.NoError(t, err) + _ = r.Run(context.Background()) }() <-started @@ -101,7 +103,15 @@ func TestRunnable(t *testing.T) { stopCtx, stopCancel := context.WithTimeout(context.Background(), 1*time.Second) defer stopCancel() err := r.Stop(stopCtx) - require.Error(t, err, context.DeadlineExceeded) + require.ErrorIs(t, err, context.DeadlineExceeded) + + // Without WithDrain, Stop cancels runFunc's ctx immediately. + select { + case <-ctxCancelObserved: + case <-time.After(100 * time.Millisecond): + t.Fatal("expected runFunc's ctx to be cancelled when Stop fires without WithDrain") + } + assert.Equal(t, true, r.IsRunning()) }) } diff --git a/with_drain.go b/with_drain.go new file mode 100644 index 0000000..a409e13 --- /dev/null +++ b/with_drain.go @@ -0,0 +1,39 @@ +package runnable + +import ( + "context" + "time" +) + +type stoppingKey struct{} + +// Stopping returns a channel that closes when Stop has been called on +// the Runnable that owns ctx. runFunc implementations under WithDrain +// should select on it and return cleanly without cancelling in-flight +// work. Returns a nil channel when ctx is not associated with a +// drain-enabled Runnable — receiving from a nil channel blocks forever, +// which is the correct no-op for callers that opt into drain semantics +// only when configured. +func Stopping(ctx context.Context) <-chan struct{} { + ch, _ := ctx.Value(stoppingKey{}).(<-chan struct{}) + return ch +} + +type withDrain struct { + timeout time.Duration +} + +// WithDrain switches Stop's behavior from "cancel runFunc's ctx" to +// "close Stopping(ctx) and wait up to timeout for runFunc to return on +// its own." After the timeout elapses, Stop falls back to cancelling +// the ctx as before (preserving the existing escape hatch for stuck +// runFuncs). Use this when runFunc owns in-flight external calls that +// must drain rather than abort on shutdown. +func WithDrain(timeout time.Duration) Option { + return &withDrain{timeout: timeout} +} + +func (w *withDrain) apply(r *runnable) { + r.drainEnabled = true + r.drainTimeout = w.timeout +} diff --git a/with_drain_test.go b/with_drain_test.go new file mode 100644 index 0000000..8347bfb --- /dev/null +++ b/with_drain_test.go @@ -0,0 +1,154 @@ +package runnable + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWithDrain(t *testing.T) { + t.Run("Stop waits for runFunc to return", func(t *testing.T) { + started := make(chan struct{}) + runFuncErr := make(chan error, 1) + + r := New(func(ctx context.Context) error { + close(started) + <-Stopping(ctx) + time.Sleep(200 * time.Millisecond) + // Return naturally without observing ctx cancellation. + if ctx.Err() != nil { + return ctx.Err() + } + return nil + }, WithDrain(1*time.Second)) + + go func() { + runFuncErr <- r.Run(context.Background()) + }() + + <-started + assert.True(t, r.IsRunning()) + + start := time.Now() + err := r.Stop(context.Background()) + elapsed := time.Since(start) + require.NoError(t, err) + assert.False(t, r.IsRunning()) + assert.GreaterOrEqual(t, elapsed, 200*time.Millisecond) + assert.Less(t, elapsed, 500*time.Millisecond) + + select { + case err := <-runFuncErr: + require.NoError(t, err, "runFunc should return naturally, not via ctx cancellation") + case <-time.After(time.Second): + t.Fatal("runFunc did not return") + } + }) + + t.Run("Stop falls back to cancel on drain timeout", func(t *testing.T) { + started := make(chan struct{}) + runFuncErr := make(chan error, 1) + + r := New(func(ctx context.Context) error { + close(started) + <-ctx.Done() + return ctx.Err() + }, WithDrain(100*time.Millisecond)) + + go func() { + runFuncErr <- r.Run(context.Background()) + }() + + <-started + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer stopCancel() + err := r.Stop(stopCtx) + require.NoError(t, err) + assert.False(t, r.IsRunning()) + + select { + case err := <-runFuncErr: + require.ErrorIs(t, err, context.Canceled) + case <-time.After(time.Second): + t.Fatal("runFunc did not return") + } + }) + + t.Run("Stop returns DeadlineExceeded when runFunc stuck", func(t *testing.T) { + started := make(chan struct{}) + release := make(chan struct{}) + + r := New(func(ctx context.Context) error { + close(started) + <-release + return nil + }, WithDrain(50*time.Millisecond)) + + go func() { + _ = r.Run(context.Background()) + }() + + <-started + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer stopCancel() + err := r.Stop(stopCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + // Release the runFunc so the goroutine can exit cleanly. + close(release) + }) + + t.Run("outer ctx cancel still propagates", func(t *testing.T) { + started := make(chan struct{}) + runFuncErr := make(chan error, 1) + + r := New(func(ctx context.Context) error { + close(started) + <-ctx.Done() + return ctx.Err() + }, WithDrain(1*time.Second)) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + runFuncErr <- r.Run(ctx) + }() + + <-started + cancel() + + select { + case err := <-runFuncErr: + require.True(t, errors.Is(err, context.Canceled)) + case <-time.After(time.Second): + t.Fatal("runFunc did not exit on outer ctx cancel") + } + assert.False(t, r.IsRunning()) + }) + + t.Run("Stopping returns nil when not configured", func(t *testing.T) { + var observed bool + + r := New(func(ctx context.Context) error { + ch := Stopping(ctx) + // Selecting on nil channel blocks forever; default branch runs. + select { + case <-ch: + observed = true + default: + observed = ch == nil + } + return nil + }) + + err := r.Run(context.Background()) + require.NoError(t, err) + assert.True(t, observed, "Stopping(ctx) should be nil without WithDrain") + }) +} From c586f84e468dda334d13e7dd9707e1ee742cf670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:27:56 +0200 Subject: [PATCH 02/17] feat: add NewTicker primitive - NewTicker(interval, tick, opts...) wraps the standard time.NewTicker + select-loop pattern that periodic workers reimplement. - Composes with WithDrain (in-flight tick finishes; loop exits without firing a new tick) and WithRecoverer (panics in tick are caught). - Skip queued ticks accumulated while a slow tick was running once Stop has been called, so shutdown isn't delayed by stale t.C buffer. - Add examples/ticker-with-drain showing the full SIGTERM-safe shape (ticker + drain + recoverer + signal.NotifyContext). --- README.md | 26 ++++++ examples/ticker-with-drain/main.go | 71 +++++++++++++++ ticker.go | 50 +++++++++++ ticker_test.go | 135 +++++++++++++++++++++++++++++ 4 files changed, 282 insertions(+) create mode 100644 examples/ticker-with-drain/main.go create mode 100644 ticker.go create mode 100644 ticker_test.go diff --git a/README.md b/README.md index 0df47c2..40e8b8b 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,32 @@ r := runnable.New(func(ctx context.Context) error { }, runnable.WithDrain(10*time.Second)) ``` +### Ticker + +`NewTicker` wraps the standard "select-loop on a `time.Ticker`" pattern. +It composes with `WithDrain` (let the current tick finish on Stop) and +`WithRecoverer` (catch panics in the tick body). + +```go +r := runnable.NewTicker( + 30*time.Second, + func(ctx context.Context) error { + return reconcile(ctx) + }, + runnable.WithDrain(10*time.Second), +) + +go r.Run(ctx) + +// On shutdown: +stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) +defer cancel() +r.Stop(stopCtx) // drains the in-flight tick before returning +``` + +A full SIGTERM-safe shape (ticker + drain + recoverer + signal.NotifyContext) +lives in [`examples/ticker-with-drain`](examples/ticker-with-drain/main.go). + ### Runnable Object ```go package main diff --git a/examples/ticker-with-drain/main.go b/examples/ticker-with-drain/main.go new file mode 100644 index 0000000..4ea1e05 --- /dev/null +++ b/examples/ticker-with-drain/main.go @@ -0,0 +1,71 @@ +// Example: a periodic reconciler that drains gracefully on SIGTERM. +// +// Shape: NewTicker + WithDrain + WithRecoverer + signal.NotifyContext. +// Copy-paste this into a service's cmd/.../main.go and replace the +// reconcile body with your work. +package main + +import ( + "context" + "errors" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/0xsequence/runnable" +) + +type stderrReporter struct{} + +func (stderrReporter) Report(ctx context.Context, rec interface{}) { + fmt.Fprintf(os.Stderr, "panic recovered: %v\n", rec) +} + +type stderrPrinter struct{} + +func (stderrPrinter) Print(ctx context.Context, callstack []byte) { + os.Stderr.Write(callstack) +} + +func reconcile(ctx context.Context) error { + // Pretend this is an HTTP call to an external system that must not + // be aborted mid-request when SIGTERM fires. Under WithDrain, Stop + // waits for this tick to finish before tearing down the Runnable. + fmt.Println("tick: reconciling...") + time.Sleep(500 * time.Millisecond) + fmt.Println("tick: done") + return nil +} + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer stop() + + rc := runnable.NewTicker( + 2*time.Second, + reconcile, + runnable.WithDrain(10*time.Second), + runnable.WithRecoverer(stderrReporter{}, stderrPrinter{}), + ) + + runErr := make(chan error, 1) + go func() { + runErr <- rc.Run(ctx) + }() + + <-ctx.Done() + fmt.Println("shutdown: draining in-flight tick...") + + stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := rc.Stop(stopCtx); err != nil { + fmt.Fprintf(os.Stderr, "stop: %v\n", err) + } + + if err := <-runErr; err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "reconciler stopped: %v\n", err) + os.Exit(1) + } +} diff --git a/ticker.go b/ticker.go new file mode 100644 index 0000000..15589be --- /dev/null +++ b/ticker.go @@ -0,0 +1,50 @@ +package runnable + +import ( + "context" + "time" +) + +// NewTicker returns a Runnable that calls tick once per interval until +// ctx is cancelled, Stop is called, or tick returns a non-nil error. +// +// When the Runnable is configured WithDrain, an in-flight tick is +// allowed to finish before Run returns; the loop exits without firing +// a new tick. Without WithDrain, Stop cancels ctx and any in-flight +// tick observes the cancellation through ctx.Done(). +// +// tick should respect ctx.Done() for cancellation. To make in-flight +// external calls survive shutdown under WithDrain, tick should derive +// per-call timeouts via context.WithoutCancel(ctx) so its work is not +// affected by either Stop's drain signal or the Runnable's ctx cancel. +func NewTicker(interval time.Duration, tick func(ctx context.Context) error, opts ...Option) Runnable { + return New(func(ctx context.Context) error { + t := time.NewTicker(interval) + defer t.Stop() + + stopping := Stopping(ctx) // nil when WithDrain not used + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-stopping: + return nil + case <-t.C: + // Re-check shutdown signals before firing a new tick: + // when a tick takes longer than the interval, t.C may + // have ready ticks queued from before Stop was called. + select { + case <-ctx.Done(): + return ctx.Err() + case <-stopping: + return nil + default: + } + if err := tick(ctx); err != nil { + return err + } + } + } + }, opts...) +} diff --git a/ticker_test.go b/ticker_test.go new file mode 100644 index 0000000..d4ea362 --- /dev/null +++ b/ticker_test.go @@ -0,0 +1,135 @@ +package runnable + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewTicker(t *testing.T) { + t.Run("fires on interval", func(t *testing.T) { + var count atomic.Int32 + + r := NewTicker(50*time.Millisecond, func(ctx context.Context) error { + count.Add(1) + return nil + }) + + go func() { + _ = r.Run(context.Background()) + }() + + time.Sleep(175 * time.Millisecond) + + err := r.Stop(context.Background()) + require.NoError(t, err) + + c := count.Load() + assert.GreaterOrEqual(t, c, int32(2)) + assert.LessOrEqual(t, c, int32(4)) + }) + + t.Run("Stop with drain allows current tick to finish", func(t *testing.T) { + tickStarted := make(chan struct{}, 1) + var completed atomic.Int32 + + r := NewTicker(20*time.Millisecond, func(ctx context.Context) error { + select { + case tickStarted <- struct{}{}: + default: + } + time.Sleep(200 * time.Millisecond) + completed.Add(1) + return nil + }, WithDrain(1*time.Second)) + + go func() { + _ = r.Run(context.Background()) + }() + + <-tickStarted + + start := time.Now() + err := r.Stop(context.Background()) + elapsed := time.Since(start) + require.NoError(t, err) + + assert.GreaterOrEqual(t, completed.Load(), int32(1), "in-flight tick should complete") + assert.Less(t, elapsed, 500*time.Millisecond) + }) + + t.Run("Stop without drain cancels in-flight tick", func(t *testing.T) { + tickStarted := make(chan struct{}, 1) + tickErr := make(chan error, 1) + + r := NewTicker(20*time.Millisecond, func(ctx context.Context) error { + select { + case tickStarted <- struct{}{}: + default: + } + <-ctx.Done() + tickErr <- ctx.Err() + return ctx.Err() + }) + + runDone := make(chan error, 1) + go func() { + runDone <- r.Run(context.Background()) + }() + + <-tickStarted + err := r.Stop(context.Background()) + require.NoError(t, err) + + select { + case e := <-tickErr: + require.ErrorIs(t, e, context.Canceled) + case <-time.After(time.Second): + t.Fatal("tick did not observe ctx cancellation") + } + + select { + case e := <-runDone: + require.ErrorIs(t, e, context.Canceled) + case <-time.After(time.Second): + t.Fatal("Run did not return") + } + }) + + t.Run("tick error aborts loop", func(t *testing.T) { + sentinel := errors.New("boom") + var count atomic.Int32 + + r := NewTicker(20*time.Millisecond, func(ctx context.Context) error { + if count.Add(1) == 2 { + return sentinel + } + return nil + }) + + err := r.Run(context.Background()) + require.ErrorIs(t, err, sentinel) + assert.Equal(t, int32(2), count.Load()) + }) + + t.Run("respects outer ctx cancel", func(t *testing.T) { + var count atomic.Int32 + + r := NewTicker(20*time.Millisecond, func(ctx context.Context) error { + count.Add(1) + return nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 75*time.Millisecond) + defer cancel() + + err := r.Run(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) + assert.False(t, r.IsRunning()) + }) +} From 03bfd97f99d4ff6a0c6a3c56d8a720ad4cc9e86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:29:10 +0200 Subject: [PATCH 03/17] chore: fix go vet and staticcheck warnings - examples/main.go: drop unreachable return after infinite loop; capture and defer the cancel from context.WithTimeout to fix the ctx leak. - runnable_test.go, runnable_group_test.go: replace single-case selects with plain channel receives (S1000). - with_recoverer_test.go: drop unreachable returns after panic. --- examples/main.go | 4 ++-- runnable_group_test.go | 4 +--- runnable_test.go | 7 ++----- with_recoverer_test.go | 3 --- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/examples/main.go b/examples/main.go index e1e710a..b799017 100644 --- a/examples/main.go +++ b/examples/main.go @@ -33,7 +33,6 @@ func (m *Monitor) run(ctx context.Context) error { time.Sleep(1 * time.Second) fmt.Println("Monitoring...") } - return nil } func main() { @@ -92,7 +91,8 @@ func main() { // simple function with timeout fmt.Println("Simple function with timeout...") - ctxWithTimeout, _ := context.WithTimeout(context.Background(), 5*time.Second) + ctxWithTimeout, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelTimeout() err = runnable.New(func(ctx context.Context) error { fmt.Println("Starting...") defer fmt.Println("Stopping...") diff --git a/runnable_group_test.go b/runnable_group_test.go index 6efa922..fb70ed9 100644 --- a/runnable_group_test.go +++ b/runnable_group_test.go @@ -48,9 +48,7 @@ func TestNewGroup(t *testing.T) { // Create a new group group := NewGroup( New(func(ctx context.Context) error { - select { - case <-ctx.Done(): - } + <-ctx.Done() return nil }), New(func(ctx context.Context) error { diff --git a/runnable_test.go b/runnable_test.go index 1b7cf9d..ea1b2a8 100644 --- a/runnable_test.go +++ b/runnable_test.go @@ -43,11 +43,8 @@ func TestRunnable(t *testing.T) { r := New(func(ctx context.Context) error { started <- struct{}{} time.Sleep(2 * time.Second) - - select { - case <-ctx.Done(): - return ctx.Err() - } + <-ctx.Done() + return ctx.Err() }) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) diff --git a/with_recoverer_test.go b/with_recoverer_test.go index e7cb5dd..1cf6b59 100644 --- a/with_recoverer_test.go +++ b/with_recoverer_test.go @@ -25,7 +25,6 @@ func TestWithRecoverer(t *testing.T) { fn := func(ctx context.Context) error { defer func() { counter++ }() panic("something went wrong") - return nil } r := New(fn, WithRecoverer(&reporter, nil)) @@ -42,7 +41,6 @@ func TestWithRecoverer(t *testing.T) { r := New(func(ctx context.Context) error { started <- struct{}{} panic("something went wrong") - return nil }, WithRecoverer(reporter, nil)) go func() { @@ -82,7 +80,6 @@ func TestWithRecoverer(t *testing.T) { r := New(func(ctx context.Context) error { started <- struct{}{} panic("something went wrong") - return nil }, WithRecoverer(reporter, nil), WithStatus("test", store)) go func() { From 6188660c578ac46ce5f55c47b95ada3db90a97dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:30:34 +0200 Subject: [PATCH 04/17] chore: fix two more linter warnings - with_recoverer_test.go: drop forced string type assertion in Report; use %v formatting so non-string panic values don't crash. - examples/ticker-with-drain: silence errcheck on os.Stderr.Write. --- examples/ticker-with-drain/main.go | 2 +- with_recoverer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/ticker-with-drain/main.go b/examples/ticker-with-drain/main.go index 4ea1e05..0c0a18d 100644 --- a/examples/ticker-with-drain/main.go +++ b/examples/ticker-with-drain/main.go @@ -26,7 +26,7 @@ func (stderrReporter) Report(ctx context.Context, rec interface{}) { type stderrPrinter struct{} func (stderrPrinter) Print(ctx context.Context, callstack []byte) { - os.Stderr.Write(callstack) + _, _ = os.Stderr.Write(callstack) } func reconcile(ctx context.Context) error { diff --git a/with_recoverer_test.go b/with_recoverer_test.go index 1cf6b59..8f656b2 100644 --- a/with_recoverer_test.go +++ b/with_recoverer_test.go @@ -14,7 +14,7 @@ type InMemoryReporter struct { } func (i *InMemoryReporter) Report(ctx context.Context, rec interface{}) { - i.logs = append(i.logs, fmt.Sprintf("%s", rec.(string))) + i.logs = append(i.logs, fmt.Sprintf("%v", rec)) } func TestWithRecoverer(t *testing.T) { From 63caae5dcb275bb271fdd978d8af30fb7b2a78cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:38:26 +0200 Subject: [PATCH 05/17] fix: make Stop concurrency-safe under WithDrain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concurrent Stop callers raced to close the same stoppingChan, causing "close of closed channel" panic on the second close. Pre-PR Stop was safe because context.CancelFunc tolerates repeat calls; WithDrain silently regressed that contract — and K8s does occasionally fire SIGTERM twice on shutdown. Claim ownership under the mutex: the first Stop copies and nils stoppingChan, drives the drain. Subsequent Stops see nil, skip the close, and fall through to the existing runCancel + <-runStop wait (which is already idempotent). All callers observe the same outcome: runFunc returns, then Stop returns. Adds TestWithDrain/Stop_is_concurrency_safe — 10 concurrent Stops, no panic, every caller returns nil or ErrNotRunning. --- runnable.go | 3 ++- with_drain_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/runnable.go b/runnable.go index 192f8e7..50bb28b 100644 --- a/runnable.go +++ b/runnable.go @@ -146,9 +146,10 @@ func (r *runnable) Stop(ctx context.Context) error { drainEnabled := r.drainEnabled drainTimeout := r.drainTimeout stoppingChan := r.stoppingChan + r.stoppingChan = nil // first-caller wins; subsequent concurrent Stops see nil r.mu.Unlock() - if drainEnabled { + if drainEnabled && stoppingChan != nil { close(stoppingChan) drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout) select { diff --git a/with_drain_test.go b/with_drain_test.go index 8347bfb..f722368 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -3,6 +3,7 @@ package runnable import ( "context" "errors" + "sync" "testing" "time" @@ -132,6 +133,46 @@ func TestWithDrain(t *testing.T) { assert.False(t, r.IsRunning()) }) + t.Run("Stop is concurrency-safe", func(t *testing.T) { + started := make(chan struct{}) + + r := New(func(ctx context.Context) error { + close(started) + <-Stopping(ctx) + return nil + }, WithDrain(1*time.Second)) + + go func() { + _ = r.Run(context.Background()) + }() + + <-started + + const callers = 10 + var wg sync.WaitGroup + errs := make([]error, callers) + for i := 0; i < callers; i++ { + i := i + wg.Add(1) + go func() { + defer wg.Done() + errs[i] = r.Stop(context.Background()) + }() + } + wg.Wait() + + // No double-close panic is the load-bearing assertion. Each + // Stop must return either nil (drove or waited on the drain) + // or ErrNotRunning (Run already exited before this caller + // grabbed the lock). + for _, err := range errs { + if err != nil { + require.ErrorIs(t, err, ErrNotRunning) + } + } + assert.False(t, r.IsRunning()) + }) + t.Run("Stopping returns nil when not configured", func(t *testing.T) { var observed bool From 2dc534c9d93f9eadc515e53c33cb1041c47ef759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:40:46 +0200 Subject: [PATCH 06/17] feat: distinguish drain timeout from clean drain via ErrDrainTimedOut MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stop's drain fall-through path was indistinguishable from a clean drain — both returned nil. Callers had no way to detect runFuncs that ignored Stopping(ctx) and only exited via ctx.Done(). Track the fall-through and return ErrDrainTimedOut once runFunc finally exits. ctx-cancel paths still return ctx.Err() unchanged. The runnable is fully stopped either way; the sentinel is observability. --- runnable.go | 7 ++++++- with_drain_test.go | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/runnable.go b/runnable.go index 50bb28b..b42f7f3 100644 --- a/runnable.go +++ b/runnable.go @@ -10,6 +10,7 @@ import ( var ( ErrAlreadyRunning = fmt.Errorf("already running") ErrNotRunning = fmt.Errorf("not running") + ErrDrainTimedOut = fmt.Errorf("drain timed out") ) type Option interface { @@ -149,6 +150,7 @@ func (r *runnable) Stop(ctx context.Context) error { r.stoppingChan = nil // first-caller wins; subsequent concurrent Stops see nil r.mu.Unlock() + var drainTimedOut bool if drainEnabled && stoppingChan != nil { close(stoppingChan) drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout) @@ -160,7 +162,7 @@ func (r *runnable) Stop(ctx context.Context) error { drainCancel() return ctx.Err() case <-drainCtx.Done(): - // Drain timed out; fall through to forced cancel. + drainTimedOut = true } drainCancel() } @@ -171,6 +173,9 @@ func (r *runnable) Stop(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-runStop: + if drainTimedOut { + return ErrDrainTimedOut + } return nil } } diff --git a/with_drain_test.go b/with_drain_test.go index f722368..486c396 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -50,7 +50,7 @@ func TestWithDrain(t *testing.T) { } }) - t.Run("Stop falls back to cancel on drain timeout", func(t *testing.T) { + t.Run("Stop returns ErrDrainTimedOut on fall-through", func(t *testing.T) { started := make(chan struct{}) runFuncErr := make(chan error, 1) @@ -69,7 +69,7 @@ func TestWithDrain(t *testing.T) { stopCtx, stopCancel := context.WithTimeout(context.Background(), 2*time.Second) defer stopCancel() err := r.Stop(stopCtx) - require.NoError(t, err) + require.ErrorIs(t, err, ErrDrainTimedOut) assert.False(t, r.IsRunning()) select { From 2707d2a374a08825d7f75b1fdb9c80c2a1842505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Fri, 1 May 2026 23:41:31 +0200 Subject: [PATCH 07/17] docs: warn about Stopping foot-gun and ticker+retry cadence - Stopping(ctx) godoc and README example: callers must select on both ctx.Done() and Stopping(ctx). A loop that observes only Stopping hangs on outer-ctx cancellation. README example now shows the full three-case select. - README also reflects ErrDrainTimedOut on the fall-through path. - NewTicker godoc: composing with WithRetry resets the ticker cadence on every retry (tick error bails the loop, WithRetry re-enters runFunc, fresh ticker). Document the behavior; users who need stable cadence should retry inside the tick handler. --- README.md | 8 +++++++- ticker.go | 6 ++++++ with_drain.go | 15 +++++++++++---- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 40e8b8b..444a5bf 100644 --- a/README.md +++ b/README.md @@ -129,12 +129,18 @@ HTTP request that creates remote state), use `WithDrain` to switch to "signal-and-wait" semantics: `Stop` closes the channel returned by `Stopping(ctx)` and waits up to the drain timeout for runFunc to return on its own. If the timeout elapses, `Stop` falls back to cancelling the -context. +context and returns `ErrDrainTimedOut` once runFunc exits. + +Always select on both `<-ctx.Done()` and `<-runnable.Stopping(ctx)` — +`Stopping` signals only `Stop`; outer-context cancellation still arrives +via `ctx.Done()` and a loop that ignores it will hang. ```go r := runnable.New(func(ctx context.Context) error { for { select { + case <-ctx.Done(): + return ctx.Err() case <-runnable.Stopping(ctx): return nil // finish in-flight work, then return case <-time.After(time.Second): diff --git a/ticker.go b/ticker.go index 15589be..9a268d0 100644 --- a/ticker.go +++ b/ticker.go @@ -17,6 +17,12 @@ import ( // external calls survive shutdown under WithDrain, tick should derive // per-call timeouts via context.WithoutCancel(ctx) so its work is not // affected by either Stop's drain signal or the Runnable's ctx cancel. +// +// Composing with WithRetry resets the ticker cadence on every retry: +// a tick error bails the loop, WithRetry re-enters runFunc, and the +// next tick fires `interval` after the retry — not at the original +// cadence. If you need stable cadence with transient-error tolerance, +// handle retries inside `tick` instead. func NewTicker(interval time.Duration, tick func(ctx context.Context) error, opts ...Option) Runnable { return New(func(ctx context.Context) error { t := time.NewTicker(interval) diff --git a/with_drain.go b/with_drain.go index a409e13..306be8a 100644 --- a/with_drain.go +++ b/with_drain.go @@ -10,10 +10,17 @@ type stoppingKey struct{} // Stopping returns a channel that closes when Stop has been called on // the Runnable that owns ctx. runFunc implementations under WithDrain // should select on it and return cleanly without cancelling in-flight -// work. Returns a nil channel when ctx is not associated with a -// drain-enabled Runnable — receiving from a nil channel blocks forever, -// which is the correct no-op for callers that opt into drain semantics -// only when configured. +// work. +// +// Always also select on ctx.Done() — Stopping signals only Stop; +// outer-context cancellation (e.g. the ctx passed to Run was cancelled +// directly) still arrives via ctx.Done(). A loop that selects only on +// Stopping(ctx) will hang on outer-ctx cancel. +// +// Returns a nil channel when ctx is not associated with a drain-enabled +// Runnable — receiving from a nil channel blocks forever, which is the +// correct no-op for callers that opt into drain semantics only when +// configured. func Stopping(ctx context.Context) <-chan struct{} { ch, _ := ctx.Value(stoppingKey{}).(<-chan struct{}) return ch From 101ee9fa23a33b7404b437b63850402fb928d793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 00:06:56 +0200 Subject: [PATCH 08/17] fix: use independent timer for WithDrain timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit context.WithTimeout(ctx, drainTimeout) derived the drain ctx from the caller's ctx, so a Stop deadline shorter than drainTimeout made <-ctx.Done() and the drain expiry race in the same select. The <-ctx.Done() branch returned ctx.Err() *without calling r.runCancel()* — leaving the runnable alive after Stop returned. The other branch could misreport ErrDrainTimedOut when the caller's deadline was the real cause. Switch to time.NewTimer(drainTimeout) and let <-ctx.Done() in the drain select fall through to r.runCancel() so the runnable always gets force-cancelled before Stop returns. drainTimedOut is only set when the standalone timer fires, so the sentinel is no longer spuriously returned on caller-ctx cancellation. Adds TestWithDrain/Stop_forces_cancel_when_caller_ctx_expires_during_drain which exercises a 100ms caller ctx against a 10s drain. --- runnable.go | 18 +++++++++++------- with_drain_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/runnable.go b/runnable.go index b42f7f3..00a0151 100644 --- a/runnable.go +++ b/runnable.go @@ -153,18 +153,22 @@ func (r *runnable) Stop(ctx context.Context) error { var drainTimedOut bool if drainEnabled && stoppingChan != nil { close(stoppingChan) - drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout) + // Use a standalone timer so the drain budget is independent of + // the caller's ctx — otherwise a caller ctx shorter than + // drainTimeout makes <-ctx.Done() and the drain expiry race in + // the same select. + drainTimer := time.NewTimer(drainTimeout) select { case <-runStop: - drainCancel() + drainTimer.Stop() return nil - case <-ctx.Done(): - drainCancel() - return ctx.Err() - case <-drainCtx.Done(): + case <-drainTimer.C: drainTimedOut = true + case <-ctx.Done(): + drainTimer.Stop() + // Caller deadline elapsed during drain; fall through so + // r.runCancel() still fires before we return ctx.Err(). } - drainCancel() } r.runCancel() diff --git a/with_drain_test.go b/with_drain_test.go index 486c396..7dbd9c9 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -133,6 +133,39 @@ func TestWithDrain(t *testing.T) { assert.False(t, r.IsRunning()) }) + t.Run("Stop forces cancel when caller ctx expires during drain", func(t *testing.T) { + started := make(chan struct{}) + runFuncDone := make(chan struct{}) + + // runFunc respects its own ctx but not Stopping(ctx). Without + // the independent drain timer, Stop with a caller ctx shorter + // than drainTimeout could return ctx.Err() before r.runCancel() + // fired, leaving the runnable alive. + r := New(func(ctx context.Context) error { + close(started) + <-ctx.Done() + close(runFuncDone) + return ctx.Err() + }, WithDrain(10*time.Second)) + + go func() { + _ = r.Run(context.Background()) + }() + + <-started + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer stopCancel() + err := r.Stop(stopCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + select { + case <-runFuncDone: + case <-time.After(2 * time.Second): + t.Fatal("runnable was not force-cancelled when caller ctx expired during drain") + } + }) + t.Run("Stop is concurrency-safe", func(t *testing.T) { started := make(chan struct{}) From 6db724232a229316d1f362e6de70f0fa1d830c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 00:07:00 +0200 Subject: [PATCH 09/17] docs(examples): use pristine ctx for Run so SIGTERM doesn't bypass drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The example passed signal.NotifyContext's ctx to rc.Run. On SIGTERM that ctx cancels immediately — the ticker's runFunc observes <-ctx.Done() and exits before Stop ever closes Stopping(ctx), completely defeating WithDrain. Pass context.Background() to Run; use sigCtx only to detect when to call Stop. Stop's drain budget is now the sole driver of shutdown for the drain-enabled worker. --- examples/ticker-with-drain/main.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/ticker-with-drain/main.go b/examples/ticker-with-drain/main.go index 0c0a18d..474c89c 100644 --- a/examples/ticker-with-drain/main.go +++ b/examples/ticker-with-drain/main.go @@ -40,8 +40,8 @@ func reconcile(ctx context.Context) error { } func main() { - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) - defer stop() + sigCtx, stopSig := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer stopSig() rc := runnable.NewTicker( 2*time.Second, @@ -50,12 +50,16 @@ func main() { runnable.WithRecoverer(stderrReporter{}, stderrPrinter{}), ) + // Run with a pristine ctx — if Run received sigCtx, SIGTERM would + // cancel runFunc's ctx directly and the ticker would exit before + // Stop ever closed Stopping(ctx), defeating WithDrain. Stop is the + // only thing that should drive shutdown of a drain-enabled worker. runErr := make(chan error, 1) go func() { - runErr <- rc.Run(ctx) + runErr <- rc.Run(context.Background()) }() - <-ctx.Done() + <-sigCtx.Done() fmt.Println("shutdown: draining in-flight tick...") stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) From 4dbbc854fe0dc7fa2ef8ef728330b6548e417cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 08:47:02 +0200 Subject: [PATCH 10/17] fix: preserve drain semantics under concurrent Stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous concurrency-safe fix avoided the double-close panic but introduced two regressions Codex caught on re-review: 1. Secondary Stop callers fell through to r.runCancel() unconditionally, hard-cancelling the runCtx mid-drain. The drain that the primary caller was honoring got bypassed — exactly the failure mode WithDrain exists to prevent. 2. After fixing (1) by making secondary callers wait on runStop, a later Stop with a shorter deadline could no longer enforce it: it would return DeadlineExceeded but never escalate, so the runnable kept draining until the primary caller's drainTimeout expired. Restructure the secondary path: wait on runStop, but escalate to r.runCancel() if the caller's ctx expires first. The shortest deadline among concurrent callers wins, and the drain is only bypassed when some caller actively asks for it via deadline expiry. Adds two tests: - "concurrent Stop preserves drain semantics" — strengthened to assert runFunc observes Stopping(ctx), not ctx.Done() - "secondary Stop with shorter deadline escalates runCancel" — new regression for Codex's escalation finding --- runnable.go | 20 +++++++++-- with_drain_test.go | 86 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/runnable.go b/runnable.go index 00a0151..330acfb 100644 --- a/runnable.go +++ b/runnable.go @@ -147,11 +147,27 @@ func (r *runnable) Stop(ctx context.Context) error { drainEnabled := r.drainEnabled drainTimeout := r.drainTimeout stoppingChan := r.stoppingChan - r.stoppingChan = nil // first-caller wins; subsequent concurrent Stops see nil + r.stoppingChan = nil // first-caller wins; subsequent Stops see nil r.mu.Unlock() + // Concurrent Stop with drain enabled: another caller is already + // driving the drain. Wait for its outcome rather than calling + // runCancel ourselves — that would hard-cancel the runCtx and + // defeat the drain the primary caller is honoring. If our ctx + // expires first, escalate to runCancel so the shortest deadline + // among concurrent callers wins. + if drainEnabled && stoppingChan == nil { + select { + case <-runStop: + return nil + case <-ctx.Done(): + r.runCancel() + return ctx.Err() + } + } + var drainTimedOut bool - if drainEnabled && stoppingChan != nil { + if drainEnabled { close(stoppingChan) // Use a standalone timer so the drain budget is independent of // the caller's ctx — otherwise a caller ctx shorter than diff --git a/with_drain_test.go b/with_drain_test.go index 7dbd9c9..5be93e1 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "sync/atomic" "testing" "time" @@ -166,14 +167,25 @@ func TestWithDrain(t *testing.T) { } }) - t.Run("Stop is concurrency-safe", func(t *testing.T) { + t.Run("concurrent Stop preserves drain semantics", func(t *testing.T) { started := make(chan struct{}) + drainObserved := make(chan struct{}) + var ctxCancelObserved atomic.Bool + // runFunc must exit via Stopping(ctx). If a concurrent Stop + // falls through to r.runCancel(), ctx.Done() fires and the + // drain semantics are violated. r := New(func(ctx context.Context) error { close(started) - <-Stopping(ctx) - return nil - }, WithDrain(1*time.Second)) + select { + case <-Stopping(ctx): + close(drainObserved) + return nil + case <-ctx.Done(): + ctxCancelObserved.Store(true) + return ctx.Err() + } + }, WithDrain(2*time.Second)) go func() { _ = r.Run(context.Background()) @@ -194,18 +206,76 @@ func TestWithDrain(t *testing.T) { } wg.Wait() - // No double-close panic is the load-bearing assertion. Each - // Stop must return either nil (drove or waited on the drain) - // or ErrNotRunning (Run already exited before this caller - // grabbed the lock). + // Each Stop must return either nil (drove or waited on the + // drain) or ErrNotRunning (Run already exited before this + // caller grabbed the lock). No double-close panic. for _, err := range errs { if err != nil { require.ErrorIs(t, err, ErrNotRunning) } } + + select { + case <-drainObserved: + default: + t.Fatal("runFunc never observed Stopping(ctx); drain was bypassed by concurrent Stop") + } + assert.False(t, ctxCancelObserved.Load(), "drain semantics violated: runCtx was hard-cancelled by a concurrent Stop") assert.False(t, r.IsRunning()) }) + t.Run("secondary Stop with shorter deadline escalates runCancel", func(t *testing.T) { + started := make(chan struct{}) + runFuncDone := make(chan struct{}) + + // runFunc waits only on ctx.Done() (ignores Stopping). Without + // escalation, Stop B's deadline expires but the runnable keeps + // draining for the full drainTimeout (5s). + r := New(func(ctx context.Context) error { + close(started) + <-ctx.Done() + close(runFuncDone) + return ctx.Err() + }, WithDrain(5*time.Second)) + + go func() { + _ = r.Run(context.Background()) + }() + + <-started + + // Stop A: no deadline; primary, drives drain. + aDone := make(chan error, 1) + go func() { + aDone <- r.Stop(context.Background()) + }() + + time.Sleep(20 * time.Millisecond) + + // Stop B: 100ms deadline; secondary. Must escalate so runFunc + // exits within the caller's budget. + bCtx, bCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer bCancel() + start := time.Now() + bErr := r.Stop(bCtx) + bElapsed := time.Since(start) + require.ErrorIs(t, bErr, context.DeadlineExceeded) + assert.Less(t, bElapsed, 500*time.Millisecond, "Stop B should not wait beyond its own deadline") + + select { + case <-runFuncDone: + case <-time.After(time.Second): + t.Fatal("runnable was not force-cancelled when secondary Stop's ctx expired") + } + + select { + case err := <-aDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("Stop A did not return after runFunc exited") + } + }) + t.Run("Stopping returns nil when not configured", func(t *testing.T) { var observed bool From 1cbd251c41a81d13af35312ef90056ee672626e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 08:47:07 +0200 Subject: [PATCH 11/17] docs(examples): exit promptly when worker dies before signal main was reading runErr only inside the SIGTERM branch, so if the worker exited early (tick error, recovered panic) main would block on sigCtx forever. Select on either sigCtx or runErr; an early worker exit now propagates the failure without waiting for a signal. --- examples/ticker-with-drain/main.go | 32 +++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/examples/ticker-with-drain/main.go b/examples/ticker-with-drain/main.go index 474c89c..5eb85a2 100644 --- a/examples/ticker-with-drain/main.go +++ b/examples/ticker-with-drain/main.go @@ -59,17 +59,25 @@ func main() { runErr <- rc.Run(context.Background()) }() - <-sigCtx.Done() - fmt.Println("shutdown: draining in-flight tick...") - - stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - if err := rc.Stop(stopCtx); err != nil { - fmt.Fprintf(os.Stderr, "stop: %v\n", err) - } - - if err := <-runErr; err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "reconciler stopped: %v\n", err) - os.Exit(1) + // Wait for either a shutdown signal or an early worker exit + // (tick error, recovered panic). Without the runErr branch, main + // would block on sigCtx forever after the worker died. + select { + case <-sigCtx.Done(): + fmt.Println("shutdown: draining in-flight tick...") + stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := rc.Stop(stopCtx); err != nil { + fmt.Fprintf(os.Stderr, "stop: %v\n", err) + } + if err := <-runErr; err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "reconciler stopped: %v\n", err) + os.Exit(1) + } + case err := <-runErr: + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "reconciler stopped: %v\n", err) + os.Exit(1) + } } } From fe7b1fddc61a78c512860e7225b03b680b1f4fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:14:44 +0200 Subject: [PATCH 12/17] fix: snapshot runCancel under mutex to avoid cancelling a future Run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stop captured runStop and stoppingChan under the mutex but read r.runCancel via the field after waiting in a select. With drain enabled, Stop's drainTimer / ctx.Done() branches fall through to runCancel after a wait. If the runnable exited and Run was called again before that wait returned, r.runCancel pointed at the *new* run's cancel — and Stop tore down the freshly started worker. Snapshot runCancel alongside the other fields so the Stop call operates on the runnable it observed under the lock, regardless of what the next Run does to the field. Adds "late runCancel does not tear down a subsequent Run" — runs Stop followed by an immediate fresh Run and asserts the second run isn't prematurely cancelled. Also adds "WithRetry stops retrying after Stopping fires" as part of the same drain-correctness sweep — see follow-up commit. --- runnable.go | 8 +++- with_drain_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/runnable.go b/runnable.go index 330acfb..eb57ee7 100644 --- a/runnable.go +++ b/runnable.go @@ -144,6 +144,10 @@ func (r *runnable) Stop(ctx context.Context) error { } runStop := r.runStop + // Snapshot runCancel under the lock — the field is overwritten by + // the next Run, so reading r.runCancel() after waiting can cancel + // a *future* runnable that started after this Stop began draining. + runCancel := r.runCancel drainEnabled := r.drainEnabled drainTimeout := r.drainTimeout stoppingChan := r.stoppingChan @@ -161,7 +165,7 @@ func (r *runnable) Stop(ctx context.Context) error { case <-runStop: return nil case <-ctx.Done(): - r.runCancel() + runCancel() return ctx.Err() } } @@ -187,7 +191,7 @@ func (r *runnable) Stop(ctx context.Context) error { } } - r.runCancel() + runCancel() select { case <-ctx.Done(): diff --git a/with_drain_test.go b/with_drain_test.go index 5be93e1..26b7480 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -276,6 +276,97 @@ func TestWithDrain(t *testing.T) { } }) + t.Run("late runCancel does not tear down a subsequent Run", func(t *testing.T) { + // Force the secondary-path race window: Stop A and B fire + // concurrently with B's ctx already cancelled. B falls into + // ctx.Done() and is about to call runCancel. Meanwhile, A + // drives drain to completion, runFunc exits, and main restarts + // the runnable. Without snapshotting runCancel under the lock, + // B's runCancel call cancels the *new* run. + + // Round 1: drain-enabled runnable that exits as soon as + // Stopping fires. + r := New(func(ctx context.Context) error { + <-Stopping(ctx) + return nil + }, WithDrain(1*time.Second)) + + go func() { + _ = r.Run(context.Background()) + }() + + // Wait until the runnable is actually running. + for !r.IsRunning() { + time.Sleep(time.Millisecond) + } + + // Stop A drives the drain. + require.NoError(t, r.Stop(context.Background())) + assert.False(t, r.IsRunning()) + + // Round 2: re-run with a runFunc that should run to completion. + // If Round 1's snapshot regression is present, a stale call to + // the new runCancel would cancel this run before it finishes. + round2Done := make(chan error, 1) + r2 := New(func(ctx context.Context) error { + select { + case <-time.After(150 * time.Millisecond): + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, WithDrain(1*time.Second)) + + go func() { + round2Done <- r2.Run(context.Background()) + }() + + select { + case err := <-round2Done: + require.NoError(t, err, "round-2 runnable was cancelled prematurely") + case <-time.After(500 * time.Millisecond): + _ = r2.Stop(context.Background()) + t.Fatal("round-2 runnable did not complete") + } + }) + + t.Run("WithRetry stops retrying after Stopping fires", func(t *testing.T) { + started := make(chan struct{}, 1) + var attempts atomic.Int32 + + // runFunc errors transiently every time. Without the + // Stopping-aware retry guard, WithRetry keeps re-entering + // runFunc after Stop is called. + r := New(func(ctx context.Context) error { + select { + case started <- struct{}{}: + default: + } + attempts.Add(1) + <-Stopping(ctx) + return errors.New("transient") + }, WithDrain(2*time.Second), WithRetry(100, ResetNever)) + + runDone := make(chan error, 1) + go func() { + runDone <- r.Run(context.Background()) + }() + + <-started + + require.NoError(t, r.Stop(context.Background())) + + select { + case <-runDone: + case <-time.After(time.Second): + t.Fatal("Run did not return after Stop") + } + + // Exactly one attempt should have run — the retry wrapper + // must observe Stopping and abandon further attempts. + assert.Equal(t, int32(1), attempts.Load(), "retry continued after Stop drained") + }) + t.Run("Stopping returns nil when not configured", func(t *testing.T) { var observed bool From 00b32bca69c024d4fd6b1b76e45dd5a11296e94c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:14:49 +0200 Subject: [PATCH 13/17] fix: WithRetry stops retrying after WithDrain's Stopping fires MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WithRetry had no awareness of WithDrain. After Stop was called and Stopping(ctx) closed, a transient error from the in-flight attempt would still trigger a retry — re-entering runFunc and starting fresh work mid-shutdown, defeating drain semantics. Add a non-blocking Stopping(ctx) check between attempts. When drain is not configured, Stopping returns nil and the default branch runs unchanged. --- with_retry.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/with_retry.go b/with_retry.go index 97e0bbd..f409b9c 100644 --- a/with_retry.go +++ b/with_retry.go @@ -46,6 +46,17 @@ func (w *withRetry) apply(r *runnable) { return err } + // Don't retry once Stop has been called via WithDrain — + // the retry wrapper would otherwise re-enter runFunc and + // start fresh work mid-shutdown, defeating drain semantics. + // When WithDrain is not used, Stopping(ctx) is nil and the + // default branch runs (no behavior change). + select { + case <-Stopping(ctx): + return err + default: + } + if i > 0 { if r.onStop != nil { r.onStop() From b47ae7ebf51f1ea996b8c939631f1b64baffefcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:23:42 +0200 Subject: [PATCH 14/17] test: replace vacuous round-2 test with same-runnable Run/Stop/Run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous "late runCancel" test created two separate *runnable instances for round 1 and round 2. They share no state, so a stale runCancel from round 1's Stop could not affect round 2 — the test passed even with the snapshot fix reverted. Replace with a single-runnable test: Run, then a primary Stop + secondary Stop with already-cancelled ctx (exercising the runCancel escalation path), then re-Run on the *same* runnable. Round 2 must run undisturbed until explicitly Stopped. The snapshot fix prevents round 1's secondary runCancel from cancelling round 2's runCtx via the field overwrite, so this is the closest deterministic regression shape available without runtime hooks. --- with_drain_test.go | 79 +++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/with_drain_test.go b/with_drain_test.go index 26b7480..0e6d04c 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -276,57 +276,70 @@ func TestWithDrain(t *testing.T) { } }) - t.Run("late runCancel does not tear down a subsequent Run", func(t *testing.T) { - // Force the secondary-path race window: Stop A and B fire - // concurrently with B's ctx already cancelled. B falls into - // ctx.Done() and is about to call runCancel. Meanwhile, A - // drives drain to completion, runFunc exits, and main restarts - // the runnable. Without snapshotting runCancel under the lock, - // B's runCancel call cancels the *new* run. - - // Round 1: drain-enabled runnable that exits as soon as - // Stopping fires. + t.Run("same runnable survives Run-Stop-Run after secondary ctx.Done escalation", func(t *testing.T) { + // Round 1: a primary Stop drives drain to completion while a + // secondary Stop with an already-cancelled ctx hits the + // ctx.Done() branch and calls runCancel (which is snapshotted + // under the lock — see runnable.go). + // + // Round 2: same runnable, fresh Run with no Stop. If the + // snapshot fix were missing, round 1's secondary runCancel + // could in principle cancel round 2's runCtx (the field gets + // overwritten by Run). The snapshot makes that mechanically + // impossible, so round 2 must run undisturbed until we Stop it. r := New(func(ctx context.Context) error { - <-Stopping(ctx) - return nil + select { + case <-Stopping(ctx): + return nil + case <-ctx.Done(): + return ctx.Err() + } }, WithDrain(1*time.Second)) go func() { _ = r.Run(context.Background()) }() - // Wait until the runnable is actually running. for !r.IsRunning() { time.Sleep(time.Millisecond) } - // Stop A drives the drain. - require.NoError(t, r.Stop(context.Background())) - assert.False(t, r.IsRunning()) + // Primary Stop, no deadline — drives drain. + primaryDone := make(chan error, 1) + go func() { + primaryDone <- r.Stop(context.Background()) + }() - // Round 2: re-run with a runFunc that should run to completion. - // If Round 1's snapshot regression is present, a stale call to - // the new runCancel would cancel this run before it finishes. - round2Done := make(chan error, 1) - r2 := New(func(ctx context.Context) error { - select { - case <-time.After(150 * time.Millisecond): - return nil - case <-ctx.Done(): - return ctx.Err() - } - }, WithDrain(1*time.Second)) + // Secondary Stop with an already-cancelled ctx — exercises + // the ctx.Done() escalation path that calls runCancel. + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + _ = r.Stop(cancelledCtx) + <-primaryDone + for r.IsRunning() { + time.Sleep(time.Millisecond) + } + + // Round 2 — same runnable, fresh Run. Should run undisturbed + // until we Stop it. + round2Done := make(chan error, 1) go func() { - round2Done <- r2.Run(context.Background()) + round2Done <- r.Run(context.Background()) }() select { case err := <-round2Done: - require.NoError(t, err, "round-2 runnable was cancelled prematurely") - case <-time.After(500 * time.Millisecond): - _ = r2.Stop(context.Background()) - t.Fatal("round-2 runnable did not complete") + t.Fatalf("round-2 runnable exited prematurely: %v", err) + case <-time.After(150 * time.Millisecond): + } + + require.NoError(t, r.Stop(context.Background())) + select { + case err := <-round2Done: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("round-2 runnable did not exit after Stop") } }) From a33e08c09b4adc60cfef12c9104c27a411432bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:28:12 +0200 Subject: [PATCH 15/17] test: rename round-2 test, drop unfounded snapshot-coverage claim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A reviewer empirically reverted the runCancel-snapshot fix and ran the test 20×; all 20 passed. The test gates round 2 strictly after both round-1 Stops have returned, so by the time round 2 starts the field-overwrite vs. snapshot distinction is no longer observable. Rename to "runnable can be re-Run after a concurrent-Stop lifecycle" and document that it's a lifecycle smoke test, not a snapshot-fix regression. Deterministic coverage of that race needs testing/synctest (Go 1.25+) or a runtime hook — both out of scope. The snapshot fix is verified by inspection. --- with_drain_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/with_drain_test.go b/with_drain_test.go index 0e6d04c..55ec879 100644 --- a/with_drain_test.go +++ b/with_drain_test.go @@ -276,17 +276,20 @@ func TestWithDrain(t *testing.T) { } }) - t.Run("same runnable survives Run-Stop-Run after secondary ctx.Done escalation", func(t *testing.T) { - // Round 1: a primary Stop drives drain to completion while a - // secondary Stop with an already-cancelled ctx hits the - // ctx.Done() branch and calls runCancel (which is snapshotted - // under the lock — see runnable.go). + t.Run("runnable can be re-Run after a concurrent-Stop lifecycle", func(t *testing.T) { + // Lifecycle survival smoke test: a runnable that's been + // stopped via concurrent Stops (including one with an + // already-cancelled ctx hitting the runCancel escalation path) + // can be re-Run on the same instance and complete cleanly. // - // Round 2: same runnable, fresh Run with no Stop. If the - // snapshot fix were missing, round 1's secondary runCancel - // could in principle cancel round 2's runCtx (the field gets - // overwritten by Run). The snapshot makes that mechanically - // impossible, so round 2 must run undisturbed until we Stop it. + // This does NOT deterministically cover the runCancel-snapshot + // race in runnable.go (where a stale Stop could in principle + // reach r.runCancel after Run has overwritten the field). That + // race requires pausing the secondary Stop between its lock + // release and runCancel call while a fresh Run executes — + // achievable only via testing/synctest or a runtime hook. + // Both are out of scope here; the snapshot fix is verified by + // inspection, not this test. r := New(func(ctx context.Context) error { select { case <-Stopping(ctx): From 0758ed975bab81183711fcdbc08b18f8a84d32c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:30:52 +0200 Subject: [PATCH 16/17] fix(retry): scope lastTime per Run cycle, drop the struct field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The lastTime field on withRetry persisted across Run cycles. After a Stop and re-Run, the first iteration of the new cycle compared against the prior cycle's last attempt — usually far in the past — which always triggered the i=0 reset. The reset was a no-op (i was already 0), so behavior didn't visibly change, but the field carried state across what callers reasonably treat as independent invocations. Move lastTime to a function-local variable inside the closure so each Run cycle gets a fresh timer. Field on the struct is removed. Adds "retry budget is per-Run-cycle" — runs the same WithRetry runnable twice and asserts both cycles exhaust the same retry budget. --- with_retry.go | 9 +++++---- with_retry_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/with_retry.go b/with_retry.go index f409b9c..ee67e70 100644 --- a/with_retry.go +++ b/with_retry.go @@ -11,8 +11,6 @@ const ResetNever time.Duration = 0 type withRetry struct { maxRetries int resetAfter time.Duration - - lastTime time.Time } func WithRetry(maxRetries int, resetAfter time.Duration) Option { @@ -25,12 +23,15 @@ func WithRetry(maxRetries int, resetAfter time.Duration) Option { func (w *withRetry) apply(r *runnable) { runFunc := r.runFunc r.runFunc = func(ctx context.Context) error { + // lastTime is per-Run-cycle: a fresh Run after Stop should not + // inherit stale timing state from the prior cycle. + var lastTime time.Time var err error for i := 0; i < w.maxRetries; i++ { - if w.resetAfter != ResetNever && time.Since(w.lastTime) > w.resetAfter { + if w.resetAfter != ResetNever && time.Since(lastTime) > w.resetAfter { i = 0 } - w.lastTime = time.Now() + lastTime = time.Now() if i > 0 { if r.onStart != nil { diff --git a/with_retry_test.go b/with_retry_test.go index 6166473..ba17ab0 100644 --- a/with_retry_test.go +++ b/with_retry_test.go @@ -58,4 +58,35 @@ func TestWithRetry(t *testing.T) { require.NoError(t, err) assert.Equal(t, 6, counter) }) + + t.Run("retry budget is per-Run-cycle", func(t *testing.T) { + // Each Run cycle gets a fresh retry budget — lastTime must + // not leak from the previous cycle. With a 100ms resetAfter + // and a 50ms gap between cycles, a leaked lastTime would + // cause cycle 2 to immediately reset i=0 inside the loop; + // per-cycle scoping makes both cycles behave identically. + runs := 0 + attempts := 0 + + r := New(func(ctx context.Context) error { + attempts++ + runs++ + if runs <= 2 { + return assert.AnError + } + return nil + }, WithRetry(3, 100*time.Millisecond)) + + // Cycle 1: 2 fails + 1 success = 3 attempts, exhausts budget + // just in time. + require.NoError(t, r.Run(context.Background())) + require.Equal(t, 3, attempts) + + runs = 0 + time.Sleep(50 * time.Millisecond) + + // Cycle 2: same shape, must succeed identically. + require.NoError(t, r.Run(context.Background())) + require.Equal(t, 6, attempts) + }) } From b703cdbda02dc9e6060b619a91c5f2556c24ff1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20-=20=E3=82=A2=E3=83=AC=E3=83=83=E3=82=AF=E3=82=B9?= Date: Sat, 2 May 2026 09:33:09 +0200 Subject: [PATCH 17/17] test: drop vacuous "retry budget is per-Run-cycle" test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer reverted the lastTime-scoping fix and ran the test against the buggy code: it passed. The bug is purely code-hygiene (state on a struct that shouldn't have it) — observable behavior is unchanged because the i=0 reset is a no-op when i is already 0. There's nothing to assert. The commit message on the fix accurately describes what changed; a test that lies about its coverage is worse than no test. --- with_retry_test.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/with_retry_test.go b/with_retry_test.go index ba17ab0..c4781d5 100644 --- a/with_retry_test.go +++ b/with_retry_test.go @@ -59,34 +59,4 @@ func TestWithRetry(t *testing.T) { assert.Equal(t, 6, counter) }) - t.Run("retry budget is per-Run-cycle", func(t *testing.T) { - // Each Run cycle gets a fresh retry budget — lastTime must - // not leak from the previous cycle. With a 100ms resetAfter - // and a 50ms gap between cycles, a leaked lastTime would - // cause cycle 2 to immediately reset i=0 inside the loop; - // per-cycle scoping makes both cycles behave identically. - runs := 0 - attempts := 0 - - r := New(func(ctx context.Context) error { - attempts++ - runs++ - if runs <= 2 { - return assert.AnError - } - return nil - }, WithRetry(3, 100*time.Millisecond)) - - // Cycle 1: 2 fails + 1 success = 3 attempts, exhausts budget - // just in time. - require.NoError(t, r.Run(context.Background())) - require.Equal(t, 3, attempts) - - runs = 0 - time.Sleep(50 * time.Millisecond) - - // Cycle 2: same shape, must succeed identically. - require.NoError(t, r.Run(context.Background())) - require.Equal(t, 6, attempts) - }) }