108 changes: 108 additions & 0 deletions internal/jobscheduler/adapter.go
@@ -0,0 +1,108 @@
package jobscheduler

import (
"context"
"sync"
"time"

"github.com/alecthomas/errors"

"github.com/block/cachew/internal/scheduler"
)

const (
// All jobs share a single conflict group so that same-queue jobs serialise,
// matching the old scheduler's one-job-per-queue behaviour.
adapterConflictGroup scheduler.ConflictGroup = "queue"

// Clone-like jobs are collapsed into a single job type for concurrency
// limiting, matching the old scheduler's MaxCloneConcurrency behaviour.
cloneJobType scheduler.JobType = "clone"
)

// SchedulerAdapter wraps a *scheduler.Scheduler to implement the Scheduler
// interface, allowing it to be used as a drop-in replacement for
// RootScheduler.
type SchedulerAdapter struct {
inner *scheduler.Scheduler
config Config
mu sync.Mutex
registeredTypes map[scheduler.JobType]bool
cloneTypeRegistered bool
}

// NewAdapter creates a scheduler adapter that wraps the new weighted fair
// queuing scheduler behind the old Scheduler interface.
func NewAdapter(ctx context.Context, config Config) (*SchedulerAdapter, error) {
config = normaliseConfig(config)
s, err := scheduler.New(ctx, scheduler.Config{
Alpha: 0.3,
CostTTL: time.Hour,
FairnessTTL: 10 * time.Minute,
CleanupInterval: time.Minute,
}, nil)
if err != nil {
return nil, errors.Wrap(err, "create new scheduler")
}
return &SchedulerAdapter{
inner: s,
config: config,
registeredTypes: make(map[scheduler.JobType]bool),
}, nil
}

// Close stops the underlying scheduler and waits for it to shut down.
func (a *SchedulerAdapter) Close() error {
a.inner.Close()
return nil
}

// Wait is a no-op: the underlying scheduler's Close already waits for its
// goroutines to exit before returning.
func (a *SchedulerAdapter) Wait() {}

func (a *SchedulerAdapter) WithQueuePrefix(prefix string) Scheduler {
return &prefixedScheduler{prefix: prefix + "-", scheduler: a}
}

func (a *SchedulerAdapter) Submit(queue, id string, run func(ctx context.Context) error) {
jt := a.ensureType(id)
a.inner.Submit(jt, queue, run)
}

func (a *SchedulerAdapter) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
jt := a.ensureType(id)
a.inner.SubmitPeriodicJob(jt, queue, interval, run)
}

func (a *SchedulerAdapter) ensureType(id string) scheduler.JobType {
if isCloneJob(id) {
a.mu.Lock()
defer a.mu.Unlock()
if !a.cloneTypeRegistered {
a.inner.RegisterType(cloneJobType, scheduler.JobTypeConfig{
DefaultCost: 10,
MaxConcurrency: a.config.MaxCloneConcurrency,
ConflictGroup: adapterConflictGroup,
Priority: scheduler.Priority(a.config.Concurrency),
})
a.cloneTypeRegistered = true
}
return cloneJobType
}

jt := scheduler.JobType(id)
a.mu.Lock()
defer a.mu.Unlock()
if !a.registeredTypes[jt] {
a.inner.RegisterType(jt, scheduler.JobTypeConfig{
DefaultCost: 1,
MaxConcurrency: a.config.Concurrency,
ConflictGroup: adapterConflictGroup,
Priority: scheduler.Priority(a.config.Concurrency),
})
a.registeredTypes[jt] = true
}
return jt
}
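
For reviewers, a minimal usage sketch of the adapter as a drop-in replacement. Only names from this diff are used; the concrete Config values are illustrative and error handling is abbreviated.

package main

import (
	"context"
	"time"

	"github.com/block/cachew/internal/jobscheduler"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Construct the adapter directly; production callers go through New(),
	// which selects the implementation via the feature flag.
	s, err := jobscheduler.NewAdapter(ctx, jobscheduler.Config{
		Concurrency:         4, // per-type worker limit (illustrative)
		MaxCloneConcurrency: 2, // cap for clone-like jobs (illustrative)
	})
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// Jobs sharing a queue serialise via the shared conflict group; clone
	// jobs are additionally capped at MaxCloneConcurrency across all queues.
	s.Submit("repo1", "clone", func(ctx context.Context) error { return nil })
	s.SubmitPeriodicJob("repo1", "gc", time.Hour, func(ctx context.Context) error { return nil })
}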
145 changes: 145 additions & 0 deletions internal/jobscheduler/adapter_test.go
@@ -0,0 +1,145 @@
package jobscheduler_test

import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
)

func newAdapterScheduler(ctx context.Context, t *testing.T, config jobscheduler.Config) jobscheduler.Scheduler {
t.Helper()
s, err := jobscheduler.NewAdapter(ctx, config)
assert.NoError(t, err)
t.Cleanup(func() { s.Close() })
return s
}

func TestAdapterBasicSubmit(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var executed atomic.Bool
s.Submit("queue1", "job1", func(_ context.Context) error {
executed.Store(true)
return nil
})

eventually(t, time.Second, executed.Load, "job should execute")
}

func TestAdapterQueueIsolation(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4})

var running atomic.Int32
var violation atomic.Bool
blocker := make(chan struct{})

s.Submit("queue1", "job1", func(_ context.Context) error {
running.Add(1)
defer running.Add(-1)
<-blocker
return nil
})
// Same queue — should not run concurrently.
s.Submit("queue1", "job2", func(_ context.Context) error {
if running.Load() > 0 {
violation.Store(true)
}
return nil
})

time.Sleep(100 * time.Millisecond)
close(blocker)
time.Sleep(100 * time.Millisecond)

assert.False(t, violation.Load(), "same-queue jobs ran concurrently")
}

func TestAdapterWithQueuePrefix(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4})
prefixed := s.WithQueuePrefix("git")

var executed atomic.Bool
prefixed.Submit("repo1", "clone", func(_ context.Context) error {
executed.Store(true)
return nil
})

eventually(t, time.Second, executed.Load, "prefixed job should execute")
}

func TestAdapterCloneConcurrencyLimit(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4, MaxCloneConcurrency: 2})

var (
running atomic.Int32
maxConcurrent atomic.Int32
done sync.WaitGroup
)

for i := range 6 {
done.Add(1)
queue := fmt.Sprintf("repo%d", i)
s.Submit(queue, "clone", func(_ context.Context) error {
defer done.Done()
cur := running.Add(1)
defer running.Add(-1)
for {
maxVal := maxConcurrent.Load()
if cur <= maxVal {
break
}
if maxConcurrent.CompareAndSwap(maxVal, cur) {
break
}
}
time.Sleep(50 * time.Millisecond)
return nil
})
}

done.Wait()
assert.True(t, maxConcurrent.Load() <= 2,
"max concurrent clone jobs (%d) should not exceed 2", maxConcurrent.Load())
}

func TestAdapterPeriodicJob(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var count atomic.Int32
s.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error {
count.Add(1)
return nil
})

eventually(t, time.Second, func() bool { return count.Load() >= 3 },
"periodic job should run at least 3 times")
}
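
These tests lean on an eventually helper that is not part of this diff; presumably it already exists elsewhere in the package's tests. A sketch of the shape the call sites imply (the polling interval and failure behaviour are assumptions):

// eventually polls cond until it returns true or the timeout elapses,
// failing the test with msg on timeout. Sketch only: the real helper is
// defined elsewhere and may differ.
func eventually(t *testing.T, timeout time.Duration, cond func() bool, msg string) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatal(msg)
}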
44 changes: 34 additions & 10 deletions internal/jobscheduler/jobs.go
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/block/cachew/internal/featureflags"
"github.com/block/cachew/internal/logging"
)

@@ -39,6 +40,10 @@ func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.ru
// Its primary role is to rate limit concurrent background tasks so that we don't DoS the host when, for example,
// generating git snapshots, GCing git repos, etc.
type Scheduler interface {
// Close releases resources held by the scheduler.
Close() error
// Wait blocks until all background goroutines have exited.
Wait()
// WithQueuePrefix creates a new Scheduler that prefixes all queue names with the given prefix.
//
// This is useful to avoid collisions across strategies.
Expand All @@ -58,6 +63,9 @@ type prefixedScheduler struct {
scheduler Scheduler
}

func (p *prefixedScheduler) Close() error { return errors.WithStack(p.scheduler.Close()) }
func (p *prefixedScheduler) Wait() { p.scheduler.Wait() }

func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
p.scheduler.Submit(queue, p.prefix+id, run)
}
@@ -88,20 +96,41 @@ type RootScheduler struct {

var _ Scheduler = &RootScheduler{}

type Provider func() (*RootScheduler, error)
// Provider is a function that lazily constructs and memoises a singleton Scheduler.
type Provider func() (Scheduler, error)

var newSchedulerFlag = featureflags.New("newscheduler", false) //nolint:gochecknoglobals

// NewProvider returns a scheduler singleton provider function.
func NewProvider(ctx context.Context, config Config) Provider {
return sync.OnceValues(func() (*RootScheduler, error) {
return sync.OnceValues(func() (Scheduler, error) {
return New(ctx, config)
})
}

// New creates a new JobScheduler.
func New(ctx context.Context, config Config) (*RootScheduler, error) {
// normaliseConfig applies defaults for zero-valued fields so that both
// scheduler implementations see the same effective Concurrency and
// MaxCloneConcurrency.
func normaliseConfig(config Config) Config {
if config.Concurrency == 0 {
config.Concurrency = runtime.NumCPU()
}
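// Default: reserve at least half the workers for non-clone jobs.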
if config.MaxCloneConcurrency == 0 && config.Concurrency > 1 {
config.MaxCloneConcurrency = max(1, config.Concurrency/2)
}
return config
}

// New creates a new Scheduler. When the CACHEW_FF_NEWSCHEDULER feature flag is
// set, it returns the new weighted fair queuing implementation.
func New(ctx context.Context, config Config) (Scheduler, error) {
config = normaliseConfig(config)
if newSchedulerFlag.Get() {
return NewAdapter(ctx, config)
}
return newRootScheduler(ctx, config)
}

func newRootScheduler(ctx context.Context, config Config) (*RootScheduler, error) {
var store ScheduleStore
if config.SchedulerDB != "" {
var err error
@@ -110,16 +139,11 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
return nil, errors.Wrap(err, "create schedule store")
}
}
maxClones := config.MaxCloneConcurrency
if maxClones == 0 && config.Concurrency > 1 {
// Default: reserve at least half the workers for non-clone jobs.
maxClones = max(1, config.Concurrency/2)
}
m := newSchedulerMetrics()
q := &RootScheduler{
workAvailable: make(chan bool, 1024),
active: make(map[string]string),
maxCloneConcurrency: maxClones,
maxCloneConcurrency: config.MaxCloneConcurrency,
store: store,
metrics: m,
}
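Because New is flag-gated, rollout requires no call-site changes. A sketch of flipping the flag: the environment variable name appears in this diff, but that featureflags reads it this way, and that any non-empty value enables it, are assumptions consistent with skipIfNewScheduler below.

package main

import (
	"context"
	"os"

	"github.com/block/cachew/internal/jobscheduler"
)

// newScheduler returns the flag-selected implementation behind the common
// Scheduler interface: the WFQ adapter when CACHEW_FF_NEWSCHEDULER is set,
// otherwise the existing RootScheduler.
func newScheduler(ctx context.Context) (jobscheduler.Scheduler, error) {
	// Normally set in the process environment, not in code.
	os.Setenv("CACHEW_FF_NEWSCHEDULER", "true")
	return jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 4})
}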
10 changes: 10 additions & 0 deletions internal/jobscheduler/store_test.go
@@ -3,6 +3,7 @@ package jobscheduler_test
import (
"context"
"log/slog"
"os"
"path/filepath"
"sync/atomic"
"testing"
@@ -14,6 +15,13 @@ import (
"github.com/block/cachew/internal/logging"
)

func skipIfNewScheduler(t *testing.T) {
t.Helper()
if os.Getenv("CACHEW_FF_NEWSCHEDULER") != "" {
t.Skip("store persistence not supported by new scheduler adapter")
}
}

func TestScheduleStoreRoundTrip(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "scheduler.db")
store, err := jobscheduler.NewScheduleStore(dbPath)
@@ -81,6 +89,7 @@ func TestScheduleStoreInvalidPath(t *testing.T) {
}

func TestPeriodicJobDelaysWhenRecentlyRun(t *testing.T) {
skipIfNewScheduler(t)
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -154,6 +163,7 @@ func TestPeriodicJobRunsImmediatelyWhenIntervalElapsed(t *testing.T) {
}

func TestPeriodicJobRecordsLastRun(t *testing.T) {
skipIfNewScheduler(t)
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()