Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 23 additions & 172 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ import (
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/dbutil"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivershared/util/valutil"
Expand Down Expand Up @@ -605,18 +603,10 @@ type Client[TTx any] struct {
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer

// queueMaintainerEpoch is incremented each time leadership is gained,
// giving each tryStartQueueMaintainer goroutine a term number.
// queueMaintainerMu serializes epoch checks with Stop calls so that a
// stale goroutine from an older term cannot tear down a maintainer
// started by a newer term.
queueMaintainerEpoch int64
queueMaintainerMu sync.Mutex

queues *QueueBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queueMaintainerLeader *maintenance.QueueMaintainerLeader
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
Expand All @@ -629,23 +619,16 @@ type Client[TTx any] struct {

// Test-only signals.
type clientTestSignals struct {
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt
queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted

jobCleaner *maintenance.JobCleanerTestSignals
jobRescuer *maintenance.JobRescuerTestSignals
jobScheduler *maintenance.JobSchedulerTestSignals
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals
queueCleaner *maintenance.QueueCleanerTestSignals
reindexer *maintenance.ReindexerTestSignals
jobCleaner *maintenance.JobCleanerTestSignals
jobRescuer *maintenance.JobRescuerTestSignals
jobScheduler *maintenance.JobSchedulerTestSignals
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals
queueCleaner *maintenance.QueueCleanerTestSignals
queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals
reindexer *maintenance.ReindexerTestSignals
}

func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
ts.electedLeader.Init(tb)
ts.queueMaintainerStartError.Init(tb)
ts.queueMaintainerStartRetriesExhausted.Init(tb)

if ts.jobCleaner != nil {
ts.jobCleaner.Init(tb)
}
Expand All @@ -661,6 +644,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
if ts.queueCleaner != nil {
ts.queueCleaner.Init(tb)
}
if ts.queueMaintainerLeader != nil {
ts.queueMaintainerLeader.Init(tb)
}
if ts.reindexer != nil {
ts.reindexer.Init(tb)
}
Expand Down Expand Up @@ -867,9 +853,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services,
startstop.StartStopFunc(client.logStatsLoop))

client.services = append(client.services,
startstop.StartStopFunc(client.handleLeadershipChangeLoop))

if pluginPilot != nil {
client.services = append(client.services, pluginPilot.PluginServices()...)
}
Expand Down Expand Up @@ -972,6 +955,15 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
if config.TestOnly {
client.queueMaintainer.StaggerStartupDisable(true)
}

client.queueMaintainerLeader = maintenance.NewQueueMaintainerLeader(archetype, &maintenance.QueueMaintainerLeaderConfig{
ClientID: config.ID,
Elector: client.elector,
QueueMaintainer: client.queueMaintainer,
RequestResignFunc: client.clientNotifyBundle.RequestResign,
})
client.services = append(client.services, client.queueMaintainerLeader)
client.testSignals.queueMaintainerLeader = &client.queueMaintainerLeader.TestSignals
}

return client, nil
Expand Down Expand Up @@ -1292,147 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte
return nil
}

