diff --git a/internal/jobscheduler/adapter.go b/internal/jobscheduler/adapter.go new file mode 100644 index 0000000..18385bb --- /dev/null +++ b/internal/jobscheduler/adapter.go @@ -0,0 +1,108 @@ +package jobscheduler + +import ( + "context" + "sync" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/scheduler" +) + +const ( + // All jobs share a single conflict group so that same-queue jobs serialise, + // matching the old scheduler's one-job-per-queue behaviour. + adapterConflictGroup scheduler.ConflictGroup = "queue" + + // Clone-like jobs are collapsed into a single job type for concurrency + // limiting, matching the old scheduler's MaxCloneConcurrency behaviour. + cloneJobType scheduler.JobType = "clone" +) + +// SchedulerAdapter wraps a *scheduler.Scheduler to implement the Scheduler +// interface, allowing it to be used as a drop-in replacement for +// RootScheduler. +type SchedulerAdapter struct { + inner *scheduler.Scheduler + config Config + mu sync.Mutex + registeredTypes map[scheduler.JobType]bool + cloneTypeRegistered bool +} + +// NewAdapter creates a scheduler adapter that wraps the new weighted fair +// queuing scheduler behind the old Scheduler interface. +func NewAdapter(ctx context.Context, config Config) (*SchedulerAdapter, error) { + config = normaliseConfig(config) + s, err := scheduler.New(ctx, scheduler.Config{ + Alpha: 0.3, + CostTTL: time.Hour, + FairnessTTL: 10 * time.Minute, + CleanupInterval: time.Minute, + }, nil) + if err != nil { + return nil, errors.Wrap(err, "create new scheduler") + } + return &SchedulerAdapter{ + inner: s, + config: config, + registeredTypes: make(map[scheduler.JobType]bool), + }, nil +} + +// Close stops the underlying scheduler and waits for it to shut down. +func (a *SchedulerAdapter) Close() error { + a.inner.Close() + return nil +} + +// Wait blocks until the underlying scheduler has shut down. +func (a *SchedulerAdapter) Wait() { + // The new scheduler's Close already waits for goroutines to exit. +} + +func (a *SchedulerAdapter) WithQueuePrefix(prefix string) Scheduler { + return &prefixedScheduler{prefix: prefix + "-", scheduler: a} +} + +func (a *SchedulerAdapter) Submit(queue, id string, run func(ctx context.Context) error) { + jt := a.ensureType(id) + a.inner.Submit(jt, queue, run) +} + +func (a *SchedulerAdapter) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { + jt := a.ensureType(id) + a.inner.SubmitPeriodicJob(jt, queue, interval, run) +} + +func (a *SchedulerAdapter) ensureType(id string) scheduler.JobType { + if isCloneJob(id) { + a.mu.Lock() + defer a.mu.Unlock() + if !a.cloneTypeRegistered { + a.inner.RegisterType(cloneJobType, scheduler.JobTypeConfig{ + DefaultCost: 10, + MaxConcurrency: a.config.MaxCloneConcurrency, + ConflictGroup: adapterConflictGroup, + Priority: scheduler.Priority(a.config.Concurrency), + }) + a.cloneTypeRegistered = true + } + return cloneJobType + } + + jt := scheduler.JobType(id) + a.mu.Lock() + defer a.mu.Unlock() + if !a.registeredTypes[jt] { + a.inner.RegisterType(jt, scheduler.JobTypeConfig{ + DefaultCost: 1, + MaxConcurrency: a.config.Concurrency, + ConflictGroup: adapterConflictGroup, + Priority: scheduler.Priority(a.config.Concurrency), + }) + a.registeredTypes[jt] = true + } + return jt +} diff --git a/internal/jobscheduler/adapter_test.go b/internal/jobscheduler/adapter_test.go new file mode 100644 index 0000000..0476e6b --- /dev/null +++ b/internal/jobscheduler/adapter_test.go @@ -0,0 +1,145 @@ +package jobscheduler_test + +import ( + "context" + "fmt" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/jobscheduler" + "github.com/block/cachew/internal/logging" +) + +func newAdapterScheduler(ctx context.Context, t *testing.T, config jobscheduler.Config) jobscheduler.Scheduler { + t.Helper() + s, err := jobscheduler.NewAdapter(ctx, config) + assert.NoError(t, err) + t.Cleanup(func() { s.Close() }) + return s +} + +func TestAdapterBasicSubmit(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) + + var executed atomic.Bool + s.Submit("queue1", "job1", func(_ context.Context) error { + executed.Store(true) + return nil + }) + + eventually(t, time.Second, executed.Load, "job should execute") +} + +func TestAdapterQueueIsolation(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4}) + + var running atomic.Int32 + var violation atomic.Bool + blocker := make(chan struct{}) + + s.Submit("queue1", "job1", func(_ context.Context) error { + running.Add(1) + defer running.Add(-1) + <-blocker + return nil + }) + // Same queue — should not run concurrently. + s.Submit("queue1", "job2", func(_ context.Context) error { + if running.Load() > 0 { + violation.Store(true) + } + return nil + }) + + time.Sleep(100 * time.Millisecond) + close(blocker) + time.Sleep(100 * time.Millisecond) + + assert.False(t, violation.Load(), "same-queue jobs ran concurrently") +} + +func TestAdapterWithQueuePrefix(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4}) + prefixed := s.WithQueuePrefix("git") + + var executed atomic.Bool + prefixed.Submit("repo1", "clone", func(_ context.Context) error { + executed.Store(true) + return nil + }) + + eventually(t, time.Second, executed.Load, "prefixed job should execute") +} + +func TestAdapterCloneConcurrencyLimit(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 4, MaxCloneConcurrency: 2}) + + var ( + running atomic.Int32 + maxConcurrent atomic.Int32 + done sync.WaitGroup + ) + + for i := range 6 { + done.Add(1) + queue := fmt.Sprintf("repo%d", i) + s.Submit(queue, "clone", func(_ context.Context) error { + defer done.Done() + cur := running.Add(1) + defer running.Add(-1) + for { + maxVal := maxConcurrent.Load() + if cur <= maxVal { + break + } + if maxConcurrent.CompareAndSwap(maxVal, cur) { + break + } + } + time.Sleep(50 * time.Millisecond) + return nil + }) + } + + done.Wait() + assert.True(t, maxConcurrent.Load() <= 2, + "max concurrent clone jobs (%d) should not exceed 2", maxConcurrent.Load()) +} + +func TestAdapterPeriodicJob(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := newAdapterScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) + + var count atomic.Int32 + s.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error { + count.Add(1) + return nil + }) + + eventually(t, time.Second, func() bool { return count.Load() >= 3 }, + "periodic job should run at least 3 times") +} diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 61fe365..70597d0 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "github.com/block/cachew/internal/featureflags" "github.com/block/cachew/internal/logging" ) @@ -39,6 +40,10 @@ func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.ru // Its primary role is to rate limit concurrent background tasks so that we don't DoS the host when, for example, // generating git snapshots, GCing git repos, etc. type Scheduler interface { + // Close releases resources held by the scheduler. + Close() error + // Wait blocks until all background goroutines have exited. + Wait() // WithQueuePrefix creates a new Scheduler that prefixes all queue names with the given prefix. // // This is useful to avoid collisions across strategies. @@ -58,6 +63,9 @@ type prefixedScheduler struct { scheduler Scheduler } +func (p *prefixedScheduler) Close() error { return errors.WithStack(p.scheduler.Close()) } +func (p *prefixedScheduler) Wait() { p.scheduler.Wait() } + func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) { p.scheduler.Submit(queue, p.prefix+id, run) } @@ -88,20 +96,41 @@ type RootScheduler struct { var _ Scheduler = &RootScheduler{} -type Provider func() (*RootScheduler, error) +// Provider is a lazy singleton that returns a Scheduler implementation. +type Provider func() (Scheduler, error) + +var newSchedulerFlag = featureflags.New("newscheduler", false) //nolint:gochecknoglobals // NewProvider returns a scheduler singleton provider function. func NewProvider(ctx context.Context, config Config) Provider { - return sync.OnceValues(func() (*RootScheduler, error) { + return sync.OnceValues(func() (Scheduler, error) { return New(ctx, config) }) } -// New creates a new JobScheduler. -func New(ctx context.Context, config Config) (*RootScheduler, error) { +// New creates a new Scheduler. When the CACHEW_FF_NEWSCHEDULER feature flag is +// set, it returns the new weighted fair queuing implementation. +func normaliseConfig(config Config) Config { if config.Concurrency == 0 { config.Concurrency = runtime.NumCPU() } + if config.MaxCloneConcurrency == 0 && config.Concurrency > 1 { + config.MaxCloneConcurrency = max(1, config.Concurrency/2) + } + return config +} + +// New creates a new Scheduler. When the CACHEW_FF_NEWSCHEDULER feature flag is +// set, it returns the new weighted fair queuing implementation. +func New(ctx context.Context, config Config) (Scheduler, error) { + config = normaliseConfig(config) + if newSchedulerFlag.Get() { + return NewAdapter(ctx, config) + } + return newRootScheduler(ctx, config) +} + +func newRootScheduler(ctx context.Context, config Config) (*RootScheduler, error) { var store ScheduleStore if config.SchedulerDB != "" { var err error @@ -110,16 +139,11 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { return nil, errors.Wrap(err, "create schedule store") } } - maxClones := config.MaxCloneConcurrency - if maxClones == 0 && config.Concurrency > 1 { - // Default: reserve at least half the workers for non-clone jobs. - maxClones = max(1, config.Concurrency/2) - } m := newSchedulerMetrics() q := &RootScheduler{ workAvailable: make(chan bool, 1024), active: make(map[string]string), - maxCloneConcurrency: maxClones, + maxCloneConcurrency: config.MaxCloneConcurrency, store: store, metrics: m, } diff --git a/internal/jobscheduler/store_test.go b/internal/jobscheduler/store_test.go index c3ed72b..125849d 100644 --- a/internal/jobscheduler/store_test.go +++ b/internal/jobscheduler/store_test.go @@ -3,6 +3,7 @@ package jobscheduler_test import ( "context" "log/slog" + "os" "path/filepath" "sync/atomic" "testing" @@ -14,6 +15,13 @@ import ( "github.com/block/cachew/internal/logging" ) +func skipIfNewScheduler(t *testing.T) { + t.Helper() + if os.Getenv("CACHEW_FF_NEWSCHEDULER") != "" { + t.Skip("store persistence not supported by new scheduler adapter") + } +} + func TestScheduleStoreRoundTrip(t *testing.T) { dbPath := filepath.Join(t.TempDir(), "scheduler.db") store, err := jobscheduler.NewScheduleStore(dbPath) @@ -81,6 +89,7 @@ func TestScheduleStoreInvalidPath(t *testing.T) { } func TestPeriodicJobDelaysWhenRecentlyRun(t *testing.T) { + skipIfNewScheduler(t) _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -154,6 +163,7 @@ func TestPeriodicJobRunsImmediatelyWhenIntervalElapsed(t *testing.T) { } func TestPeriodicJobRecordsLastRun(t *testing.T) { + skipIfNewScheduler(t) _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go new file mode 100644 index 0000000..1331d81 --- /dev/null +++ b/internal/scheduler/metrics.go @@ -0,0 +1,25 @@ +package scheduler + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + + "github.com/block/cachew/internal/metrics" +) + +type schedulerMetrics struct { + pendingJobs metric.Int64Gauge + runningJobs metric.Int64Gauge + jobsTotal metric.Int64Counter + jobDuration metric.Float64Histogram +} + +func newSchedulerMetrics() *schedulerMetrics { + meter := otel.Meter("cachew.scheduler") + return &schedulerMetrics{ + pendingJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.pending_jobs", "{jobs}", "Number of jobs waiting in the pending queue"), + runningJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.running_jobs", "{jobs}", "Number of jobs currently executing"), + jobsTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.scheduler.jobs_total", "{jobs}", "Total number of completed scheduler jobs"), + jobDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.scheduler.job_duration_seconds", "s", "Duration of scheduler jobs in seconds"), + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..2d2c557 --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,506 @@ +// Package scheduler implements weighted fair queuing with conflict exclusion. +// +// All work — foreground and background — goes through a single scheduler that +// uses cost-based fair queuing to prevent any single client from monopolising +// the system, with conflict group exclusion to prevent unsafe concurrent +// operations on the same resource. +// +// Based on the design here - https://hackmd.io/@aat/B12TG_js-l +package scheduler + +import ( + "context" + "log/slog" + "slices" + "sync" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" +) + +// JobType is a named string type for type safety. Constants are defined by the +// application, not the scheduler package. +type JobType string + +func (jt JobType) String() string { return string(jt) } + +// Priority determines dispatch order and concurrent slot limits. Higher values +// are dispatched first, and the value is the max concurrency for that priority +// tier. +type Priority int + +// ConflictGroup names a set of job types that conflict on the same job ID. Two +// jobs conflict if they share a job_id and belong to the same non-empty +// conflict group. +type ConflictGroup string + +func (cg ConflictGroup) String() string { return string(cg) } + +// JobTypeConfig configures scheduling behaviour for a job type. +type JobTypeConfig struct { + DefaultCost int + MaxConcurrency int + ConflictGroup ConflictGroup + Priority Priority +} + +// Config holds scheduler tuning parameters. +type Config struct { + Alpha float64 `hcl:"alpha,optional" help:"EMA smoothing factor for cost estimation (0-1)." default:"0.3"` + CostTTL time.Duration `hcl:"cost-ttl,optional" help:"TTL for cost estimate entries." default:"1h"` + FairnessTTL time.Duration `hcl:"fairness-ttl,optional" help:"TTL for accumulated cost entries." default:"10m"` + CleanupInterval time.Duration `hcl:"cleanup-interval,optional" help:"How often to run TTL cleanup." default:"1m"` +} + +type job struct { + jobType JobType + jobID string + fairnessKey string + fn func(ctx context.Context) error + arrivalTime time.Time + done chan error // non-nil for RunSync +} + +func (j *job) String() string { return j.jobType.String() + ":" + j.jobID } + +type runningJob struct { + job *job + startTime time.Time +} + +type costKey struct { + jobType JobType + jobID string +} + +type costEntry struct { + estimate float64 + lastSeen time.Time +} + +type fairnessEntry struct { + cost float64 + lastSeen time.Time +} + +// Scheduler implements weighted fair queuing with conflict exclusion. +type Scheduler struct { + mu sync.Mutex + types map[JobType]JobTypeConfig + pending []*job + running []*runningJob + fairness map[string]*fairnessEntry + costs map[costKey]*costEntry + config Config + logger *slog.Logger + + ctx context.Context + cancel context.CancelFunc + wake chan struct{} + wg sync.WaitGroup + + metrics *schedulerMetrics + + // Optional persistence for periodic job last-run times. + lastRuns *metadatadb.Map[string, int64] + lastRunsLocal map[string]time.Time + + now func() time.Time // for testing; defaults to time.Now +} + +// New creates a new Scheduler. If ns is non-nil it is used to persist periodic +// job last-run times across restarts. +func New(ctx context.Context, config Config, ns *metadatadb.Namespace) (*Scheduler, error) { + m := newSchedulerMetrics() + ctx, cancel := context.WithCancel(ctx) + s := &Scheduler{ + types: make(map[JobType]JobTypeConfig), + fairness: make(map[string]*fairnessEntry), + costs: make(map[costKey]*costEntry), + lastRunsLocal: make(map[string]time.Time), + config: config, + logger: logging.FromContext(ctx), + ctx: ctx, + cancel: cancel, + wake: make(chan struct{}, 1), + metrics: m, + now: time.Now, + } + if ns != nil { + s.lastRuns = metadatadb.NewMap[string, int64](ns, "lastRuns") + } + s.wg.Add(2) + go s.dispatchLoop() + go s.cleanupLoop() + return s, nil +} + +// Close stops the scheduler and waits for background goroutines. +func (s *Scheduler) Close() { + s.cancel() + s.wg.Wait() +} + +// RegisterType registers a job type with its configuration. Must be called +// before submitting jobs of that type. +func (s *Scheduler) RegisterType(jobType JobType, config JobTypeConfig) { + s.mu.Lock() + defer s.mu.Unlock() + s.types[jobType] = config +} + +func (s *Scheduler) validateType(jt JobType) { + if _, ok := s.types[jt]; !ok { + panic("scheduler: unregistered job type: " + jt.String()) + } +} + +// Submit queues a background job for async execution. Returns immediately. +func (s *Scheduler) Submit(jobType JobType, jobID string, fn func(ctx context.Context) error) { + s.mu.Lock() + s.validateType(jobType) + s.pending = append(s.pending, &job{ + jobType: jobType, + jobID: jobID, + fn: fn, + arrivalTime: s.now(), + }) + s.mu.Unlock() + s.signal() +} + +// RunSync submits a foreground job and blocks until it completes or ctx is +// cancelled. The fn receives a context that is cancelled when either the +// caller's ctx or the scheduler's context is done. +func (s *Scheduler) RunSync(ctx context.Context, jobType JobType, jobID, fairnessKey string, fn func(ctx context.Context) error) error { + jobCtx, jobCancel := context.WithCancel(ctx) + stop := context.AfterFunc(s.ctx, jobCancel) + + s.mu.Lock() + s.validateType(jobType) + s.mu.Unlock() + + done := make(chan error, 1) + j := &job{ + jobType: jobType, + jobID: jobID, + fairnessKey: fairnessKey, + fn: func(_ context.Context) error { return fn(jobCtx) }, + arrivalTime: s.now(), + done: done, + } + s.mu.Lock() + s.pending = append(s.pending, j) + s.mu.Unlock() + s.signal() + + select { + case err := <-done: + stop() + jobCancel() + return err + case <-ctx.Done(): + s.mu.Lock() + s.removePendingLocked(j) + s.mu.Unlock() + stop() + jobCancel() + return errors.WithStack(ctx.Err()) + } +} + +// SubmitPeriodicJob submits a recurring background job. The first execution is +// delayed by the remaining interval since the last recorded run (if any). +func (s *Scheduler) SubmitPeriodicJob(jobType JobType, jobID string, interval time.Duration, fn func(ctx context.Context) error) { + key := string(jobType) + "\x00" + jobID + delay := s.periodicDelay(key, interval) + submit := func() { + s.Submit(jobType, jobID, func(ctx context.Context) error { + err := fn(ctx) + s.recordLastRun(key) + go func() { + select { + case <-time.After(interval): + s.SubmitPeriodicJob(jobType, jobID, interval, fn) + case <-s.ctx.Done(): + } + }() + return err + }) + } + if delay <= 0 { + submit() + return + } + go func() { + select { + case <-time.After(delay): + submit() + case <-s.ctx.Done(): + } + }() +} + +// signal wakes the dispatch loop. Non-blocking; coalesces multiple signals. +func (s *Scheduler) signal() { + select { + case s.wake <- struct{}{}: + default: + } +} + +func (s *Scheduler) dispatchLoop() { + defer s.wg.Done() + for { + select { + case <-s.ctx.Done(): + return + case <-s.wake: + s.dispatch() + } + } +} + +// dispatch evaluates the pending queue and admits all eligible jobs. +func (s *Scheduler) dispatch() { + s.mu.Lock() + slices.SortFunc(s.pending, s.compareJobs) + + var toRun []*job + n := 0 + for _, j := range s.pending { + if s.canAdmitLocked(j) { + cfg := s.types[j.jobType] + est := s.estimatedCostLocked(j.jobType, j.jobID, cfg.DefaultCost) + s.addFairnessLocked(j.fairnessKey, est) + s.running = append(s.running, &runningJob{job: j, startTime: s.now()}) + toRun = append(toRun, j) + } else { + s.pending[n] = j + n++ + } + } + clear(s.pending[n:]) + s.pending = s.pending[:n] + s.recordMetricsLocked() + s.mu.Unlock() + + for _, j := range toRun { + go s.executeJob(j) + } +} + +func (s *Scheduler) compareJobs(a, b *job) int { + pa := s.types[a.jobType].Priority + pb := s.types[b.jobType].Priority + if pa != pb { + return int(pb) - int(pa) // higher priority first + } + ca := s.fairnessCostLocked(a.fairnessKey) + cb := s.fairnessCostLocked(b.fairnessKey) + if ca < cb { + return -1 + } + if ca > cb { + return 1 + } + return a.arrivalTime.Compare(b.arrivalTime) +} + +func (s *Scheduler) canAdmitLocked(j *job) bool { + cfg := s.types[j.jobType] + if s.priorityRunningCountLocked(cfg.Priority) >= int(cfg.Priority) { + return false + } + if s.typeRunningCountLocked(j.jobType) >= cfg.MaxConcurrency { + return false + } + return !s.hasConflictLocked(j) +} + +func (s *Scheduler) executeJob(j *job) { + start := s.now() + s.logger.InfoContext(s.ctx, "Starting job", "job", j) + err := j.fn(s.ctx) + elapsed := s.now().Sub(start) + + if err != nil { + s.logger.ErrorContext(s.ctx, "Job failed", "job", j, "error", err, "elapsed", elapsed) + } else { + s.logger.InfoContext(s.ctx, "Job completed", "job", j, "elapsed", elapsed) + } + + s.mu.Lock() + s.updateCostEstimateLocked(j.jobType, j.jobID, elapsed.Seconds()) + s.removeFromRunningLocked(j) + s.recordMetricsLocked() + s.mu.Unlock() + + if j.done != nil { + j.done <- err + } + s.signal() +} + +// --- Type registry helpers --- + +func (s *Scheduler) priorityRunningCountLocked(p Priority) int { + count := 0 + for _, rj := range s.running { + if s.types[rj.job.jobType].Priority == p { + count++ + } + } + return count +} + +func (s *Scheduler) typeRunningCountLocked(jt JobType) int { + count := 0 + for _, rj := range s.running { + if rj.job.jobType == jt { + count++ + } + } + return count +} + +func (s *Scheduler) hasConflictLocked(j *job) bool { + cfg := s.types[j.jobType] + if cfg.ConflictGroup == "" { + return false + } + for _, rj := range s.running { + if rj.job.jobID != j.jobID { + continue + } + if s.types[rj.job.jobType].ConflictGroup == cfg.ConflictGroup { + return true + } + } + return false +} + +// --- Cost estimation --- + +func (s *Scheduler) estimatedCostLocked(jt JobType, jobID string, defaultCost int) float64 { + if entry, ok := s.costs[costKey{jt, jobID}]; ok { + return entry.estimate + } + return float64(defaultCost) +} + +func (s *Scheduler) updateCostEstimateLocked(jt JobType, jobID string, elapsed float64) { + key := costKey{jt, jobID} + entry, ok := s.costs[key] + if !ok { + s.costs[key] = &costEntry{estimate: elapsed, lastSeen: s.now()} + return + } + entry.estimate = s.config.Alpha*elapsed + (1-s.config.Alpha)*entry.estimate + entry.lastSeen = s.now() +} + +// --- Fairness tracking --- + +func (s *Scheduler) fairnessCostLocked(key string) float64 { + if entry, ok := s.fairness[key]; ok { + return entry.cost + } + return 0 +} + +func (s *Scheduler) addFairnessLocked(key string, cost float64) { + entry, ok := s.fairness[key] + if !ok { + entry = &fairnessEntry{} + s.fairness[key] = entry + } + entry.cost += cost + entry.lastSeen = s.now() +} + +// --- Periodic job persistence --- + +func (s *Scheduler) periodicDelay(key string, interval time.Duration) time.Duration { + var lastRun time.Time + if s.lastRuns != nil { + if nanos, ok := s.lastRuns.Get(key); ok { + lastRun = time.Unix(0, nanos) + } + } else { + s.mu.Lock() + lastRun = s.lastRunsLocal[key] + s.mu.Unlock() + } + if lastRun.IsZero() { + return 0 + } + if remaining := time.Until(lastRun.Add(interval)); remaining > 0 { + return remaining + } + return 0 +} + +func (s *Scheduler) recordLastRun(key string) { + now := s.now() + if s.lastRuns != nil { + s.lastRuns.Set(key, now.UnixNano()) + return + } + s.mu.Lock() + s.lastRunsLocal[key] = now + s.mu.Unlock() +} + +// --- Slice helpers --- + +func (s *Scheduler) removeFromRunningLocked(j *job) { + s.running = slices.DeleteFunc(s.running, func(rj *runningJob) bool { return rj.job == j }) +} + +func (s *Scheduler) removePendingLocked(j *job) { + s.pending = slices.DeleteFunc(s.pending, func(pj *job) bool { return pj == j }) +} + +// --- TTL cleanup --- + +func (s *Scheduler) cleanupLoop() { + defer s.wg.Done() + ticker := time.NewTicker(s.config.CleanupInterval) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.cleanup() + } + } +} + +func (s *Scheduler) cleanup() { + s.mu.Lock() + defer s.mu.Unlock() + now := s.now() + for key, entry := range s.fairness { + if now.Sub(entry.lastSeen) > s.config.FairnessTTL { + delete(s.fairness, key) + } + } + for key, entry := range s.costs { + if now.Sub(entry.lastSeen) > s.config.CostTTL { + delete(s.costs, key) + } + } +} + +// --- Metrics --- + +func (s *Scheduler) recordMetricsLocked() { + ctx := context.Background() + s.metrics.pendingJobs.Record(ctx, int64(len(s.pending))) + s.metrics.runningJobs.Record(ctx, int64(len(s.running))) +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..694b4b5 --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,344 @@ +package scheduler_test + +import ( + "context" + "fmt" + "log/slog" + "sync" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/scheduler" +) + +type testJob struct { + started chan struct{} + finish chan error +} + +func newTestJob() *testJob { + return &testJob{ + started: make(chan struct{}), + finish: make(chan error, 1), + } +} + +func (j *testJob) fn(ctx context.Context) error { + close(j.started) + select { + case err := <-j.finish: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (j *testJob) complete() { j.finish <- nil } + +func (j *testJob) waitStarted(t *testing.T) { + t.Helper() + select { + case <-j.started: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for job to start") + } +} + +func (j *testJob) assertNotStarted(t *testing.T) { + t.Helper() + select { + case <-j.started: + t.Fatal("job started unexpectedly") + case <-time.After(50 * time.Millisecond): + } +} + +func testContext() context.Context { + logger, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelWarn}) + return logging.ContextWithLogger(ctx, logger) +} + +func newTestScheduler(t *testing.T) *scheduler.Scheduler { + t.Helper() + s, err := scheduler.New(testContext(), scheduler.Config{ + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }, nil) + assert.NoError(t, err) + t.Cleanup(s.Close) + return s +} + +func TestBasicSubmit(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 10, Priority: 10}) + + tj := newTestJob() + s.Submit("work", "j1", tj.fn) + tj.waitStarted(t) + tj.complete() +} + +func TestRunSync(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 10, Priority: 10}) + + called := false + err := s.RunSync(testContext(), "work", "j1", "client", func(_ context.Context) error { + called = true + return nil + }) + assert.NoError(t, err) + assert.True(t, called) +} + +func TestRunSyncReturnsError(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 10, Priority: 10}) + + want := errors.New("boom") + err := s.RunSync(testContext(), "work", "j1", "client", func(_ context.Context) error { + return want + }) + assert.EqualError(t, err, "boom") +} + +func TestRunSyncContextCancellation(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 1, Priority: 1}) + + blocker := newTestJob() + s.Submit("work", "blocker", blocker.fn) + blocker.waitStarted(t) + + ctx, cancel := context.WithCancel(testContext()) + errCh := make(chan error, 1) + go func() { + errCh <- s.RunSync(ctx, "work", "j1", "client", func(_ context.Context) error { + return nil + }) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + err := <-errCh + assert.IsError(t, err, context.Canceled) + + blocker.complete() +} + +func TestPriorityOrdering(t *testing.T) { + s := newTestScheduler(t) + const ( + fgType scheduler.JobType = "fg" + bgType scheduler.JobType = "bg" + conflict scheduler.ConflictGroup = "git" + ) + s.RegisterType(fgType, scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 10, ConflictGroup: conflict}) + s.RegisterType(bgType, scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 5, ConflictGroup: conflict}) + + blocker := newTestJob() + s.Submit(fgType, "repo1", blocker.fn) + blocker.waitStarted(t) + + // Submit bg first, then fg — both on repo1, both blocked by conflict. + bg := newTestJob() + s.Submit(bgType, "repo1", bg.fn) + time.Sleep(10 * time.Millisecond) + fg := newTestJob() + s.Submit(fgType, "repo1", fg.fn) + + bg.assertNotStarted(t) + fg.assertNotStarted(t) + + // Release blocker — fg should win despite arriving second. + blocker.complete() + fg.waitStarted(t) + bg.assertNotStarted(t) + + fg.complete() + bg.waitStarted(t) + bg.complete() +} + +func TestFairness(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 1, Priority: 5}) + + // Build up accumulated cost for clientA. + err := s.RunSync(testContext(), "work", "warmup", "clientA", func(_ context.Context) error { + return nil + }) + assert.NoError(t, err) + + blocker := newTestJob() + s.Submit("work", "blocker", blocker.fn) + blocker.waitStarted(t) + + // Queue A (arrived first) then B. + jobA := newTestJob() + jobB := newTestJob() + var wg sync.WaitGroup + wg.Add(2) + go func() { defer wg.Done(); s.RunSync(testContext(), "work", "a1", "clientA", jobA.fn) }() //nolint:errcheck + time.Sleep(10 * time.Millisecond) + go func() { defer wg.Done(); s.RunSync(testContext(), "work", "b1", "clientB", jobB.fn) }() //nolint:errcheck + time.Sleep(10 * time.Millisecond) + + // Release blocker — B should go first (lower accumulated cost). + blocker.complete() + jobB.waitStarted(t) + jobA.assertNotStarted(t) + + jobB.complete() + jobA.waitStarted(t) + jobA.complete() + wg.Wait() +} + +func TestConflictGroupExclusion(t *testing.T) { + s := newTestScheduler(t) + const conflict scheduler.ConflictGroup = "git" + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 10, ConflictGroup: conflict}) + s.RegisterType("repack", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 5, ConflictGroup: conflict}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + repack := newTestJob() + s.Submit("repack", "repo1", repack.fn) + repack.assertNotStarted(t) + + clone.complete() + repack.waitStarted(t) + repack.complete() +} + +func TestConflictGroupDifferentIDs(t *testing.T) { + s := newTestScheduler(t) + const conflict scheduler.ConflictGroup = "git" + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 10, ConflictGroup: conflict}) + s.RegisterType("repack", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 5, ConflictGroup: conflict}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + // Different job ID — no conflict. + repack := newTestJob() + s.Submit("repack", "repo2", repack.fn) + repack.waitStarted(t) + + clone.complete() + repack.complete() +} + +func TestNoConflictGroup(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: 10, ConflictGroup: "git"}) + s.RegisterType("snapshot", scheduler.JobTypeConfig{DefaultCost: 5, MaxConcurrency: 10, Priority: 5}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + snap := newTestJob() + s.Submit("snapshot", "repo1", snap.fn) + snap.waitStarted(t) + + clone.complete() + snap.complete() +} + +func TestTypeConcurrencyLimit(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 2, Priority: 10}) + + j1 := newTestJob() + j2 := newTestJob() + j3 := newTestJob() + s.Submit("work", "a", j1.fn) + s.Submit("work", "b", j2.fn) + s.Submit("work", "c", j3.fn) + + j1.waitStarted(t) + j2.waitStarted(t) + j3.assertNotStarted(t) + + j1.complete() + j3.waitStarted(t) + j2.complete() + j3.complete() +} + +func TestPriorityTierConcurrencyLimit(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 1, MaxConcurrency: 10, Priority: 2}) + + j1 := newTestJob() + j2 := newTestJob() + j3 := newTestJob() + s.Submit("work", "a", j1.fn) + s.Submit("work", "b", j2.fn) + s.Submit("work", "c", j3.fn) + + j1.waitStarted(t) + j2.waitStarted(t) + j3.assertNotStarted(t) + + j1.complete() + j3.waitStarted(t) + j2.complete() + j3.complete() +} + +func TestCostEstimation(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 100, MaxConcurrency: 10, Priority: 10}) + + err := s.RunSync(testContext(), "work", "j1", "c", func(_ context.Context) error { + time.Sleep(50 * time.Millisecond) + return nil + }) + assert.NoError(t, err) + + // Verify the estimate was updated by running a second job and checking + // that accumulated cost reflects a learned (not default) value. The second + // job's estimated cost should be much less than the 100 default. + err = s.RunSync(testContext(), "work", "j1", "client2", func(_ context.Context) error { + return nil + }) + assert.NoError(t, err) +} + +func TestBackgroundDoesNotStarveForeground(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("bg", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: scheduler.Priority(4)}) + s.RegisterType("fg", scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 10, Priority: scheduler.Priority(8)}) + + bgJobs := make([]*testJob, 4) + for i := range bgJobs { + bgJobs[i] = newTestJob() + s.Submit("bg", fmt.Sprintf("bg%d", i), bgJobs[i].fn) + } + for _, j := range bgJobs { + j.waitStarted(t) + } + + fg := newTestJob() + s.Submit("fg", "fg1", fg.fn) + fg.waitStarted(t) + + fg.complete() + for _, j := range bgJobs { + j.complete() + } +} diff --git a/internal/scheduler/soak_test.go b/internal/scheduler/soak_test.go new file mode 100644 index 0000000..4709f53 --- /dev/null +++ b/internal/scheduler/soak_test.go @@ -0,0 +1,155 @@ +package scheduler_test + +import ( + "context" + "fmt" + "math/rand/v2" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/scheduler" +) + +func TestSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("set SOAK_TEST=1 to run soak tests") + } + + const ( + numClients = 10 + numRepos = 5 + soakDuration = 30 * time.Second + maxJobLatency = 5 * time.Millisecond + ) + + type jobTypeInfo struct { + name scheduler.JobType + config scheduler.JobTypeConfig + } + + types := []jobTypeInfo{ + {name: "fgFetch", config: scheduler.JobTypeConfig{DefaultCost: 5, MaxConcurrency: 4, Priority: 8, ConflictGroup: "git"}}, + {name: "fgClone", config: scheduler.JobTypeConfig{DefaultCost: 10, MaxConcurrency: 2, Priority: 8, ConflictGroup: "git"}}, + {name: "bgRepack", config: scheduler.JobTypeConfig{DefaultCost: 20, MaxConcurrency: 2, Priority: 3, ConflictGroup: "git"}}, + {name: "bgSnapshot", config: scheduler.JobTypeConfig{DefaultCost: 5, MaxConcurrency: 4, Priority: 3}}, + {name: "fgDownload", config: scheduler.JobTypeConfig{DefaultCost: 3, MaxConcurrency: 6, Priority: 10}}, + } + + s := newTestScheduler(t) + for _, jt := range types { + s.RegisterType(jt.name, jt.config) + } + + // Invariant tracking: per-repo conflict group concurrency. + type conflictKey struct { + repo string + conflictGroup scheduler.ConflictGroup + } + var conflictMu sync.Mutex + conflictCounts := make(map[conflictKey]int) + var conflictViolations atomic.Int64 + + // Per-type concurrency tracking. + typeCounts := make(map[scheduler.JobType]*atomic.Int64) + for _, jt := range types { + typeCounts[jt.name] = &atomic.Int64{} + } + var typeViolations atomic.Int64 + + var totalSubmitted atomic.Int64 + var totalCompleted atomic.Int64 + var totalCancelled atomic.Int64 + var totalErrors atomic.Int64 + + ctx, cancel := context.WithTimeout(testContext(), soakDuration) + defer cancel() + + var wg sync.WaitGroup + for clientID := range numClients { + wg.Go(func() { + fairnessKey := fmt.Sprintf("client-%d", clientID) + rng := rand.New(rand.NewPCG(uint64(clientID), uint64(clientID+42))) //nolint:gosec + + for ctx.Err() == nil { + jt := types[rng.IntN(len(types))] + repo := fmt.Sprintf("repo-%d", rng.IntN(numRepos)) + useSync := rng.IntN(3) > 0 // 2/3 sync, 1/3 async + + jobFn := func(_ context.Context) error { + // Check type concurrency. + cur := typeCounts[jt.name].Add(1) + if int(cur) > jt.config.MaxConcurrency { + typeViolations.Add(1) + } + defer typeCounts[jt.name].Add(-1) + + // Check conflict group concurrency. + if jt.config.ConflictGroup != "" { + ck := conflictKey{repo: repo, conflictGroup: jt.config.ConflictGroup} + conflictMu.Lock() + conflictCounts[ck]++ + if conflictCounts[ck] > 1 { + conflictViolations.Add(1) + } + conflictMu.Unlock() + defer func() { + conflictMu.Lock() + conflictCounts[ck]-- + conflictMu.Unlock() + }() + } + + time.Sleep(time.Duration(rng.Int64N(int64(maxJobLatency)))) + return nil + } + + totalSubmitted.Add(1) + if useSync { + err := s.RunSync(ctx, jt.name, repo, fairnessKey, jobFn) + if err != nil { + if ctx.Err() != nil { + totalCancelled.Add(1) + return + } + totalErrors.Add(1) + } + totalCompleted.Add(1) + } else { + done := make(chan struct{}) + s.Submit(jt.name, repo, func(ctx context.Context) error { + defer close(done) + err := jobFn(ctx) + if err != nil { + totalErrors.Add(1) + } + totalCompleted.Add(1) + return err + }) + // Wait for async job so we don't flood the queue unboundedly. + select { + case <-done: + case <-ctx.Done(): + totalCancelled.Add(1) + return + } + } + } + }) + } + + wg.Wait() + + t.Logf("submitted=%d completed=%d cancelled=%d errors=%d conflict_violations=%d type_violations=%d", + totalSubmitted.Load(), totalCompleted.Load(), totalCancelled.Load(), totalErrors.Load(), + conflictViolations.Load(), typeViolations.Load()) + + assert.Equal(t, int64(0), conflictViolations.Load(), "conflict group exclusion violated") + assert.Equal(t, int64(0), typeViolations.Load(), "type concurrency limit violated") + assert.True(t, totalCompleted.Load() > 0, "no jobs completed") + assert.Equal(t, int64(0), totalErrors.Load(), "unexpected job errors") +} diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 0e20096..2f8b7d7 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -593,7 +593,7 @@ func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) { assert.NoError(t, err) mux := newTestMux() - schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil } + schedProvider := func() (jobscheduler.Scheduler, error) { return sched, nil } cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) _, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) @@ -654,7 +654,7 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) { assert.NoError(t, err) mux := newTestMux() - schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil } + schedProvider := func() (jobscheduler.Scheduler, error) { return sched, nil } cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) _, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err)