Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type dispatcher struct {
wg sync.WaitGroup
mu sync.Mutex
runnerChangeCallback RunnerChangeCallback

// workflowLocks provides per-workflow-template locking to prevent race conditions
// when multiple groups dispatch the same workflow. Key: "owner/repo/workflow_id".
workflowLocks map[string]*sync.Mutex
workflowLocksMu sync.Mutex
}

// Ensure dispatcher implements Dispatcher.
Expand All @@ -59,6 +64,7 @@ func NewDispatcher(
ghClient: ghClient,
interval: cfg.Dispatcher.Interval,
trackingInterval: cfg.Dispatcher.TrackingInterval,
workflowLocks: make(map[string]*sync.Mutex),
}
}

Expand Down Expand Up @@ -112,6 +118,68 @@ func (d *dispatcher) notifyRunnerChange(runner *store.Runner) {
}
}

// getWorkflowLock hands back the mutex associated with one workflow template,
// identified by the key "owner/repo/workflowID". The mutex is created and
// stored the first time a key is requested; later requests for the same key
// receive the same *sync.Mutex. Access to the lock table itself is serialized
// by workflowLocksMu.
func (d *dispatcher) getWorkflowLock(owner, repo, workflowID string) *sync.Mutex {
	d.workflowLocksMu.Lock()
	defer d.workflowLocksMu.Unlock()

	lockKey := fmt.Sprintf("%s/%s/%s", owner, repo, workflowID)

	mu, found := d.workflowLocks[lockKey]
	if !found {
		// First request for this workflow: register a fresh mutex.
		mu = new(sync.Mutex)
		d.workflowLocks[lockKey] = mu
	}

	return mu
}

// waitForRunID polls GitHub (via findWorkflowRun) for the workflow run that
// belongs to a just-triggered job. When a run is matched, the run ID and URL
// are written onto job and the job is persisted through the store.
//
// The call blocks until one of the following happens:
//   - a run is matched and the job is saved: returns nil;
//   - saving the job fails: returns the wrapped store error;
//   - ctx is cancelled while waiting between polls: returns ctx.Err();
//   - the 60s timeout elapses: returns a timeout error, which wraps the most
//     recent lookup error when the last lookup attempt failed.
//
// Lookups are attempted every 5s. The wait between attempts is capped to the
// time remaining, so the function does not block past its own deadline just
// to sleep.
func (d *dispatcher) waitForRunID(
	ctx context.Context,
	job *store.Job,
	owner, repo, workflowID string,
) error {
	const (
		timeout      = 60 * time.Second
		pollInterval = 5 * time.Second
	)

	deadline := time.Now().Add(timeout)
	log := d.log.WithField("job_id", job.ID)

	// lastErr holds the outcome of the most recent lookup so a timeout can
	// report why no run was matched instead of hiding the cause.
	var lastErr error

	for time.Now().Before(deadline) {
		runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job)
		lastErr = err

		if err != nil {
			// Lookup failures are retried, but must stay visible for debugging.
			log.WithError(err).Debug("Workflow run lookup failed, will retry")
		} else if runID != 0 {
			job.RunID = &runID
			job.RunURL = runURL

			if err := d.store.UpdateJob(ctx, job); err != nil {
				return fmt.Errorf("updating job with run ID: %w", err)
			}

			log.WithFields(logrus.Fields{
				"run_id":  runID,
				"run_url": runURL,
			}).Info("Found workflow run inline")

			return nil
		}

		// Sleep until the next poll, but never beyond the deadline: no further
		// lookup would happen after it, so the extra wait would be wasted.
		wait := time.Until(deadline)
		if wait <= 0 {
			break
		}

		if wait > pollInterval {
			wait = pollInterval
		}

		timer := time.NewTimer(wait)

		select {
		case <-ctx.Done():
			timer.Stop()

			return ctx.Err()
		case <-timer.C:
			log.Debug("Polling for workflow run...")
		}
	}

	if lastErr != nil {
		return fmt.Errorf("timeout waiting for run ID after %v: %w", timeout, lastErr)
	}

	return fmt.Errorf("timeout waiting for run ID after %v", timeout)
}

// getEffectiveWorkflowParams returns the effective workflow parameters,
// preferring job overrides over template defaults.
// For manual jobs (template == nil), only job fields are used.
Expand Down Expand Up @@ -276,6 +344,12 @@ func (d *dispatcher) dispatchForGroup(ctx context.Context, group *store.Group) e
owner, repo, workflowID, ref)
}

// Acquire per-workflow lock to prevent race conditions when multiple groups
// dispatch the same workflow. This ensures sequential dispatch and run ID matching.
workflowLock := d.getWorkflowLock(owner, repo, workflowID)
workflowLock.Lock()
defer workflowLock.Unlock()

logFields := logrus.Fields{
"job_id": job.ID,
"runner": idleRunner.Name,
Expand Down Expand Up @@ -309,13 +383,19 @@ func (d *dispatcher) dispatchForGroup(ctx context.Context, group *store.Group) e
return fmt.Errorf("triggering workflow dispatch: %w", err)
}

// We don't have the run ID yet since workflow_dispatch returns 204 No Content.
// We'll need to poll for it in the trackRunsLoop.
// For now, mark as triggered without a run ID.
// Mark as triggered without a run ID initially.
// workflow_dispatch returns 204 No Content with no run ID.
if err := d.queue.MarkTriggered(ctx, job.ID, 0, ""); err != nil {
return fmt.Errorf("marking job as triggered: %w", err)
}

// Wait inline for the run ID to be found while holding the workflow lock.
// This prevents race conditions when multiple jobs trigger the same workflow.
if err := d.waitForRunID(ctx, job, owner, repo, workflowID); err != nil {
// Log warning but don't fail - the tracking loop will continue trying.
log.WithError(err).Warn("Failed to match run ID inline, tracking loop will retry")
}

log.WithField("job_id", job.ID).Info("Job dispatched successfully")

return nil
Expand Down
Loading