// handleLeadershipChangeLoop runs as a startstop service that listens for
// leader election notifications from the elector. On gaining leadership it
// launches tryStartQueueMaintainer in its own goroutine (so this loop stays
// responsive to further notifications); on losing leadership it cancels any
// in-flight start attempt and stops the queue maintainer.
//
// shouldStart, started, and stopped are the standard startstop lifecycle
// hooks: the function is a no-op when shouldStart is false, and invokes
// started/stopped around the lifetime of its worker goroutine.
func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
	if !shouldStart {
		return nil
	}

	go func() {
		started()
		defer stopped() // this defer should come first so it's last out

		// Subscribe to leadership change notifications for the lifetime of
		// this goroutine.
		sub := c.elector.Listen()
		defer sub.Unlisten()

		// Cancel function for an in-progress tryStartQueueMaintainer. If
		// leadership is lost while the start process is still retrying, used to
		// abort it promptly instead of waiting for retries to finish.
		var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {}

		for {
			select {
			case <-ctx.Done():
				// Propagate the outer context's cancellation cause to any
				// in-flight maintainer start before exiting.
				cancelQueueMaintainerStart(context.Cause(ctx))
				return

			case notification := <-sub.C():
				c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
					slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))

				switch {
				case notification.IsLeader:
					// Starting the queue maintainer takes time, so send the
					// test signal first. Tests waiting on it can receive it,
					// cancel the queue maintainer start, and finish faster.
					c.testSignals.electedLeader.Signal(struct{}{})

					// Start the queue maintainer with retries and exponential
					// backoff in a separate goroutine so the leadership change
					// loop remains responsive to new notifications. startCtx is
					// used for cancellation in case leadership is lost while
					// retries are in progress.
					//
					// Epoch is incremented so stale tryStartQueueMaintainer
					// goroutines from a previous term cannot call Stop after a
					// new term has begun.
					var startCtx context.Context
					startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx)

					// Bump the term number under the same mutex that
					// tryStartQueueMaintainer's epoch check uses, so the check
					// and the increment are serialized.
					c.queueMaintainerMu.Lock()
					c.queueMaintainerEpoch++
					epoch := c.queueMaintainerEpoch
					c.queueMaintainerMu.Unlock()

					go c.tryStartQueueMaintainer(startCtx, epoch)

				default:
					// Cancel any in-progress start attempts before stopping.
					// Send a startstop.ErrStop to make sure services like
					// Reindexer run any specific cleanup code for stops.
					cancelQueueMaintainerStart(startstop.ErrStop)
					cancelQueueMaintainerStart = func(_ error) {}

					c.queueMaintainer.Stop()
				}
			}
		}
	}()

	return nil
}

// Tries to start the queue maintainer after gaining leadership. We allow some
// retries with exponential backoff in case of failure, and in case the queue
// maintainer can't be started, we request resignation to allow another client
// to try and take over.
//
// ctx is cancelled by handleLeadershipChangeLoop if leadership is lost while
// start attempts are still in progress. epoch is this goroutine's leadership
// term number; it's compared against the client's current epoch (under
// queueMaintainerMu) so a stale goroutine from an older term never stops a
// maintainer started by a newer term.
func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context, epoch int64) {
	const maxStartAttempts = 3

	// ctxCancelled reports whether ctx has been cancelled, logging once when
	// it has so the abandoned start attempt is visible in logs.
	ctxCancelled := func() bool {
		if ctx.Err() != nil {
			c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled")
			return true
		}
		return false
	}

	// stopIfCurrentEpoch atomically checks whether this goroutine's epoch is
	// still the active one and calls Stop only if it is. Combined with the
	// epoch increment in handleLeadershipChangeLoop, prevents stale goroutine
	// from stopping a maintainer started by a newer leadership term. Returns
	// false when this goroutine's term is stale (caller should bail out).
	stopIfCurrentEpoch := func() bool {
		c.queueMaintainerMu.Lock()
		defer c.queueMaintainerMu.Unlock()

		if c.queueMaintainerEpoch != epoch {
			return false
		}

		c.queueMaintainer.Stop()
		return true
	}

	var lastErr error
	for attempt := 1; attempt <= maxStartAttempts; attempt++ {
		if ctxCancelled() {
			return
		}

		// Success: the maintainer is running; nothing more to do.
		if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
			return
		}

		c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer",
			slog.String("err", lastErr.Error()), slog.Int("attempt", attempt))

		c.testSignals.queueMaintainerStartError.Signal(lastErr)

		// Stop the queue maintainer to fully reset its state (and any
		// sub-services) before retrying. The epoch check ensures a stale
		// goroutine cannot stop a maintainer from a newer leadership term.
		if !stopIfCurrentEpoch() {
			return
		}

		// Back off before the next attempt, but skip the sleep after the
		// final failed attempt.
		if attempt < maxStartAttempts {
			serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault))
		}
	}

	if ctxCancelled() {
		return
	}

	c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
		slog.String("err", lastErr.Error()))

	c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{})

	// All retries exhausted: give up leadership so another client gets a
	// chance to run the maintainer. A failed resignation request is only
	// logged — there's nothing further this goroutine can do.
	if err := c.clientNotifyBundle.RequestResign(ctx); err != nil {
		c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error()))
	}
}

