From 530effe7bb0e7af7c04072fd0e0317cc7b84ab4e Mon Sep 17 00:00:00 2001 From: Rafael Matias Date: Wed, 18 Feb 2026 16:32:04 +0100 Subject: [PATCH] fix(dispatcher): prevent run ID mismatches when multiple groups share a workflow When multiple groups dispatch the same workflow, findWorkflowRun could assign the same GitHub run to multiple jobs or pick the wrong run. - Add buildClaimedRunIDs helper to track already-assigned run IDs - Skip claimed runs in findWorkflowRun to prevent double-assignment - Select oldest unclaimed run instead of newest (serialized dispatches mean the oldest unclaimed run is the correct match) - Acquire per-workflow lock in trackJob during run ID discovery to prevent races with the dispatch path --- pkg/dispatcher/dispatcher.go | 77 +++++++++++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 9 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 924e08b..3375646 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -152,7 +152,13 @@ func (d *dispatcher) waitForRunID( log := d.log.WithField("job_id", job.ID) for time.Now().Before(deadline) { - runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job) + // Build fresh claimed set each iteration so we see newly assigned runs. + claimedRunIDs, claimErr := d.buildClaimedRunIDs(ctx) + if claimErr != nil { + log.WithError(claimErr).Warn("Failed to build claimed run IDs, proceeding without exclusion") + } + + runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job, claimedRunIDs) if err == nil && runID != 0 { job.RunID = &runID job.RunURL = runURL @@ -428,8 +434,18 @@ func (d *dispatcher) trackRuns(ctx context.Context) error { return fmt.Errorf("listing jobs: %w", err) } + // Build the set of already-claimed run IDs from the fetched jobs so that + // trackJob won't assign the same GitHub run to multiple jobs. + claimedRunIDs := make(map[int64]struct{}, len(jobs)) + + for _, j := range jobs { + if j.RunID != nil && *j.RunID != 0 { + claimedRunIDs[*j.RunID] = struct{}{} + } + } + for _, job := range jobs { - if err := d.trackJob(ctx, job); err != nil { + if err := d.trackJob(ctx, job, claimedRunIDs); err != nil { d.log.WithError(err).WithField("job_id", job.ID).Error("Failed to track job") } } @@ -438,7 +454,8 @@ func (d *dispatcher) trackRuns(ctx context.Context) error { } // trackJob updates the status of a single job. -func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error { +// claimedRunIDs is the set of run IDs already assigned to other jobs in this tracking cycle. +func (d *dispatcher) trackJob(ctx context.Context, job *store.Job, claimedRunIDs map[int64]struct{}) error { log := d.log.WithField("job_id", job.ID) // Get the template to know which repo to query (may be nil for manual jobs). @@ -461,9 +478,17 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error { owner, repo, workflowID, _ := getEffectiveWorkflowParams(job, template) // If we don't have a run ID, we need to find it. + // Acquire the per-workflow lock to prevent races with the dispatch path + // (waitForRunID) which also calls findWorkflowRun under the same lock. if job.RunID == nil || *job.RunID == 0 { - runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job) + workflowLock := d.getWorkflowLock(owner, repo, workflowID) + workflowLock.Lock() + + runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job, claimedRunIDs) + if err != nil { + workflowLock.Unlock() + log.WithError(err).Debug("Could not find workflow run yet") // Check if the job has been triggered for too long without a run. @@ -482,9 +507,16 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error { job.RunURL = runURL if err := d.store.UpdateJob(ctx, job); err != nil { + workflowLock.Unlock() + return fmt.Errorf("updating job with run ID: %w", err) } + // Mark this run as claimed so other jobs in the same tracking cycle won't steal it. + claimedRunIDs[runID] = struct{}{} + + workflowLock.Unlock() + log.WithFields(logrus.Fields{ "run_id": runID, "run_url": runURL, @@ -583,11 +615,33 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error { return nil } +// buildClaimedRunIDs returns the set of run IDs currently assigned to triggered/running jobs. +// This is used to prevent multiple jobs from claiming the same GitHub workflow run. +func (d *dispatcher) buildClaimedRunIDs(ctx context.Context) (map[int64]struct{}, error) { + jobs, err := d.store.ListJobsByStatus(ctx, store.JobStatusTriggered, store.JobStatusRunning) + if err != nil { + return nil, fmt.Errorf("listing jobs for claimed run IDs: %w", err) + } + + claimed := make(map[int64]struct{}, len(jobs)) + + for _, j := range jobs { + if j.RunID != nil && *j.RunID != 0 { + claimed[*j.RunID] = struct{}{} + } + } + + return claimed, nil +} + // findWorkflowRun searches for a recently created workflow run that matches our job. +// claimedRunIDs contains run IDs already assigned to other jobs; these are skipped. +// A nil map is safe and disables exclusion (degrades to previous behavior). func (d *dispatcher) findWorkflowRun( ctx context.Context, owner, repo, workflowID string, job *store.Job, + claimedRunIDs map[int64]struct{}, ) (int64, string, error) { // We need to list recent workflow runs and find one that was created // around the time we triggered the job. @@ -614,9 +668,9 @@ func (d *dispatcher) findWorkflowRun( return 0, "", fmt.Errorf("no workflow runs found") } - // Find the run that was created closest to our trigger time. - // The most recent one is likely ours if there's only one. - // For better matching, we could compare inputs but that's not always available. + // Find the oldest unclaimed run created after our trigger time. + // Dispatches are serialized by the per-workflow lock, so the oldest + // unclaimed run after the trigger time is the most likely match. var bestRun *github.WorkflowRun for i, run := range runs { @@ -625,8 +679,13 @@ func (d *dispatcher) findWorkflowRun( continue } - // Take the most recent matching run. - if bestRun == nil || run.CreatedAt.After(bestRun.CreatedAt) { + // Skip runs already claimed by other jobs. + if _, claimed := claimedRunIDs[run.ID]; claimed { + continue + } + + // Take the oldest unclaimed matching run. + if bestRun == nil || run.CreatedAt.Before(bestRun.CreatedAt) { bestRun = runs[i] } }