From 62d45d4fc635474cacda125c9ecb0f7d9b246fc9 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 2 Apr 2026 20:41:49 -0500 Subject: [PATCH] allow pilot to add reindex indexes The reindexer operates on a fixed list of indexes. I'm not certain if we have a compelling use case to expose a generalized config for this, but in the meantime, we have pro features which would benefit from periodic re-indexing. Add a pilot hook that allows for extra reindex targets while preserving the existing schedule and timeout behavior. The reindexer also filters configured targets through `IndexesExist` so missing optional indexes are skipped cleanly, and tests pin the default, merged, and deduplicated target lists. --- client.go | 4 +++ client_test.go | 17 ++++++---- insert_opts.go | 2 +- internal/maintenance/reindexer.go | 37 +++++++++++++++++++-- internal/maintenance/reindexer_test.go | 27 ++++++++++++++- plugin_test.go | 42 ++++++++++++++++++++++-- rivershared/riverpilot/pilot.go | 2 ++ rivershared/riverpilot/standard_pilot.go | 2 ++ 8 files changed, 120 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index c07c70d4..caeec734 100644 --- a/client.go +++ b/client.go @@ -938,7 +938,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client scheduleFunc = config.ReindexerSchedule.Next } + indexNames := maintenance.DefaultReindexerIndexNames() + indexNames = append(indexNames, client.pilot.ReindexerIndexNames()...) + reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{ + IndexNames: sliceutil.Uniq(indexNames), ScheduleFunc: scheduleFunc, Schema: config.Schema, Timeout: config.ReindexerTimeout, diff --git a/client_test.go b/client_test.go index cb5eed0f..418a75b4 100644 --- a/client_test.go +++ b/client_test.go @@ -7254,13 +7254,16 @@ func Test_NewClient_Defaults(t *testing.T) { require.False(t, enqueuer.StaggerStartupIsDisabled()) reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) - require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_kind") - require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey") - require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx") + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, reindexer.Config.IndexNames) now := time.Now().UTC() nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1) require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now)) diff --git a/insert_opts.go b/insert_opts.go index 032e235e..bc1443d8 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals // what to do about the job that can't be scheduled. We can't send feedback to // the caller at this point, so probably the best we could do is leave it in // this untransitionable state until the `running` job finished, which isn't -// particularly satsifactory. +// particularly satisfactory. var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals rivertype.JobStateAvailable, rivertype.JobStatePending, diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index 7ff4bde9..070b089b 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -39,6 +39,11 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals "river_job_unique_idx", } +// DefaultReindexerIndexNames returns the default set of indexes reindexed by River. +func DefaultReindexerIndexNames() []string { + return append([]string(nil), defaultIndexNames...) +} + // ReindexerTestSignals are internal signals used exclusively in tests. type ReindexerTestSignals struct { Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes @@ -137,7 +142,16 @@ func (s *Reindexer) Start(ctx context.Context) error { for { select { case <-timerUntilNextRun.C: - for _, indexName := range s.Config.IndexNames { + reindexableIndexNames, err := s.reindexableIndexNames(ctx) + if err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error())) + } + timerUntilNextRun.Reset(time.Until(nextRunAt)) + continue + } + + for _, indexName := range reindexableIndexNames { if _, err := s.reindexOne(ctx, indexName); err != nil { if !errors.Is(err, context.Canceled) { s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName)) @@ -155,7 +169,7 @@ func (s *Reindexer) Start(ctx context.Context) error { // TODO: maybe we should log differently if some of these fail? s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully, - slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames))) + slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames))) // Reset the timer after the insert loop has finished so it's // paused during work. Makes its firing more deterministic. @@ -176,6 +190,25 @@ func (s *Reindexer) Start(ctx context.Context) error { return nil } +func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) { + indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{ + IndexNames: s.Config.IndexNames, + Schema: s.Config.Schema, + }) + if err != nil { + return nil, err + } + + indexNames := make([]string, 0, len(s.Config.IndexNames)) + for _, indexName := range s.Config.IndexNames { + if indexesExist[indexName] { + indexNames = append(indexNames, indexName) + } + } + + return indexNames, nil +} + func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) { var cancel func() if s.Config.Timeout > -1 { diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 04f28cf3..9fcb6782 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -127,6 +127,22 @@ func TestReindexer(t *testing.T) { require.True(t, requireReindexOne(indexName)) }) + t.Run("ReindexableIndexNamesSkipsMissingIndexes", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t) + + svc.Config.IndexNames = []string{ + "does_not_exist", + "river_job_kind", + "river_job_prioritized_fetching_index", + } + + indexNames, err := svc.reindexableIndexNames(ctx) + require.NoError(t, err) + require.Equal(t, []string{"river_job_kind", "river_job_prioritized_fetching_index"}, indexNames) + }) + t.Run("ReindexesMinimalSubsetofIndexes", func(t *testing.T) { t.Parallel() @@ -249,7 +265,16 @@ func TestReindexer(t *testing.T) { svc, bundle := setup(t) svc = NewReindexer(&svc.Archetype, &ReindexerConfig{}, bundle.exec) - require.Equal(t, defaultIndexNames, svc.Config.IndexNames) + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, svc.Config.IndexNames) require.Equal(t, ReindexerTimeoutDefault, svc.Config.Timeout) require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&DefaultReindexerSchedule{}).Next(bundle.now)) }) diff --git a/plugin_test.go b/plugin_test.go index 13135c6f..685adac5 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/baseservice" @@ -117,6 +118,38 @@ func TestClientPilotPlugin(t *testing.T) { } } + t.Run("ReindexerIndexNamesAddedAndDeduplicated", func(t *testing.T) { + t.Parallel() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = newTestConfig(t, schema) + pluginDriver = newDriverWithPlugin(t, dbPool) + pluginPilot = newPilotWithPlugin(t) + ) + // Include a default index name to verify the merged list is deduplicated. + pluginPilot.reindexerIndexNames = []string{"plugin_custom_index", "river_job_kind"} + pluginDriver.pilot = pluginPilot + + client, err := NewClient(pluginDriver, config) + require.NoError(t, err) + + reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + "plugin_custom_index", + }, reindexer.Config.IndexNames) + }) + t.Run("ServicesStart", func(t *testing.T) { t.Parallel() @@ -136,8 +169,9 @@ var _ pilotPlugin = &TestPilotWithPlugin{} type TestPilotWithPlugin struct { riverpilot.StandardPilot - maintenanceService startstop.Service - service startstop.Service + maintenanceService startstop.Service + reindexerIndexNames []string + service startstop.Service } func newPilotWithPlugin(t *testing.T) *TestPilotWithPlugin { @@ -176,3 +210,7 @@ func (d *TestPilotWithPlugin) PluginMaintenanceServices() []startstop.Service { func (d *TestPilotWithPlugin) PluginServices() []startstop.Service { return []startstop.Service{d.service} } + +func (d *TestPilotWithPlugin) ReindexerIndexNames() []string { + return d.reindexerIndexNames +} diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index e42ef797..53c7e359 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -57,6 +57,8 @@ type Pilot interface { ProducerShutdown(ctx context.Context, exec riverdriver.Executor, params *ProducerShutdownParams) error QueueMetadataChanged(ctx context.Context, exec riverdriver.Executor, params *QueueMetadataChangedParams) error + + ReindexerIndexNames() []string } // PilotInitParams are parameters for initializing a pilot. diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 22598eb9..8f3d2d55 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -75,6 +75,8 @@ func (p *StandardPilot) QueueMetadataChanged(ctx context.Context, exec riverdriv return nil } +func (p *StandardPilot) ReindexerIndexNames() []string { return nil } + type standardProducerState struct{} func (s *standardProducerState) JobFinish(job *rivertype.JobRow) {