Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,13 @@ func (d *dispatcher) waitForRunID(
log := d.log.WithField("job_id", job.ID)

for time.Now().Before(deadline) {
runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job)
// Build fresh claimed set each iteration so we see newly assigned runs.
claimedRunIDs, claimErr := d.buildClaimedRunIDs(ctx)
if claimErr != nil {
log.WithError(claimErr).Warn("Failed to build claimed run IDs, proceeding without exclusion")
}

runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job, claimedRunIDs)
if err == nil && runID != 0 {
job.RunID = &runID
job.RunURL = runURL
Expand Down Expand Up @@ -428,8 +434,18 @@ func (d *dispatcher) trackRuns(ctx context.Context) error {
return fmt.Errorf("listing jobs: %w", err)
}

// Build the set of already-claimed run IDs from the fetched jobs so that
// trackJob won't assign the same GitHub run to multiple jobs.
claimedRunIDs := make(map[int64]struct{}, len(jobs))

for _, j := range jobs {
if j.RunID != nil && *j.RunID != 0 {
claimedRunIDs[*j.RunID] = struct{}{}
}
}

for _, job := range jobs {
if err := d.trackJob(ctx, job); err != nil {
if err := d.trackJob(ctx, job, claimedRunIDs); err != nil {
d.log.WithError(err).WithField("job_id", job.ID).Error("Failed to track job")
}
}
Expand All @@ -438,7 +454,8 @@ func (d *dispatcher) trackRuns(ctx context.Context) error {
}

// trackJob updates the status of a single job.
func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error {
// claimedRunIDs is the set of run IDs already assigned to other jobs in this tracking cycle.
func (d *dispatcher) trackJob(ctx context.Context, job *store.Job, claimedRunIDs map[int64]struct{}) error {
log := d.log.WithField("job_id", job.ID)

// Get the template to know which repo to query (may be nil for manual jobs).
Expand All @@ -461,9 +478,17 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error {
owner, repo, workflowID, _ := getEffectiveWorkflowParams(job, template)

// If we don't have a run ID, we need to find it.
// Acquire the per-workflow lock to prevent races with the dispatch path
// (waitForRunID) which also calls findWorkflowRun under the same lock.
if job.RunID == nil || *job.RunID == 0 {
runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job)
workflowLock := d.getWorkflowLock(owner, repo, workflowID)
workflowLock.Lock()

runID, runURL, err := d.findWorkflowRun(ctx, owner, repo, workflowID, job, claimedRunIDs)

if err != nil {
workflowLock.Unlock()

log.WithError(err).Debug("Could not find workflow run yet")

// Check if the job has been triggered for too long without a run.
Expand All @@ -482,9 +507,16 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error {
job.RunURL = runURL

if err := d.store.UpdateJob(ctx, job); err != nil {
workflowLock.Unlock()

return fmt.Errorf("updating job with run ID: %w", err)
}

// Mark this run as claimed so other jobs in the same tracking cycle won't steal it.
claimedRunIDs[runID] = struct{}{}

workflowLock.Unlock()

log.WithFields(logrus.Fields{
"run_id": runID,
"run_url": runURL,
Expand Down Expand Up @@ -583,11 +615,33 @@ func (d *dispatcher) trackJob(ctx context.Context, job *store.Job) error {
return nil
}

// buildClaimedRunIDs collects the run IDs held by jobs that the store reports
// with a triggered or running status. Jobs whose RunID is nil or zero
// contribute nothing to the set.
//
// Callers hand the resulting set to findWorkflowRun so that a single GitHub
// workflow run is not claimed by more than one job. If the store lookup fails,
// a nil set is returned together with the wrapped error.
func (d *dispatcher) buildClaimedRunIDs(ctx context.Context) (map[int64]struct{}, error) {
	activeJobs, listErr := d.store.ListJobsByStatus(ctx, store.JobStatusTriggered, store.JobStatusRunning)
	if listErr != nil {
		return nil, fmt.Errorf("listing jobs for claimed run IDs: %w", listErr)
	}

	// Size the set for the worst case where every listed job already has a run.
	runIDs := make(map[int64]struct{}, len(activeJobs))

	for _, activeJob := range activeJobs {
		id := activeJob.RunID
		if id == nil || *id == 0 {
			// No run has been assigned to this job yet; nothing to claim.
			continue
		}

		runIDs[*id] = struct{}{}
	}

	return runIDs, nil
}

// findWorkflowRun searches for a recently created workflow run that matches our job.
// claimedRunIDs contains run IDs already assigned to other jobs; these are skipped.
// A nil map is safe and disables exclusion (degrades to previous behavior).
func (d *dispatcher) findWorkflowRun(
ctx context.Context,
owner, repo, workflowID string,
job *store.Job,
claimedRunIDs map[int64]struct{},
) (int64, string, error) {
// We need to list recent workflow runs and find one that was created
// around the time we triggered the job.
Expand All @@ -614,9 +668,9 @@ func (d *dispatcher) findWorkflowRun(
return 0, "", fmt.Errorf("no workflow runs found")
}

// Find the run that was created closest to our trigger time.
// The most recent one is likely ours if there's only one.
// For better matching, we could compare inputs but that's not always available.
// Find the oldest unclaimed run created after our trigger time.
// Dispatches are serialized by the per-workflow lock, so the oldest
// unclaimed run after the trigger time is the most likely match.
var bestRun *github.WorkflowRun

for i, run := range runs {
Expand All @@ -625,8 +679,13 @@ func (d *dispatcher) findWorkflowRun(
continue
}

// Take the most recent matching run.
if bestRun == nil || run.CreatedAt.After(bestRun.CreatedAt) {
// Skip runs already claimed by other jobs.
if _, claimed := claimedRunIDs[run.ID]; claimed {
continue
}

// Take the oldest unclaimed matching run.
if bestRun == nil || run.CreatedAt.Before(bestRun.CreatedAt) {
bestRun = runs[i]
}
}
Expand Down
Loading