From f15ba18faea2a6b2e2032b7865a5f70d5e1ec0dc Mon Sep 17 00:00:00 2001 From: Rafael Matias Date: Mon, 12 Jan 2026 16:00:00 +0100 Subject: [PATCH] fix(dispatcher): add per-workflow locking to prevent run ID race conditions When multiple groups trigger the same workflow (owner/repo/workflow_id), there was a race condition where jobs could get incorrect run IDs due to timestamp-based matching in a shared time window. Add per-workflow-template locking that: - Acquires a mutex keyed by owner/repo/workflow_id before dispatching - Waits inline (up to 60s) for run ID to be matched after trigger - Releases lock only after run ID is found or timeout This ensures sequential dispatch for jobs targeting the same workflow, preventing run ID cross-matching between concurrent dispatches. --- pkg/dispatcher/dispatcher.go | 86 ++++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 3 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 236b2c0..924e08b 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -38,6 +38,11 @@ type dispatcher struct { wg sync.WaitGroup mu sync.Mutex runnerChangeCallback RunnerChangeCallback + + // workflowLocks provides per-workflow-template locking to prevent race conditions + // when multiple groups dispatch the same workflow. Key: "owner/repo/workflow_id". + workflowLocks map[string]*sync.Mutex + workflowLocksMu sync.Mutex } // Ensure dispatcher implements Dispatcher. @@ -59,6 +64,7 @@ func NewDispatcher( ghClient: ghClient, interval: cfg.Dispatcher.Interval, trackingInterval: cfg.Dispatcher.TrackingInterval, + workflowLocks: make(map[string]*sync.Mutex), } } @@ -112,6 +118,68 @@ func (d *dispatcher) notifyRunnerChange(runner *store.Runner) { } } +// getWorkflowLock returns or creates a mutex for a specific workflow template. +// This ensures sequential dispatch for jobs targeting the same workflow. +func (d *dispatcher) getWorkflowLock(owner, repo, workflowID string) *sync.Mutex { + key := fmt.Sprintf("%s/%s/%s", owner, repo, workflowID) + + d.workflowLocksMu.Lock() + defer d.workflowLocksMu.Unlock() + + if lock, ok := d.workflowLocks[key]; ok { + return lock + } + + lock := &sync.Mutex{} + d.workflowLocks[key] = lock + + return lock +} + +// waitForRunID polls GitHub to find and match the run ID for a just-triggered job. +// This blocks until the run ID is found or timeout is reached. +func (d *dispatcher) waitForRunID( + ctx context.Context, + job *store.Job, + owner, repo, workflowID string, +) error { + const ( + timeout = 60 * time.Second + pollInterval = 5 * time.Second + ) + + deadline := time.Now().Add(timeout) + log := d.log.WithField("job_id", job.ID) + + for time.Now().Before(deadline) { + runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job) + if err == nil && runID != 0 { + job.RunID = &runID + job.RunURL = runURL + + if err := d.store.UpdateJob(ctx, job); err != nil { + return fmt.Errorf("updating job with run ID: %w", err) + } + + log.WithFields(logrus.Fields{ + "run_id": runID, + "run_url": runURL, + }).Info("Found workflow run inline") + + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollInterval): + log.Debug("Polling for workflow run...") + } + } + + return fmt.Errorf("timeout waiting for run ID after %v", timeout) +} + // getEffectiveWorkflowParams returns the effective workflow parameters, // preferring job overrides over template defaults. // For manual jobs (template == nil), only job fields are used. @@ -276,6 +344,12 @@ func (d *dispatcher) dispatchForGroup(ctx context.Context, group *store.Group) e owner, repo, workflowID, ref) } + // Acquire per-workflow lock to prevent race conditions when multiple groups + // dispatch the same workflow. This ensures sequential dispatch and run ID matching. + workflowLock := d.getWorkflowLock(owner, repo, workflowID) + workflowLock.Lock() + defer workflowLock.Unlock() + logFields := logrus.Fields{ "job_id": job.ID, "runner": idleRunner.Name, @@ -309,13 +383,19 @@ func (d *dispatcher) dispatchForGroup(ctx context.Context, group *store.Group) e return fmt.Errorf("triggering workflow dispatch: %w", err) } - // We don't have the run ID yet since workflow_dispatch returns 204 No Content. - // We'll need to poll for it in the trackRunsLoop. - // For now, mark as triggered without a run ID. + // Mark as triggered without a run ID initially. + // workflow_dispatch returns 204 No Content with no run ID. if err := d.queue.MarkTriggered(ctx, job.ID, 0, ""); err != nil { return fmt.Errorf("marking job as triggered: %w", err) } + // Wait inline for the run ID to be found while holding the workflow lock. + // This prevents race conditions when multiple jobs trigger the same workflow. + if err := d.waitForRunID(ctx, job, owner, repo, workflowID); err != nil { + // Log warning but don't fail - the tracking loop will continue trying. + log.WithError(err).Warn("Failed to match run ID inline, tracking loop will retry") + } + log.WithField("job_id", job.ID).Info("Job dispatched successfully") return nil