// Driver exposes the underlying driver used by the client.
//
// API is not stable. DO NOT USE.
Expand Down
2 changes: 1 addition & 1 deletion client_pilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func Test_Client_PilotUsage(t *testing.T) {
pilot.testSignals.Init(t)

startClient(ctx, t, client)
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

pilot.testSignals.PeriodicJobGetAll.WaitOrTimeout()
pilot.testSignals.PeriodicJobUpsertMany.WaitOrTimeout()
Expand Down
30 changes: 15 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,15 +1116,15 @@ func Test_Client_Common(t *testing.T) {
startClient(ctx, t, client)

client.config.Logger.InfoContext(ctx, "Test waiting for client to be elected leader for the first time")
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
client.config.Logger.InfoContext(ctx, "Client was elected leader for the first time")

// We test the function with a forced resignation, but this is a general
// Notify test case so this could be changed to any notification.
require.NoError(t, client.Notify().RequestResign(ctx))

client.config.Logger.InfoContext(ctx, "Test waiting for client to be elected leader after forced resignation")
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
client.config.Logger.InfoContext(ctx, "Client was elected leader after forced resignation")
})

Expand All @@ -1137,7 +1137,7 @@ func Test_Client_Common(t *testing.T) {
startClient(ctx, t, client)

client.config.Logger.InfoContext(ctx, "Test waiting for client to be elected leader for the first time")
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
client.config.Logger.InfoContext(ctx, "Client was elected leader for the first time")

tx, err := bundle.dbPool.Begin(ctx)
Expand All @@ -1149,7 +1149,7 @@ func Test_Client_Common(t *testing.T) {
require.NoError(t, tx.Commit(ctx))

client.config.Logger.InfoContext(ctx, "Test waiting for client to be elected leader after forced resignation")
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
client.config.Logger.InfoContext(ctx, "Client was elected leader after forced resignation")
})

Expand Down Expand Up @@ -1422,7 +1422,7 @@ func Test_Client_Common(t *testing.T) {

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
Expand Down Expand Up @@ -1450,7 +1450,7 @@ func Test_Client_Common(t *testing.T) {

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
Expand Down Expand Up @@ -4881,7 +4881,7 @@ func Test_Client_Maintenance(t *testing.T) {
t.Helper()

startClient(ctx, t, client)
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
riversharedtest.WaitOrTimeout(t, client.queueMaintainer.Started())
}

Expand Down Expand Up @@ -5158,16 +5158,16 @@ func Test_Client_Maintenance(t *testing.T) {
client, _ := setup(t, config)

startClient(ctx, t, client)
client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

// Wait for all 3 retry attempts to fail.
for range 3 {
err := client.testSignals.queueMaintainerStartError.WaitOrTimeout()
err := client.queueMaintainerLeader.TestSignals.StartError.WaitOrTimeout()
require.EqualError(t, err, "hook start error")
}

// After all retries exhausted, the client should request resignation.
client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.StartRetriesExhausted.WaitOrTimeout()
})

t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) {
Expand Down Expand Up @@ -5276,7 +5276,7 @@ func Test_Client_Maintenance(t *testing.T) {

exec := client.driver.GetExecutor()

client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

client.PeriodicJobs().Add(
NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) {
Expand Down Expand Up @@ -5322,7 +5322,7 @@ func Test_Client_Maintenance(t *testing.T) {

startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()
Expand Down Expand Up @@ -5395,7 +5395,7 @@ func Test_Client_Maintenance(t *testing.T) {

startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()
Expand Down Expand Up @@ -5440,7 +5440,7 @@ func Test_Client_Maintenance(t *testing.T) {

startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()
Expand Down Expand Up @@ -5535,7 +5535,7 @@ func Test_Client_Maintenance(t *testing.T) {

startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()
client.queueMaintainerLeader.TestSignals.ElectedLeader.WaitOrTimeout()
qc := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
qc.TestSignals.DeletedBatch.WaitOrTimeout()

Expand Down
Loading
Loading