diff --git a/packages/das/src/queue/constants.ts b/packages/das/src/queue/constants.ts index 9151658..fd77f4e 100644 --- a/packages/das/src/queue/constants.ts +++ b/packages/das/src/queue/constants.ts @@ -7,3 +7,14 @@ export const FETCH_JOBS = { } as const; export const DEFAULT_BACKFILL_DAYS = 40; + +export function prFilesJobId( + repoFullName: string, + prNumber: number, + headSha: string | null, + baseSha: string | null, +): string { + return `files-${repoFullName}-${prNumber}-${headSha ?? "no-head"}-${ + baseSha ?? "no-base" + }`; +} diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 5449e3b..27860d2 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -1,11 +1,16 @@ import { Processor, WorkerHost, InjectQueue } from "@nestjs/bullmq"; import { Logger } from "@nestjs/common"; import { InjectRepository } from "@nestjs/typeorm"; -import { Repository } from "typeorm"; +import { IsNull, Repository } from "typeorm"; import { Job, Queue } from "bullmq"; import { Issue, PullRequest } from "../entities"; import { GitHubFetcherService } from "../webhook/github-fetcher.service"; -import { FETCH_QUEUE, FETCH_JOBS, DEFAULT_BACKFILL_DAYS } from "./constants"; +import { + FETCH_QUEUE, + FETCH_JOBS, + DEFAULT_BACKFILL_DAYS, + prFilesJobId, +} from "./constants"; export interface PrMetadataJobData { repoFullName: string; @@ -15,6 +20,8 @@ export interface PrMetadataJobData { export interface PrFilesJobData { repoFullName: string; prNumber: number; + expectedHeadSha?: string | null; + expectedBaseSha?: string | null; } export interface BackfillRepoJobData { @@ -22,6 +29,11 @@ export interface BackfillRepoJobData { days?: number; } +interface PrFilesGeneration { + headSha: string | null; + baseSha: string | null; +} + type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData; @Processor(FETCH_QUEUE, { concurrency: 5 }) @@ -48,8 +60,7 @@ export class 
FetchProcessor extends WorkerHost { break; } case FETCH_JOBS.PR_FILES: { - const { repoFullName, prNumber } = job.data as PrFilesJobData; - await this.handlePrFiles(repoFullName, prNumber); + await this.handlePrFiles(job.data as PrFilesJobData); break; } case FETCH_JOBS.BACKFILL_REPO: { @@ -92,18 +103,25 @@ export class FetchProcessor extends WorkerHost { } } - private async handlePrFiles( - repoFullName: string, - prNumber: number, - ): Promise<void> { + private async handlePrFiles(data: PrFilesJobData): Promise<void> { + const { repoFullName, prNumber } = data; this.logger.log(`Fetching PR files for ${repoFullName}#${prNumber}`); + const generation = { + headSha: data.expectedHeadSha ?? null, + baseSha: data.expectedBaseSha ?? null, + }; + await this.fetcher.fetchAndStorePrFiles(repoFullName, prNumber); - await this.prRepo.update( - { repoFullName, prNumber }, + const updateResult = await this.prRepo.update( + this.prGenerationCriteria(repoFullName, prNumber, generation), { scoringDataStored: true }, ); + + if (!updateResult.affected) { + await this.handleStalePrFilesJob(repoFullName, prNumber); + } } private async handleBackfill( @@ -122,7 +140,7 @@ export class FetchProcessor extends WorkerHost { this.logger.log(`Backfilled ${prs.length} PRs from ${repoFullName}`); // Enqueue follow-up jobs (metadata + files for every PR). - for (const { prNumber } of prs) { + for (const { prNumber, headSha, baseSha } of prs) { await this.fetchQueue.add( FETCH_JOBS.PR_METADATA, { repoFullName, prNumber }, @@ -135,16 +153,11 @@ export class FetchProcessor extends WorkerHost { }, ); - await this.fetchQueue.add( - FETCH_JOBS.PR_FILES, - { repoFullName, prNumber }, - { - jobId: `files-${repoFullName}-${prNumber}`, - removeOnComplete: true, - removeOnFail: 50, - attempts: 3, - backoff: { type: "exponential", delay: 5000 }, - }, + await this.enqueuePrFilesJob( + repoFullName, + prNumber, + headSha ?? null, + baseSha ?? 
null, ); } @@ -152,4 +165,61 @@ export class FetchProcessor extends WorkerHost { await this.fetcher.backfillIssues(repoFullName, sinceDate); this.logger.log(`Backfilled issues from ${repoFullName}`); } + + private async handleStalePrFilesJob( + repoFullName: string, + prNumber: number, + ): Promise<void> { + await this.prRepo.update( + { repoFullName, prNumber }, + { scoringDataStored: false }, + ); + + const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); + if (!pr) return; + + await this.enqueuePrFilesJob( + repoFullName, + prNumber, + pr.headSha ?? null, + pr.baseSha ?? null, + ); + } + + private async enqueuePrFilesJob( + repoFullName: string, + prNumber: number, + expectedHeadSha: string | null, + expectedBaseSha: string | null, + ): Promise<void> { + await this.fetchQueue.add( + FETCH_JOBS.PR_FILES, + { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, + { + jobId: prFilesJobId( + repoFullName, + prNumber, + expectedHeadSha, + expectedBaseSha, + ), + removeOnComplete: true, + removeOnFail: 50, + attempts: 3, + backoff: { type: "exponential", delay: 5000 }, + }, + ); + } + + private prGenerationCriteria( + repoFullName: string, + prNumber: number, + generation: PrFilesGeneration, + ): Record<string, unknown> { + return { + repoFullName, + prNumber, + headSha: generation.headSha ?? IsNull(), + baseSha: generation.baseSha ?? 
IsNull(), + }; + } } diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index 98c8de4..60ee916 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -636,7 +636,9 @@ export class GitHubFetcherService implements OnModuleInit { async backfillPullRequests( repoFullName: string, sinceDate: Date, - ): Promise<{ prNumber: number }[]> { + ): Promise< + { prNumber: number; headSha: string | null; baseSha: string | null }[] + > { const [owner, repo] = repoFullName.split("/"); const token = await this.getTokenForRepo(repoFullName); @@ -718,7 +720,11 @@ export class GitHubFetcherService implements OnModuleInit { } `; - const prs: { prNumber: number }[] = []; + const prs: { + prNumber: number; + headSha: string | null; + baseSha: string | null; + }[] = []; let cursor: string | null = null; let defaultBranchWritten = false; @@ -823,7 +829,11 @@ export class GitHubFetcherService implements OnModuleInit { pr.timelineItems?.nodes ?? [], ); - prs.push({ prNumber: pr.number }); + prs.push({ + prNumber: pr.number, + headSha: pr.headRefOid ?? null, + baseSha: pr.baseRefOid ?? 
null, + }); } if (shouldStop || !page.pageInfo.hasNextPage) break; diff --git a/packages/das/src/webhook/handlers/pull-request.handler.ts b/packages/das/src/webhook/handlers/pull-request.handler.ts index dd54838..c683090 100644 --- a/packages/das/src/webhook/handlers/pull-request.handler.ts +++ b/packages/das/src/webhook/handlers/pull-request.handler.ts @@ -5,7 +5,7 @@ import { InjectQueue } from "@nestjs/bullmq"; import { Repository } from "typeorm"; import { Queue } from "bullmq"; import { PullRequest, Repo } from "../../entities"; -import { FETCH_QUEUE, FETCH_JOBS } from "../../queue/constants"; +import { FETCH_QUEUE, FETCH_JOBS, prFilesJobId } from "../../queue/constants"; @Injectable() export class PullRequestHandler { @@ -96,10 +96,17 @@ export class PullRequestHandler { ); } - const jobId = `files-${repoFullName}-${prNumber}`; + const expectedHeadSha = data.headSha ?? null; + const expectedBaseSha = data.baseSha ?? null; + const jobId = prFilesJobId( + repoFullName, + prNumber, + expectedHeadSha, + expectedBaseSha, + ); await this.fetchQueue.add( FETCH_JOBS.PR_FILES, - { repoFullName, prNumber }, + { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, { jobId, removeOnComplete: true,