Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
scheduleFunc = config.ReindexerSchedule.Next
}

indexNames := maintenance.DefaultReindexerIndexNames()
indexNames = append(indexNames, client.pilot.ReindexerIndexNames()...)

reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
IndexNames: sliceutil.Uniq(indexNames),
ScheduleFunc: scheduleFunc,
Schema: config.Schema,
Timeout: config.ReindexerTimeout,
Expand Down
17 changes: 10 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7254,13 +7254,16 @@ func Test_NewClient_Defaults(t *testing.T) {
require.False(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_kind")
require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey")
require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx")
// Assert the exact list so index list changes require explicit test updates.
require.Equal(t, []string{
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
}, reindexer.Config.IndexNames)
now := time.Now().UTC()
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))
Expand Down
2 changes: 1 addition & 1 deletion insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
// what to do about the job that can't be scheduled. We can't send feedback to
// the caller at this point, so probably the best we could do is leave it in
// this untransitionable state until the `running` job finished, which isn't
// particularly satsifactory.
// particularly satisfactory.
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
rivertype.JobStateAvailable,
rivertype.JobStatePending,
Expand Down
37 changes: 35 additions & 2 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals
"river_job_unique_idx",
}

// DefaultReindexerIndexNames returns the default set of indexes reindexed by River.
func DefaultReindexerIndexNames() []string {
return append([]string(nil), defaultIndexNames...)
}

// ReindexerTestSignals are internal signals used exclusively in tests.
type ReindexerTestSignals struct {
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
Expand Down Expand Up @@ -137,7 +142,16 @@ func (s *Reindexer) Start(ctx context.Context) error {
for {
select {
case <-timerUntilNextRun.C:
for _, indexName := range s.Config.IndexNames {
reindexableIndexNames, err := s.reindexableIndexNames(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error()))
}
timerUntilNextRun.Reset(time.Until(nextRunAt))
continue
}

for _, indexName := range reindexableIndexNames {
if _, err := s.reindexOne(ctx, indexName); err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName))
Expand All @@ -155,7 +169,7 @@ func (s *Reindexer) Start(ctx context.Context) error {

// TODO: maybe we should log differently if some of these fail?
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames)))
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames)))

// Reset the timer after the insert loop has finished so it's
// paused during work. Makes its firing more deterministic.
Expand All @@ -176,6 +190,25 @@ func (s *Reindexer) Start(ctx context.Context) error {
return nil
}

func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) {
indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{
IndexNames: s.Config.IndexNames,
Schema: s.Config.Schema,
})
if err != nil {
return nil, err
}

indexNames := make([]string, 0, len(s.Config.IndexNames))
for _, indexName := range s.Config.IndexNames {
if indexesExist[indexName] {
indexNames = append(indexNames, indexName)
}
}

return indexNames, nil
}

func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) {
var cancel func()
if s.Config.Timeout > -1 {
Expand Down
27 changes: 26 additions & 1 deletion internal/maintenance/reindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ func TestReindexer(t *testing.T) {
require.True(t, requireReindexOne(indexName))
})

t.Run("ReindexableIndexNamesSkipsMissingIndexes", func(t *testing.T) {
t.Parallel()

svc, _ := setup(t)

svc.Config.IndexNames = []string{
"does_not_exist",
"river_job_kind",
"river_job_prioritized_fetching_index",
}

indexNames, err := svc.reindexableIndexNames(ctx)
require.NoError(t, err)
require.Equal(t, []string{"river_job_kind", "river_job_prioritized_fetching_index"}, indexNames)
})

t.Run("ReindexesMinimalSubsetofIndexes", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -249,7 +265,16 @@ func TestReindexer(t *testing.T) {
svc, bundle := setup(t)
svc = NewReindexer(&svc.Archetype, &ReindexerConfig{}, bundle.exec)

require.Equal(t, defaultIndexNames, svc.Config.IndexNames)
// Assert the exact list so index list changes require explicit test updates.
require.Equal(t, []string{
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
}, svc.Config.IndexNames)
require.Equal(t, ReindexerTimeoutDefault, svc.Config.Timeout)
require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&DefaultReindexerSchedule{}).Next(bundle.now))
})
Expand Down
42 changes: 40 additions & 2 deletions plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/baseservice"
Expand Down Expand Up @@ -117,6 +118,38 @@ func TestClientPilotPlugin(t *testing.T) {
}
}

t.Run("ReindexerIndexNamesAddedAndDeduplicated", func(t *testing.T) {
t.Parallel()

var (
dbPool = riversharedtest.DBPool(ctx, t)
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
config = newTestConfig(t, schema)
pluginDriver = newDriverWithPlugin(t, dbPool)
pluginPilot = newPilotWithPlugin(t)
)
// Include a default index name to verify the merged list is deduplicated.
pluginPilot.reindexerIndexNames = []string{"plugin_custom_index", "river_job_kind"}
pluginDriver.pilot = pluginPilot

client, err := NewClient(pluginDriver, config)
require.NoError(t, err)

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
// Assert the exact list so index list changes require explicit test updates.
require.Equal(t, []string{
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
"plugin_custom_index",
}, reindexer.Config.IndexNames)
})

t.Run("ServicesStart", func(t *testing.T) {
t.Parallel()

Expand All @@ -136,8 +169,9 @@ var _ pilotPlugin = &TestPilotWithPlugin{}
type TestPilotWithPlugin struct {
riverpilot.StandardPilot

maintenanceService startstop.Service
service startstop.Service
maintenanceService startstop.Service
reindexerIndexNames []string
service startstop.Service
}

func newPilotWithPlugin(t *testing.T) *TestPilotWithPlugin {
Expand Down Expand Up @@ -176,3 +210,7 @@ func (d *TestPilotWithPlugin) PluginMaintenanceServices() []startstop.Service {
func (d *TestPilotWithPlugin) PluginServices() []startstop.Service {
return []startstop.Service{d.service}
}

func (d *TestPilotWithPlugin) ReindexerIndexNames() []string {
return d.reindexerIndexNames
}
2 changes: 2 additions & 0 deletions rivershared/riverpilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Pilot interface {
ProducerShutdown(ctx context.Context, exec riverdriver.Executor, params *ProducerShutdownParams) error

QueueMetadataChanged(ctx context.Context, exec riverdriver.Executor, params *QueueMetadataChangedParams) error

ReindexerIndexNames() []string
}

// PilotInitParams are parameters for initializing a pilot.
Expand Down
2 changes: 2 additions & 0 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (p *StandardPilot) QueueMetadataChanged(ctx context.Context, exec riverdriv
return nil
}

func (p *StandardPilot) ReindexerIndexNames() []string { return nil }

type standardProducerState struct{}

func (s *standardProducerState) JobFinish(job *rivertype.JobRow) {
Expand Down
Loading