From c424ff587a4d11e5d7d441478d120cd4d84eefeb Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Fri, 8 May 2026 04:12:34 +0200 Subject: [PATCH 1/5] Fix stale PR file fetch completion --- packages/das/package.json | 1 + packages/das/src/queue/constants.ts | 11 + packages/das/src/queue/fetch.processor.ts | 130 ++++++- .../das/src/queue/pr-files-generation.spec.ts | 367 ++++++++++++++++++ .../das/src/webhook/github-fetcher.service.ts | 124 +++++- .../webhook/handlers/pull-request.handler.ts | 13 +- 6 files changed, 618 insertions(+), 28 deletions(-) create mode 100644 packages/das/src/queue/pr-files-generation.spec.ts diff --git a/packages/das/package.json b/packages/das/package.json index 36691c0..4ef9f03 100644 --- a/packages/das/package.json +++ b/packages/das/package.json @@ -13,6 +13,7 @@ "dev": "nest start --watch", "start:debug": "nest start --debug --watch", "start:prod": "node dist/main", + "test": "node --test -r ts-node/register \"src/**/*.spec.ts\"", "lint": "eslint \"src/**/*.ts\"", "lint:fix": "eslint \"src/**/*.ts\" --fix" }, diff --git a/packages/das/src/queue/constants.ts b/packages/das/src/queue/constants.ts index 9151658..fd77f4e 100644 --- a/packages/das/src/queue/constants.ts +++ b/packages/das/src/queue/constants.ts @@ -7,3 +7,14 @@ export const FETCH_JOBS = { } as const; export const DEFAULT_BACKFILL_DAYS = 40; + +export function prFilesJobId( + repoFullName: string, + prNumber: number, + headSha: string | null, + baseSha: string | null, +): string { + return `files-${repoFullName}-${prNumber}-${headSha ?? "no-head"}-${ + baseSha ?? "no-base" + }`; +} diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 5449e3b..3ba8cc6 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -1,11 +1,19 @@ import { Processor, WorkerHost, InjectQueue } from "@nestjs/bullmq"; import { Logger } from "@nestjs/common"; import { InjectRepository } from "@nestjs/typeorm"; -import { Repository } from "typeorm"; +import { IsNull, Repository } from "typeorm"; import { Job, Queue } from "bullmq"; import { Issue, PullRequest } from "../entities"; -import { GitHubFetcherService } from "../webhook/github-fetcher.service"; -import { FETCH_QUEUE, FETCH_JOBS, DEFAULT_BACKFILL_DAYS } from "./constants"; +import { + GitHubFetcherService, + PrFilesGeneration, +} from "../webhook/github-fetcher.service"; +import { + FETCH_QUEUE, + FETCH_JOBS, + DEFAULT_BACKFILL_DAYS, + prFilesJobId, +} from "./constants"; export interface PrMetadataJobData { repoFullName: string; @@ -15,6 +23,8 @@ export interface PrMetadataJobData { export interface PrFilesJobData { repoFullName: string; prNumber: number; + expectedHeadSha?: string | null; + expectedBaseSha?: string | null; } export interface BackfillRepoJobData { @@ -48,8 +58,7 @@ export class FetchProcessor extends WorkerHost { break; } case FETCH_JOBS.PR_FILES: { - const { repoFullName, prNumber } = job.data as PrFilesJobData; - await this.handlePrFiles(repoFullName, prNumber); + await this.handlePrFiles(job.data as PrFilesJobData); break; } case FETCH_JOBS.BACKFILL_REPO: { @@ -92,18 +101,32 @@ export class FetchProcessor extends WorkerHost { } } - private async handlePrFiles( - repoFullName: string, - prNumber: number, - ): Promise { + private async handlePrFiles(data: PrFilesJobData): Promise { + const { repoFullName, prNumber } = data; this.logger.log(`Fetching PR files for ${repoFullName}#${prNumber}`); - await this.fetcher.fetchAndStorePrFiles(repoFullName, prNumber); + const generation = await this.resolvePrFilesGeneration(data); + if (!generation) return; - await this.prRepo.update( - { repoFullName, prNumber }, + const result = await this.fetcher.fetchAndStorePrFiles( + repoFullName, + prNumber, + generation, + ); + + if (result.status === "stale") { + await this.handleStalePrFilesJob(repoFullName, prNumber); + return; + } + + const updateResult = await this.prRepo.update( + this.prGenerationCriteria(repoFullName, prNumber, generation), { scoringDataStored: true }, ); + + if (!updateResult.affected) { + await this.handleStalePrFilesJob(repoFullName, prNumber); + } } private async handleBackfill( @@ -122,7 +145,7 @@ export class FetchProcessor extends WorkerHost { this.logger.log(`Backfilled ${prs.length} PRs from ${repoFullName}`); // Enqueue follow-up jobs (metadata + files for every PR). - for (const { prNumber } of prs) { + for (const { prNumber, headSha, baseSha } of prs) { await this.fetchQueue.add( FETCH_JOBS.PR_METADATA, { repoFullName, prNumber }, @@ -135,11 +158,18 @@ export class FetchProcessor extends WorkerHost { }, ); + const expectedHeadSha = headSha ?? null; + const expectedBaseSha = baseSha ?? null; await this.fetchQueue.add( FETCH_JOBS.PR_FILES, - { repoFullName, prNumber }, + { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, { - jobId: `files-${repoFullName}-${prNumber}`, + jobId: prFilesJobId( + repoFullName, + prNumber, + expectedHeadSha, + expectedBaseSha, + ), removeOnComplete: true, removeOnFail: 50, attempts: 3, @@ -152,4 +182,74 @@ export class FetchProcessor extends WorkerHost { await this.fetcher.backfillIssues(repoFullName, sinceDate); this.logger.log(`Backfilled issues from ${repoFullName}`); } + + private async resolvePrFilesGeneration( + data: PrFilesJobData, + ): Promise { + if ( + data.expectedHeadSha !== undefined || + data.expectedBaseSha !== undefined + ) { + return { + headSha: data.expectedHeadSha ?? null, + baseSha: data.expectedBaseSha ?? null, + }; + } + + const pr = await this.prRepo.findOneBy({ + repoFullName: data.repoFullName, + prNumber: data.prNumber, + }); + if (!pr) return null; + + return { + headSha: pr.headSha ?? null, + baseSha: pr.baseSha ?? null, + }; + } + + private async handleStalePrFilesJob( + repoFullName: string, + prNumber: number, + ): Promise { + await this.prRepo.update( + { repoFullName, prNumber }, + { scoringDataStored: false }, + ); + + const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); + if (!pr) return; + + const expectedHeadSha = pr.headSha ?? null; + const expectedBaseSha = pr.baseSha ?? null; + await this.fetchQueue.add( + FETCH_JOBS.PR_FILES, + { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, + { + jobId: prFilesJobId( + repoFullName, + prNumber, + expectedHeadSha, + expectedBaseSha, + ), + removeOnComplete: true, + removeOnFail: 50, + attempts: 3, + backoff: { type: "exponential", delay: 5000 }, + }, + ); + } + + private prGenerationCriteria( + repoFullName: string, + prNumber: number, + generation: PrFilesGeneration, + ): Record { + return { + repoFullName, + prNumber, + headSha: generation.headSha ?? IsNull(), + baseSha: generation.baseSha ?? IsNull(), + }; + } } diff --git a/packages/das/src/queue/pr-files-generation.spec.ts b/packages/das/src/queue/pr-files-generation.spec.ts new file mode 100644 index 0000000..54a26ab --- /dev/null +++ b/packages/das/src/queue/pr-files-generation.spec.ts @@ -0,0 +1,367 @@ +/* eslint-disable @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-return */ +import test from "node:test"; +import assert from "node:assert/strict"; +import { FETCH_JOBS } from "./constants"; +import { FetchProcessor } from "./fetch.processor"; +import { PullRequestHandler } from "../webhook/handlers/pull-request.handler"; +import { GitHubFetcherService } from "../webhook/github-fetcher.service"; + +type MockRepo> = { + row: T | null; + deletes: unknown[]; + upserts: unknown[]; + updates: unknown[]; + repo: { + findOneBy: (criteria: Record) => Promise; + update: ( + criteria: Record | string, + patch: Record, + ) => Promise<{ affected: number }>; + upsert: (data: Record) => Promise; + delete: (criteria: Record) => Promise; + }; +}; + +function createMockRepo>( + initialRow: T | null = null, +): MockRepo { + const mock: MockRepo = { + row: initialRow, + deletes: [], + upserts: [], + updates: [], + repo: { + findOneBy: (criteria: Record): Promise => + Promise.resolve(matchesCriteria(mock.row, criteria) ? mock.row : null), + update: ( + criteria: Record | string, + patch: Record, + ): Promise<{ affected: number }> => { + mock.updates.push({ criteria, patch }); + if ( + mock.row && + (typeof criteria === "string" || matchesCriteria(mock.row, criteria)) + ) { + Object.assign(mock.row, patch); + return Promise.resolve({ affected: 1 }); + } + return Promise.resolve({ affected: 0 }); + }, + upsert: (data: Record): Promise => { + mock.upserts.push(data); + mock.row = { ...mock.row, ...data } as T; + return Promise.resolve(); + }, + delete: (criteria: Record): Promise => { + mock.deletes.push(criteria); + return Promise.resolve(); + }, + }, + }; + + return mock; +} + +function matchesCriteria>( + row: T | null, + criteria: Record, +): boolean { + if (!row) return false; + for (const [key, value] of Object.entries(criteria)) { + if (isNullFindOperator(value)) { + if (row[key] !== null && row[key] !== undefined) return false; + } else if (row[key] !== value) { + return false; + } + } + return true; +} + +function isNullFindOperator(value: unknown): boolean { + return ( + typeof value === "object" && + value !== null && + "_type" in value && + (value as { _type?: unknown })._type === "isNull" + ); +} + +function createQueue(): { + added: Array<{ name: string; data: any; opts: any }>; + queue: { add: (name: string, data: any, opts: any) => Promise }; +} { + const added: Array<{ name: string; data: any; opts: any }> = []; + return { + added, + queue: { + add: (name: string, data: any, opts: any): Promise => { + added.push({ name, data, opts }); + return Promise.resolve(); + }, + }, + }; +} + +function synchronizePayload(): Record { + return { + action: "synchronize", + repository: { full_name: "owner/repo" }, + pull_request: { + number: 7, + title: "PR", + state: "open", + merged: false, + created_at: "2026-05-01T00:00:00Z", + closed_at: null, + merged_at: null, + user: { id: 123, login: "miner" }, + author_association: "CONTRIBUTOR", + merged_by: null, + base: { ref: "test", sha: "B2" }, + head: { + ref: "feature", + sha: "H2", + repo: { full_name: "owner/repo" }, + }, + additions: 1, + deletions: 0, + commits: 1, + labels: [], + }, + }; +} + +void test("synchronize enqueues a PR file job for the webhook head/base generation", async () => { + const prRepo = createMockRepo(); + const repoRepo = createMockRepo({ repoFullName: "owner/repo" }); + const queue = createQueue(); + const handler = new PullRequestHandler( + prRepo.repo as any, + repoRepo.repo as any, + queue.queue as any, + ); + + await handler.handle(synchronizePayload()); + + const fileJob = queue.added.find((job) => job.name === FETCH_JOBS.PR_FILES); + assert.ok(fileJob); + assert.equal(fileJob.data.repoFullName, "owner/repo"); + assert.equal(fileJob.data.prNumber, 7); + assert.equal(fileJob.data.expectedHeadSha, "H2"); + assert.equal(fileJob.data.expectedBaseSha, "B2"); + assert.equal(fileJob.opts.jobId, "files-owner/repo-7-H2-B2"); +}); + +void test("stale PR file jobs cannot mark a newer PR generation complete", async () => { + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H1", + baseSha: "B1", + scoringDataStored: false, + }); + const issueRepo = createMockRepo(); + const queue = createQueue(); + const fetcher = { + fetchAndStorePrFiles: ( + _repoFullName: string, + _prNumber: number, + generation: { headSha: string | null; baseSha: string | null }, + ): Promise<{ status: "stored" }> => { + assert.deepEqual(generation, { headSha: "H1", baseSha: "B1" }); + Object.assign(prRepo.row ?? {}, { + headSha: "H2", + baseSha: "B2", + scoringDataStored: false, + }); + return Promise.resolve({ status: "stored" }); + }, + }; + const processor = new FetchProcessor( + fetcher as any, + prRepo.repo as any, + issueRepo.repo as any, + queue.queue as any, + ); + + await processor.process({ + name: FETCH_JOBS.PR_FILES, + data: { + repoFullName: "owner/repo", + prNumber: 7, + expectedHeadSha: "H1", + expectedBaseSha: "B1", + }, + } as any); + + assert.equal(prRepo.row?.headSha, "H2"); + assert.equal(prRepo.row?.baseSha, "B2"); + assert.equal(prRepo.row?.scoringDataStored, false); + assert.equal(queue.added.length, 1); + assert.equal(queue.added[0].opts.jobId, "files-owner/repo-7-H2-B2"); + assert.equal(queue.added[0].data.expectedHeadSha, "H2"); + assert.equal(queue.added[0].data.expectedBaseSha, "B2"); +}); + +void test("stale PR file jobs invalidate and requeue a completed newer generation", async () => { + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H2", + baseSha: "B2", + scoringDataStored: true, + }); + const issueRepo = createMockRepo(); + const queue = createQueue(); + const fetcher = { + fetchAndStorePrFiles: (): Promise<{ status: "stale" }> => + Promise.resolve({ status: "stale" }), + }; + const processor = new FetchProcessor( + fetcher as any, + prRepo.repo as any, + issueRepo.repo as any, + queue.queue as any, + ); + + await processor.process({ + name: FETCH_JOBS.PR_FILES, + data: { + repoFullName: "owner/repo", + prNumber: 7, + expectedHeadSha: "H1", + expectedBaseSha: "B1", + }, + } as any); + + assert.equal(prRepo.row?.scoringDataStored, false); + assert.equal(queue.added.length, 1); + assert.equal(queue.added[0].opts.jobId, "files-owner/repo-7-H2-B2"); + assert.equal(queue.added[0].data.expectedHeadSha, "H2"); + assert.equal(queue.added[0].data.expectedBaseSha, "B2"); +}); + +void test("current PR file jobs mark the matching generation complete", async () => { + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H2", + baseSha: "B2", + scoringDataStored: false, + }); + const issueRepo = createMockRepo(); + const queue = createQueue(); + const fetcher = { + fetchAndStorePrFiles: ( + _repoFullName: string, + _prNumber: number, + generation: { headSha: string | null; baseSha: string | null }, + ): Promise<{ status: "stored" }> => { + assert.deepEqual(generation, { headSha: "H2", baseSha: "B2" }); + return Promise.resolve({ status: "stored" }); + }, + }; + const processor = new FetchProcessor( + fetcher as any, + prRepo.repo as any, + issueRepo.repo as any, + queue.queue as any, + ); + + await processor.process({ + name: FETCH_JOBS.PR_FILES, + data: { + repoFullName: "owner/repo", + prNumber: 7, + expectedHeadSha: "H2", + expectedBaseSha: "B2", + }, + } as any); + + assert.equal(prRepo.row?.scoringDataStored, true); + assert.equal(queue.added.length, 0); +}); + +void test("legacy PR file jobs resolve the current generation from the PR row", async () => { + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H2", + baseSha: "B2", + scoringDataStored: false, + }); + const issueRepo = createMockRepo(); + const queue = createQueue(); + const fetcher = { + fetchAndStorePrFiles: ( + _repoFullName: string, + _prNumber: number, + generation: { headSha: string | null; baseSha: string | null }, + ): Promise<{ status: "stored" }> => { + assert.deepEqual(generation, { headSha: "H2", baseSha: "B2" }); + return Promise.resolve({ status: "stored" }); + }, + }; + const processor = new FetchProcessor( + fetcher as any, + prRepo.repo as any, + issueRepo.repo as any, + queue.queue as any, + ); + + await processor.process({ + name: FETCH_JOBS.PR_FILES, + data: { + repoFullName: "owner/repo", + prNumber: 7, + }, + } as any); + + assert.equal(prRepo.row?.scoringDataStored, true); + assert.equal(queue.added.length, 0); +}); + +void test("fetcher skips destructive file writes when the job generation is already stale", async () => { + const prFileRepo = createMockRepo(); + const prFileContentRepo = createMockRepo(); + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H2", + baseSha: "B2", + mergeBaseSha: null, + }); + const otherRepo = createMockRepo(); + const config = { + getOrThrow: (key: string): string => + key === "GITHUB_APP_ID" ? "123" : "/tmp/private-key.pem", + }; + const fetcher = new GitHubFetcherService( + config as any, + prFileRepo.repo as any, + prFileContentRepo.repo as any, + prRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + ); + + (fetcher as any).getTokenForRepo = (): Promise => + Promise.resolve("token"); + (fetcher as any).fetchMergeBaseSha = (): Promise => + Promise.resolve("M2"); + (fetcher as any).fetchAllPrFiles = (): Promise => + Promise.resolve([]); + + const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { + headSha: "H1", + baseSha: "B1", + }); + + assert.deepEqual(result, { status: "stale" }); + assert.equal(prFileRepo.deletes.length, 0); + assert.equal(prFileContentRepo.deletes.length, 0); + assert.equal(prFileRepo.upserts.length, 0); + assert.equal(prFileContentRepo.upserts.length, 0); +}); diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index 98c8de4..85cd363 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; -import { Repository } from "typeorm"; +import { IsNull, Repository } from "typeorm"; import { readFileSync } from "fs"; import { sign } from "jsonwebtoken"; import { @@ -26,6 +26,15 @@ const MAX_FILE_SIZE_BYTES = 1_000_000; // Starting batch size for batched GraphQL file-content requests. Halves on failure. const GRAPHQL_FILES_BATCH_SIZE = 50; +export interface PrFilesGeneration { + headSha: string | null; + baseSha: string | null; +} + +export interface PrFilesFetchResult { + status: "stored" | "stale"; +} + const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); @@ -328,7 +337,8 @@ export class GitHubFetcherService implements OnModuleInit { async fetchAndStorePrFiles( repoFullName: string, prNumber: number, - ): Promise { + expectedGeneration?: PrFilesGeneration, + ): Promise { const [owner, repo] = repoFullName.split("/"); const token = await this.getTokenForRepo(repoFullName); @@ -337,6 +347,14 @@ export class GitHubFetcherService implements OnModuleInit { throw new Error(`PR ${repoFullName}#${prNumber} not found in DB`); } + const generation = expectedGeneration ?? { + headSha: pr.headSha ?? null, + baseSha: pr.baseSha ?? null, + }; + if (!this.matchesPrFilesGeneration(pr, generation)) { + return { status: "stale" }; + } + // Fetch and store the merge-base SHA. Needed for correct tree-diff // scoring — differs from baseSha when base branch has advanced. Recompute // on every fetch: a stored value can go stale when head advances via @@ -349,7 +367,13 @@ export class GitHubFetcherService implements OnModuleInit { pr.headSha, ); if (mergeBaseSha) { - await this.prRepo.update({ repoFullName, prNumber }, { mergeBaseSha }); + const result = await this.prRepo.update( + this.prFilesGenerationCriteria(repoFullName, prNumber, generation), + { mergeBaseSha }, + ); + if (!result.affected) { + return { status: "stale" }; + } pr.mergeBaseSha = mergeBaseSha; } } @@ -357,6 +381,16 @@ export class GitHubFetcherService implements OnModuleInit { // 1. Fetch file list via REST const files = await this.fetchAllPrFiles(owner, repo, prNumber, token); + if ( + !(await this.isCurrentPrFilesGeneration( + repoFullName, + prNumber, + generation, + )) + ) { + return { status: "stale" }; + } + // Clear any stale data for this PR (e.g. after a synchronize event) await this.prFileRepo.delete({ repoFullName, prNumber }); await this.prFileContentRepo.delete({ repoFullName, prNumber }); @@ -383,7 +417,7 @@ export class GitHubFetcherService implements OnModuleInit { this.logger.warn( `PR ${repoFullName}#${prNumber} has no head SHA — skipping content fetch`, ); - return; + return { status: "stored" }; } // Prefer merge-base SHA (true common ancestor) over base SHA for @@ -391,7 +425,7 @@ export class GitHubFetcherService implements OnModuleInit { // merge-base couldn't be resolved. const baseForContents = pr.mergeBaseSha ?? pr.baseSha; - await this.fetchAndStoreBatchedContents( + const contentsResult = await this.fetchAndStoreBatchedContents( repoFullName, prNumber, files, @@ -400,7 +434,23 @@ export class GitHubFetcherService implements OnModuleInit { token, pr.headSha, baseForContents, + generation, ); + if (contentsResult.status === "stale") { + return contentsResult; + } + + if ( + !(await this.isCurrentPrFilesGeneration( + repoFullName, + prNumber, + generation, + )) + ) { + return { status: "stale" }; + } + + return { status: "stored" }; } private async fetchAllPrFiles( @@ -476,16 +526,26 @@ export class GitHubFetcherService implements OnModuleInit { token: string, headSha: string, baseSha: string | null, - ): Promise { + generation: PrFilesGeneration, + ): Promise { // Only fetch contents for files that have a meaningful version to fetch const scored = files.filter((f) => f.status !== "removed"); - if (scored.length === 0) return; + if (scored.length === 0) return { status: "stored" }; let batchSize = GRAPHQL_FILES_BATCH_SIZE; const minBatchSize = 5; for (let i = 0; i < scored.length; ) { const batch = scored.slice(i, i + batchSize); + if ( + !(await this.isCurrentPrFilesGeneration( + repoFullName, + prNumber, + generation, + )) + ) { + return { status: "stale" }; + } try { await this.fetchContentBatch( repoFullName, @@ -514,6 +574,8 @@ export class GitHubFetcherService implements OnModuleInit { } } } + + return { status: "stored" }; } private async fetchContentBatch( @@ -606,6 +668,38 @@ export class GitHubFetcherService implements OnModuleInit { } } + private matchesPrFilesGeneration( + pr: PullRequest, + generation: PrFilesGeneration, + ): boolean { + return ( + (pr.headSha ?? null) === generation.headSha && + (pr.baseSha ?? null) === generation.baseSha + ); + } + + private async isCurrentPrFilesGeneration( + repoFullName: string, + prNumber: number, + generation: PrFilesGeneration, + ): Promise { + const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); + return !!pr && this.matchesPrFilesGeneration(pr, generation); + } + + private prFilesGenerationCriteria( + repoFullName: string, + prNumber: number, + generation: PrFilesGeneration, + ): Record { + return { + repoFullName, + prNumber, + headSha: generation.headSha ?? IsNull(), + baseSha: generation.baseSha ?? IsNull(), + }; + } + private extractBlobText(blob: any): string | null { if (!blob) return null; if (blob.isBinary) return null; @@ -636,7 +730,9 @@ export class GitHubFetcherService implements OnModuleInit { async backfillPullRequests( repoFullName: string, sinceDate: Date, - ): Promise<{ prNumber: number }[]> { + ): Promise< + { prNumber: number; headSha: string | null; baseSha: string | null }[] + > { const [owner, repo] = repoFullName.split("/"); const token = await this.getTokenForRepo(repoFullName); @@ -718,7 +814,11 @@ export class GitHubFetcherService implements OnModuleInit { } `; - const prs: { prNumber: number }[] = []; + const prs: { + prNumber: number; + headSha: string | null; + baseSha: string | null; + }[] = []; let cursor: string | null = null; let defaultBranchWritten = false; @@ -823,7 +923,11 @@ export class GitHubFetcherService implements OnModuleInit { pr.timelineItems?.nodes ?? [], ); - prs.push({ prNumber: pr.number }); + prs.push({ + prNumber: pr.number, + headSha: pr.headRefOid ?? null, + baseSha: pr.baseRefOid ?? null, + }); } if (shouldStop || !page.pageInfo.hasNextPage) break; diff --git a/packages/das/src/webhook/handlers/pull-request.handler.ts b/packages/das/src/webhook/handlers/pull-request.handler.ts index dd54838..c683090 100644 --- a/packages/das/src/webhook/handlers/pull-request.handler.ts +++ b/packages/das/src/webhook/handlers/pull-request.handler.ts @@ -5,7 +5,7 @@ import { InjectQueue } from "@nestjs/bullmq"; import { Repository } from "typeorm"; import { Queue } from "bullmq"; import { PullRequest, Repo } from "../../entities"; -import { FETCH_QUEUE, FETCH_JOBS } from "../../queue/constants"; +import { FETCH_QUEUE, FETCH_JOBS, prFilesJobId } from "../../queue/constants"; @Injectable() export class PullRequestHandler { @@ -96,10 +96,17 @@ export class PullRequestHandler { ); } - const jobId = `files-${repoFullName}-${prNumber}`; + const expectedHeadSha = data.headSha ?? null; + const expectedBaseSha = data.baseSha ?? null; + const jobId = prFilesJobId( + repoFullName, + prNumber, + expectedHeadSha, + expectedBaseSha, + ); await this.fetchQueue.add( FETCH_JOBS.PR_FILES, - { repoFullName, prNumber }, + { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, { jobId, removeOnComplete: true, From 930473cc0b055fd8c8e835438711dd7a1827edf8 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Fri, 8 May 2026 06:43:09 +0200 Subject: [PATCH 2/5] Fix stale PR file write race --- .../das/src/queue/pr-files-generation.spec.ts | 298 +++++++++++++++++- .../das/src/webhook/github-fetcher.service.ts | 130 ++++++-- 2 files changed, 379 insertions(+), 49 deletions(-) diff --git a/packages/das/src/queue/pr-files-generation.spec.ts b/packages/das/src/queue/pr-files-generation.spec.ts index 54a26ab..2272318 100644 --- a/packages/das/src/queue/pr-files-generation.spec.ts +++ b/packages/das/src/queue/pr-files-generation.spec.ts @@ -5,6 +5,7 @@ import { FETCH_JOBS } from "./constants"; import { FetchProcessor } from "./fetch.processor"; import { PullRequestHandler } from "../webhook/handlers/pull-request.handler"; import { GitHubFetcherService } from "../webhook/github-fetcher.service"; +import { PrFile, PrFileContent, PullRequest } from "../entities"; type MockRepo> = { row: T | null; @@ -13,12 +14,21 @@ type MockRepo> = { updates: unknown[]; repo: { findOneBy: (criteria: Record) => Promise; + findOne: (options: { + where?: Record; + }) => Promise; update: ( criteria: Record | string, patch: Record, ) => Promise<{ affected: number }>; - upsert: (data: Record) => Promise; + upsert: ( + data: Record, + conflictPaths?: string[], + ) => Promise; delete: (criteria: Record) => Promise; + manager?: { + transaction: (work: (manager: any) => Promise) => Promise; + }; }; }; @@ -33,6 +43,12 @@ function createMockRepo>( repo: { findOneBy: (criteria: Record): Promise => Promise.resolve(matchesCriteria(mock.row, criteria) ? mock.row : null), + findOne: (options: { + where?: Record; + }): Promise => + Promise.resolve( + matchesCriteria(mock.row, options.where ?? {}) ? mock.row : null, + ), update: ( criteria: Record | string, patch: Record, @@ -62,6 +78,44 @@ function createMockRepo>( return mock; } +function createTransactionManager( + repos: Map>, + beforeTransaction?: () => void, +): { + transaction: (work: (manager: any) => Promise) => Promise; +} { + return { + transaction: async (work: (manager: any) => Promise): Promise => { + beforeTransaction?.(); + return work({ + getRepository: (entity: unknown) => { + const repo = repos.get(entity); + if (!repo) { + throw new Error(`No mock repository for ${String(entity)}`); + } + return repo.repo; + }, + }); + }, + }; +} + +function attachPrFilesTransactionManager( + prRepo: MockRepo, + prFileRepo: MockRepo, + prFileContentRepo: MockRepo, + beforeTransaction?: () => void, +): void { + prRepo.repo.manager = createTransactionManager( + new Map>([ + [PullRequest, prRepo], + [PrFile, prFileRepo], + [PrFileContent, prFileContentRepo], + ]), + beforeTransaction, + ); +} + function matchesCriteria>( row: T | null, criteria: Record, @@ -131,6 +185,28 @@ function synchronizePayload(): Record { }; } +function createGitHubFetcher( + prFileRepo: MockRepo, + prFileContentRepo: MockRepo, + prRepo: MockRepo, +): GitHubFetcherService { + const otherRepo = createMockRepo(); + const config = { + getOrThrow: (key: string): string => + key === "GITHUB_APP_ID" ? "123" : "/tmp/private-key.pem", + }; + return new GitHubFetcherService( + config as any, + prFileRepo.repo as any, + prFileContentRepo.repo as any, + prRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + otherRepo.repo as any, + ); +} + void test("synchronize enqueues a PR file job for the webhook head/base generation", async () => { const prRepo = createMockRepo(); const repoRepo = createMockRepo({ repoFullName: "owner/repo" }); @@ -331,21 +407,7 @@ void test("fetcher skips destructive file writes when the job generation is alre baseSha: "B2", mergeBaseSha: null, }); - const otherRepo = createMockRepo(); - const config = { - getOrThrow: (key: string): string => - key === "GITHUB_APP_ID" ? "123" : "/tmp/private-key.pem", - }; - const fetcher = new GitHubFetcherService( - config as any, - prFileRepo.repo as any, - prFileContentRepo.repo as any, - prRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - ); + const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); (fetcher as any).getTokenForRepo = (): Promise => Promise.resolve("token"); @@ -365,3 +427,207 @@ void test("fetcher skips destructive file writes when the job generation is alre assert.equal(prFileRepo.upserts.length, 0); assert.equal(prFileContentRepo.upserts.length, 0); }); + +void test("fetcher replaces PR file rows when the generation is current under the row lock", async () => { + const prFileRepo = createMockRepo(); + const prFileContentRepo = createMockRepo(); + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: null, + baseSha: null, + mergeBaseSha: null, + }); + attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo); + const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); + + (fetcher as any).getTokenForRepo = (): Promise => + Promise.resolve("token"); + (fetcher as any).fetchAllPrFiles = (): Promise => + Promise.resolve([ + { + filename: "src/app.ts", + previous_filename: undefined, + status: "modified", + additions: 3, + deletions: 1, + changes: 4, + }, + ]); + + const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { + headSha: null, + baseSha: null, + }); + + assert.deepEqual(result, { status: "stored" }); + assert.deepEqual(prFileRepo.deletes, [ + { repoFullName: "owner/repo", prNumber: 7 }, + ]); + assert.deepEqual(prFileContentRepo.deletes, [ + { repoFullName: "owner/repo", prNumber: 7 }, + ]); + assert.deepEqual(prFileRepo.upserts, [ + { + repoFullName: "owner/repo", + prNumber: 7, + filename: "src/app.ts", + previousFilename: null, + status: "modified", + additions: 3, + deletions: 1, + changes: 4, + }, + ]); + assert.equal(prFileContentRepo.upserts.length, 0); +}); + +void test("fetcher rechecks generation under the row lock before deleting PR file rows", async () => { + const prFileRepo = createMockRepo(); + const prFileContentRepo = createMockRepo(); + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H1", + baseSha: "B1", + mergeBaseSha: null, + scoringDataStored: false, + }); + let raced = false; + const deletePrFiles = prFileRepo.repo.delete; + prFileRepo.repo.delete = async ( + criteria: Record, + ): Promise => { + if (!raced) { + Object.assign(prRepo.row ?? {}, { + headSha: "H2", + baseSha: "B2", + scoringDataStored: true, + }); + raced = true; + } + await deletePrFiles(criteria); + }; + attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo, () => { + if (!raced) { + Object.assign(prRepo.row ?? {}, { + headSha: "H2", + baseSha: "B2", + scoringDataStored: true, + }); + raced = true; + } + }); + const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); + + (fetcher as any).getTokenForRepo = (): Promise => + Promise.resolve("token"); + (fetcher as any).fetchMergeBaseSha = (): Promise => + Promise.resolve(null); + (fetcher as any).fetchAllPrFiles = (): Promise => + Promise.resolve([ + { + filename: "src/app.ts", + status: "removed", + additions: 0, + deletions: 4, + changes: 4, + }, + ]); + + const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { + headSha: "H1", + baseSha: "B1", + }); + + assert.deepEqual(result, { status: "stale" }); + assert.equal(prRepo.row?.headSha, "H2"); + assert.equal(prRepo.row?.baseSha, "B2"); + assert.equal(prRepo.row?.scoringDataStored, true); + assert.equal(prFileRepo.deletes.length, 0); + assert.equal(prFileContentRepo.deletes.length, 0); + assert.equal(prFileRepo.upserts.length, 0); + assert.equal(prFileContentRepo.upserts.length, 0); +}); + +void test("fetcher rechecks generation under the row lock before storing PR file contents", async () => { + const prFileRepo = createMockRepo(); + const prFileContentRepo = createMockRepo(); + const prRepo = createMockRepo({ + repoFullName: "owner/repo", + prNumber: 7, + headSha: "H1", + baseSha: "B1", + mergeBaseSha: null, + scoringDataStored: false, + }); + let transactionCount = 0; + const upsertContent = prFileContentRepo.repo.upsert; + prFileContentRepo.repo.upsert = async ( + data: Record, + conflictPaths?: string[], + ): Promise => { + Object.assign(prRepo.row ?? {}, { + headSha: "H2", + baseSha: "B2", + scoringDataStored: true, + }); + await upsertContent(data, conflictPaths); + }; + attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo, () => { + transactionCount++; + if (transactionCount === 2) { + Object.assign(prRepo.row ?? {}, { + headSha: "H2", + baseSha: "B2", + scoringDataStored: true, + }); + } + }); + const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); + + (fetcher as any).getTokenForRepo = (): Promise => + Promise.resolve("token"); + (fetcher as any).fetchMergeBaseSha = (): Promise => + Promise.resolve(null); + (fetcher as any).fetchAllPrFiles = (): Promise => + Promise.resolve([ + { + filename: "src/app.ts", + status: "modified", + additions: 3, + deletions: 1, + changes: 4, + }, + ]); + (fetcher as any).githubFetch = (): Promise<{ + ok: true; + json: () => Promise>; + }> => + Promise.resolve({ + ok: true, + json: () => + Promise.resolve({ + data: { + repository: { + base0: { text: "old", byteSize: 3, isBinary: false }, + head0: { text: "new", byteSize: 3, isBinary: false }, + }, + }, + }), + }); + + const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { + headSha: "H1", + baseSha: "B1", + }); + + assert.deepEqual(result, { status: "stale" }); + assert.equal(prRepo.row?.headSha, "H2"); + assert.equal(prRepo.row?.baseSha, "B2"); + assert.equal(prRepo.row?.scoringDataStored, true); + assert.equal(prFileRepo.deletes.length, 1); + assert.equal(prFileContentRepo.deletes.length, 1); + assert.equal(prFileRepo.upserts.length, 1); + assert.equal(prFileContentRepo.upserts.length, 0); +}); diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index 85cd363..f4d2953 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; -import { IsNull, Repository } from "typeorm"; +import { EntityManager, IsNull, Repository } from "typeorm"; import { readFileSync } from "fs"; import { sign } from "jsonwebtoken"; import { @@ -391,25 +391,41 @@ export class GitHubFetcherService implements OnModuleInit { return { status: "stale" }; } - // Clear any stale data for this PR (e.g. after a synchronize event) - await this.prFileRepo.delete({ repoFullName, prNumber }); - await this.prFileContentRepo.delete({ repoFullName, prNumber }); + // Clear stale data and replace file metadata only while the PR row still + // matches this job's generation. Without the row lock, an older active job + // can delete rows written by a newer completed generation. + const filesStored = await this.writeIfCurrentPrFilesGeneration( + repoFullName, + prNumber, + generation, + async (manager) => { + const prFileRepo = manager.getRepository(PrFile); + const prFileContentRepo = manager.getRepository(PrFileContent); - // 2. Upsert file metadata - for (const file of files) { - await this.prFileRepo.upsert( - { - repoFullName, - prNumber, - filename: file.filename, - previousFilename: file.previous_filename ?? null, - status: file.status, - additions: file.additions ?? 0, - deletions: file.deletions ?? 0, - changes: file.changes ?? 0, - }, - ["repoFullName", "prNumber", "filename"], - ); + await prFileRepo.delete({ repoFullName, prNumber }); + await prFileContentRepo.delete({ repoFullName, prNumber }); + + // 2. Upsert file metadata + for (const file of files) { + await prFileRepo.upsert( + { + repoFullName, + prNumber, + filename: file.filename, + previousFilename: file.previous_filename ?? null, + status: file.status, + additions: file.additions ?? 0, + deletions: file.deletions ?? 0, + changes: file.changes ?? 0, + }, + ["repoFullName", "prNumber", "filename"], + ); + } + }, + ); + + if (!filesStored) { + return { status: "stale" }; } // 3. Fetch file contents in batches (base + head in one GraphQL call each) @@ -547,7 +563,7 @@ export class GitHubFetcherService implements OnModuleInit { return { status: "stale" }; } try { - await this.fetchContentBatch( + const result = await this.fetchContentBatch( repoFullName, prNumber, batch, @@ -556,7 +572,9 @@ export class GitHubFetcherService implements OnModuleInit { token, headSha, baseSha, + generation, ); + if (result.status === "stale") return result; i += batch.length; } catch (err) { if (batchSize > minBatchSize) { @@ -587,7 +605,8 @@ export class GitHubFetcherService implements OnModuleInit { token: string, headSha: string, baseSha: string | null, - ): Promise { + generation: PrFilesGeneration, + ): Promise { const fields: string[] = []; for (let i = 0; i < batch.length; i++) { const file = batch[i]; @@ -640,6 +659,15 @@ export class GitHubFetcherService implements OnModuleInit { } const repoData = body.data?.repository ?? {}; + const contents: Array<{ + repoFullName: string; + prNumber: number; + filename: string; + baseContent: string | null; + headContent: string | null; + isBinary: boolean; + byteSize: number | null; + }> = []; for (let i = 0; i < batch.length; i++) { const file = batch[i]; @@ -653,19 +681,55 @@ export class GitHubFetcherService implements OnModuleInit { const baseContent = this.extractBlobText(baseBlob); const byteSize = headBlob?.byteSize ?? baseBlob?.byteSize ?? null; - await this.prFileContentRepo.upsert( - { - repoFullName, - prNumber, - filename: file.filename, - baseContent, - headContent, - isBinary, - byteSize, - }, - ["repoFullName", "prNumber", "filename"], - ); + contents.push({ + repoFullName, + prNumber, + filename: file.filename, + baseContent, + headContent, + isBinary, + byteSize, + }); } + + const contentsStored = await this.writeIfCurrentPrFilesGeneration( + repoFullName, + prNumber, + generation, + async (manager) => { + const prFileContentRepo = manager.getRepository(PrFileContent); + for (const content of contents) { + await prFileContentRepo.upsert(content, [ + "repoFullName", + "prNumber", + "filename", + ]); + } + }, + ); + + return contentsStored ? { status: "stored" } : { status: "stale" }; + } + + private async writeIfCurrentPrFilesGeneration( + repoFullName: string, + prNumber: number, + generation: PrFilesGeneration, + write: (manager: EntityManager) => Promise, + ): Promise { + return this.prRepo.manager.transaction(async (manager) => { + const pr = await manager.getRepository(PullRequest).findOne({ + where: { repoFullName, prNumber }, + lock: { mode: "pessimistic_write" }, + }); + + if (!pr || !this.matchesPrFilesGeneration(pr, generation)) { + return false; + } + + await write(manager); + return true; + }); } private matchesPrFilesGeneration( From 5e2676665c8e6485c93e10bdca0f4534dfe8c026 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Fri, 8 May 2026 23:01:43 +0200 Subject: [PATCH 3/5] Run tests before DAS build --- packages/das/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/das/package.json b/packages/das/package.json index 4ef9f03..c189ff1 100644 --- a/packages/das/package.json +++ b/packages/das/package.json @@ -6,6 +6,7 @@ "private": true, "license": "UNLICENSED", "scripts": { + "prebuild": "npm test", "build": "nest build", "format": "prettier --write \"src/**/*.ts\"", "format:check": "prettier --check \"src/**/*.ts\"", From e12b130506a1d3837091eef1ea355c1d44fc2d0b Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 11 May 2026 00:21:32 +0200 Subject: [PATCH 4/5] Drop contributor test artifacts --- packages/das/package.json | 2 - .../das/src/queue/pr-files-generation.spec.ts | 633 ------------------ 2 files changed, 635 deletions(-) delete mode 100644 packages/das/src/queue/pr-files-generation.spec.ts diff --git a/packages/das/package.json b/packages/das/package.json index c189ff1..36691c0 100644 --- a/packages/das/package.json +++ b/packages/das/package.json @@ -6,7 +6,6 @@ "private": true, "license": "UNLICENSED", "scripts": { - "prebuild": "npm test", "build": "nest build", "format": "prettier --write \"src/**/*.ts\"", "format:check": "prettier --check \"src/**/*.ts\"", @@ -14,7 +13,6 @@ "dev": "nest start --watch", "start:debug": "nest start --debug --watch", "start:prod": "node dist/main", - "test": "node --test -r ts-node/register \"src/**/*.spec.ts\"", "lint": "eslint \"src/**/*.ts\"", "lint:fix": "eslint \"src/**/*.ts\" --fix" }, diff --git a/packages/das/src/queue/pr-files-generation.spec.ts b/packages/das/src/queue/pr-files-generation.spec.ts deleted file mode 100644 index 2272318..0000000 --- a/packages/das/src/queue/pr-files-generation.spec.ts +++ /dev/null @@ -1,633 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-return */ -import test from "node:test"; -import assert from "node:assert/strict"; -import { FETCH_JOBS } from "./constants"; -import { FetchProcessor } from "./fetch.processor"; -import { PullRequestHandler } from "../webhook/handlers/pull-request.handler"; -import { GitHubFetcherService } from "../webhook/github-fetcher.service"; -import { PrFile, PrFileContent, PullRequest } from "../entities"; - -type MockRepo> = { - row: T | null; - deletes: unknown[]; - upserts: unknown[]; - updates: unknown[]; - repo: { - findOneBy: (criteria: Record) => Promise; - findOne: (options: { - where?: Record; - }) => Promise; - update: ( - criteria: Record | string, - patch: Record, - ) => Promise<{ affected: number }>; - upsert: ( - data: Record, - conflictPaths?: string[], - ) => Promise; - delete: (criteria: Record) => Promise; - manager?: { - transaction: (work: (manager: any) => Promise) => Promise; - }; - }; -}; - -function createMockRepo>( - initialRow: T | null = null, -): MockRepo { - const mock: MockRepo = { - row: initialRow, - deletes: [], - upserts: [], - updates: [], - repo: { - findOneBy: (criteria: Record): Promise => - Promise.resolve(matchesCriteria(mock.row, criteria) ? mock.row : null), - findOne: (options: { - where?: Record; - }): Promise => - Promise.resolve( - matchesCriteria(mock.row, options.where ?? {}) ? mock.row : null, - ), - update: ( - criteria: Record | string, - patch: Record, - ): Promise<{ affected: number }> => { - mock.updates.push({ criteria, patch }); - if ( - mock.row && - (typeof criteria === "string" || matchesCriteria(mock.row, criteria)) - ) { - Object.assign(mock.row, patch); - return Promise.resolve({ affected: 1 }); - } - return Promise.resolve({ affected: 0 }); - }, - upsert: (data: Record): Promise => { - mock.upserts.push(data); - mock.row = { ...mock.row, ...data } as T; - return Promise.resolve(); - }, - delete: (criteria: Record): Promise => { - mock.deletes.push(criteria); - return Promise.resolve(); - }, - }, - }; - - return mock; -} - -function createTransactionManager( - repos: Map>, - beforeTransaction?: () => void, -): { - transaction: (work: (manager: any) => Promise) => Promise; -} { - return { - transaction: async (work: (manager: any) => Promise): Promise => { - beforeTransaction?.(); - return work({ - getRepository: (entity: unknown) => { - const repo = repos.get(entity); - if (!repo) { - throw new Error(`No mock repository for ${String(entity)}`); - } - return repo.repo; - }, - }); - }, - }; -} - -function attachPrFilesTransactionManager( - prRepo: MockRepo, - prFileRepo: MockRepo, - prFileContentRepo: MockRepo, - beforeTransaction?: () => void, -): void { - prRepo.repo.manager = createTransactionManager( - new Map>([ - [PullRequest, prRepo], - [PrFile, prFileRepo], - [PrFileContent, prFileContentRepo], - ]), - beforeTransaction, - ); -} - -function matchesCriteria>( - row: T | null, - criteria: Record, -): boolean { - if (!row) return false; - for (const [key, value] of Object.entries(criteria)) { - if (isNullFindOperator(value)) { - if (row[key] !== null && row[key] !== undefined) return false; - } else if (row[key] !== value) { - return false; - } - } - return true; -} - -function isNullFindOperator(value: unknown): boolean { - return ( - typeof value === "object" && - value !== null && - "_type" in value && - (value as { _type?: unknown })._type === "isNull" - ); -} - -function createQueue(): { - added: Array<{ name: string; data: any; opts: any }>; - queue: { add: (name: string, data: any, opts: any) => Promise }; -} { - const added: Array<{ name: string; data: any; opts: any }> = []; - return { - added, - queue: { - add: (name: string, data: any, opts: any): Promise => { - added.push({ name, data, opts }); - return Promise.resolve(); - }, - }, - }; -} - -function synchronizePayload(): Record { - return { - action: "synchronize", - repository: { full_name: "owner/repo" }, - pull_request: { - number: 7, - title: "PR", - state: "open", - merged: false, - created_at: "2026-05-01T00:00:00Z", - closed_at: null, - merged_at: null, - user: { id: 123, login: "miner" }, - author_association: "CONTRIBUTOR", - merged_by: null, - base: { ref: "test", sha: "B2" }, - head: { - ref: "feature", - sha: "H2", - repo: { full_name: "owner/repo" }, - }, - additions: 1, - deletions: 0, - commits: 1, - labels: [], - }, - }; -} - -function createGitHubFetcher( - prFileRepo: MockRepo, - prFileContentRepo: MockRepo, - prRepo: MockRepo, -): GitHubFetcherService { - const otherRepo = createMockRepo(); - const config = { - getOrThrow: (key: string): string => - key === "GITHUB_APP_ID" ? "123" : "/tmp/private-key.pem", - }; - return new GitHubFetcherService( - config as any, - prFileRepo.repo as any, - prFileContentRepo.repo as any, - prRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - otherRepo.repo as any, - ); -} - -void test("synchronize enqueues a PR file job for the webhook head/base generation", async () => { - const prRepo = createMockRepo(); - const repoRepo = createMockRepo({ repoFullName: "owner/repo" }); - const queue = createQueue(); - const handler = new PullRequestHandler( - prRepo.repo as any, - repoRepo.repo as any, - queue.queue as any, - ); - - await handler.handle(synchronizePayload()); - - const fileJob = queue.added.find((job) => job.name === FETCH_JOBS.PR_FILES); - assert.ok(fileJob); - assert.equal(fileJob.data.repoFullName, "owner/repo"); - assert.equal(fileJob.data.prNumber, 7); - assert.equal(fileJob.data.expectedHeadSha, "H2"); - assert.equal(fileJob.data.expectedBaseSha, "B2"); - assert.equal(fileJob.opts.jobId, "files-owner/repo-7-H2-B2"); -}); - -void test("stale PR file jobs cannot mark a newer PR generation complete", async () => { - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H1", - baseSha: "B1", - scoringDataStored: false, - }); - const issueRepo = createMockRepo(); - const queue = createQueue(); - const fetcher = { - fetchAndStorePrFiles: ( - _repoFullName: string, - _prNumber: number, - generation: { headSha: string | null; baseSha: string | null }, - ): Promise<{ status: "stored" }> => { - assert.deepEqual(generation, { headSha: "H1", baseSha: "B1" }); - Object.assign(prRepo.row ?? {}, { - headSha: "H2", - baseSha: "B2", - scoringDataStored: false, - }); - return Promise.resolve({ status: "stored" }); - }, - }; - const processor = new FetchProcessor( - fetcher as any, - prRepo.repo as any, - issueRepo.repo as any, - queue.queue as any, - ); - - await processor.process({ - name: FETCH_JOBS.PR_FILES, - data: { - repoFullName: "owner/repo", - prNumber: 7, - expectedHeadSha: "H1", - expectedBaseSha: "B1", - }, - } as any); - - assert.equal(prRepo.row?.headSha, "H2"); - assert.equal(prRepo.row?.baseSha, "B2"); - assert.equal(prRepo.row?.scoringDataStored, false); - assert.equal(queue.added.length, 1); - assert.equal(queue.added[0].opts.jobId, "files-owner/repo-7-H2-B2"); - assert.equal(queue.added[0].data.expectedHeadSha, "H2"); - assert.equal(queue.added[0].data.expectedBaseSha, "B2"); -}); - -void test("stale PR file jobs invalidate and requeue a completed newer generation", async () => { - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H2", - baseSha: "B2", - scoringDataStored: true, - }); - const issueRepo = createMockRepo(); - const queue = createQueue(); - const fetcher = { - fetchAndStorePrFiles: (): Promise<{ status: "stale" }> => - Promise.resolve({ status: "stale" }), - }; - const processor = new FetchProcessor( - fetcher as any, - prRepo.repo as any, - issueRepo.repo as any, - queue.queue as any, - ); - - await processor.process({ - name: FETCH_JOBS.PR_FILES, - data: { - repoFullName: "owner/repo", - prNumber: 7, - expectedHeadSha: "H1", - expectedBaseSha: "B1", - }, - } as any); - - assert.equal(prRepo.row?.scoringDataStored, false); - assert.equal(queue.added.length, 1); - assert.equal(queue.added[0].opts.jobId, "files-owner/repo-7-H2-B2"); - assert.equal(queue.added[0].data.expectedHeadSha, "H2"); - assert.equal(queue.added[0].data.expectedBaseSha, "B2"); -}); - -void test("current PR file jobs mark the matching generation complete", async () => { - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H2", - baseSha: "B2", - scoringDataStored: false, - }); - const issueRepo = createMockRepo(); - const queue = createQueue(); - const fetcher = { - fetchAndStorePrFiles: ( - _repoFullName: string, - _prNumber: number, - generation: { headSha: string | null; baseSha: string | null }, - ): Promise<{ status: "stored" }> => { - assert.deepEqual(generation, { headSha: "H2", baseSha: "B2" }); - return Promise.resolve({ status: "stored" }); - }, - }; - const processor = new FetchProcessor( - fetcher as any, - prRepo.repo as any, - issueRepo.repo as any, - queue.queue as any, - ); - - await processor.process({ - name: FETCH_JOBS.PR_FILES, - data: { - repoFullName: "owner/repo", - prNumber: 7, - expectedHeadSha: "H2", - expectedBaseSha: "B2", - }, - } as any); - - assert.equal(prRepo.row?.scoringDataStored, true); - assert.equal(queue.added.length, 0); -}); - -void test("legacy PR file jobs resolve the current generation from the PR row", async () => { - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H2", - baseSha: "B2", - scoringDataStored: false, - }); - const issueRepo = createMockRepo(); - const queue = createQueue(); - const fetcher = { - fetchAndStorePrFiles: ( - _repoFullName: string, - _prNumber: number, - generation: { headSha: string | null; baseSha: string | null }, - ): Promise<{ status: "stored" }> => { - assert.deepEqual(generation, { headSha: "H2", baseSha: "B2" }); - return Promise.resolve({ status: "stored" }); - }, - }; - const processor = new FetchProcessor( - fetcher as any, - prRepo.repo as any, - issueRepo.repo as any, - queue.queue as any, - ); - - await processor.process({ - name: FETCH_JOBS.PR_FILES, - data: { - repoFullName: "owner/repo", - prNumber: 7, - }, - } as any); - - assert.equal(prRepo.row?.scoringDataStored, true); - assert.equal(queue.added.length, 0); -}); - -void test("fetcher skips destructive file writes when the job generation is already stale", async () => { - const prFileRepo = createMockRepo(); - const prFileContentRepo = createMockRepo(); - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H2", - baseSha: "B2", - mergeBaseSha: null, - }); - const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); - - (fetcher as any).getTokenForRepo = (): Promise => - Promise.resolve("token"); - (fetcher as any).fetchMergeBaseSha = (): Promise => - Promise.resolve("M2"); - (fetcher as any).fetchAllPrFiles = (): Promise => - Promise.resolve([]); - - const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { - headSha: "H1", - baseSha: "B1", - }); - - assert.deepEqual(result, { status: "stale" }); - assert.equal(prFileRepo.deletes.length, 0); - assert.equal(prFileContentRepo.deletes.length, 0); - assert.equal(prFileRepo.upserts.length, 0); - assert.equal(prFileContentRepo.upserts.length, 0); -}); - -void test("fetcher replaces PR file rows when the generation is current under the row lock", async () => { - const prFileRepo = createMockRepo(); - const prFileContentRepo = createMockRepo(); - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: null, - baseSha: null, - mergeBaseSha: null, - }); - attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo); - const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); - - (fetcher as any).getTokenForRepo = (): Promise => - Promise.resolve("token"); - (fetcher as any).fetchAllPrFiles = (): Promise => - Promise.resolve([ - { - filename: "src/app.ts", - previous_filename: undefined, - status: "modified", - additions: 3, - deletions: 1, - changes: 4, - }, - ]); - - const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { - headSha: null, - baseSha: null, - }); - - assert.deepEqual(result, { status: "stored" }); - assert.deepEqual(prFileRepo.deletes, [ - { repoFullName: "owner/repo", prNumber: 7 }, - ]); - assert.deepEqual(prFileContentRepo.deletes, [ - { repoFullName: "owner/repo", prNumber: 7 }, - ]); - assert.deepEqual(prFileRepo.upserts, [ - { - repoFullName: "owner/repo", - prNumber: 7, - filename: "src/app.ts", - previousFilename: null, - status: "modified", - additions: 3, - deletions: 1, - changes: 4, - }, - ]); - assert.equal(prFileContentRepo.upserts.length, 0); -}); - -void test("fetcher rechecks generation under the row lock before deleting PR file rows", async () => { - const prFileRepo = createMockRepo(); - const prFileContentRepo = createMockRepo(); - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H1", - baseSha: "B1", - mergeBaseSha: null, - scoringDataStored: false, - }); - let raced = false; - const deletePrFiles = prFileRepo.repo.delete; - prFileRepo.repo.delete = async ( - criteria: Record, - ): Promise => { - if (!raced) { - Object.assign(prRepo.row ?? {}, { - headSha: "H2", - baseSha: "B2", - scoringDataStored: true, - }); - raced = true; - } - await deletePrFiles(criteria); - }; - attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo, () => { - if (!raced) { - Object.assign(prRepo.row ?? {}, { - headSha: "H2", - baseSha: "B2", - scoringDataStored: true, - }); - raced = true; - } - }); - const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); - - (fetcher as any).getTokenForRepo = (): Promise => - Promise.resolve("token"); - (fetcher as any).fetchMergeBaseSha = (): Promise => - Promise.resolve(null); - (fetcher as any).fetchAllPrFiles = (): Promise => - Promise.resolve([ - { - filename: "src/app.ts", - status: "removed", - additions: 0, - deletions: 4, - changes: 4, - }, - ]); - - const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { - headSha: "H1", - baseSha: "B1", - }); - - assert.deepEqual(result, { status: "stale" }); - assert.equal(prRepo.row?.headSha, "H2"); - assert.equal(prRepo.row?.baseSha, "B2"); - assert.equal(prRepo.row?.scoringDataStored, true); - assert.equal(prFileRepo.deletes.length, 0); - assert.equal(prFileContentRepo.deletes.length, 0); - assert.equal(prFileRepo.upserts.length, 0); - assert.equal(prFileContentRepo.upserts.length, 0); -}); - -void test("fetcher rechecks generation under the row lock before storing PR file contents", async () => { - const prFileRepo = createMockRepo(); - const prFileContentRepo = createMockRepo(); - const prRepo = createMockRepo({ - repoFullName: "owner/repo", - prNumber: 7, - headSha: "H1", - baseSha: "B1", - mergeBaseSha: null, - scoringDataStored: false, - }); - let transactionCount = 0; - const upsertContent = prFileContentRepo.repo.upsert; - prFileContentRepo.repo.upsert = async ( - data: Record, - conflictPaths?: string[], - ): Promise => { - Object.assign(prRepo.row ?? {}, { - headSha: "H2", - baseSha: "B2", - scoringDataStored: true, - }); - await upsertContent(data, conflictPaths); - }; - attachPrFilesTransactionManager(prRepo, prFileRepo, prFileContentRepo, () => { - transactionCount++; - if (transactionCount === 2) { - Object.assign(prRepo.row ?? {}, { - headSha: "H2", - baseSha: "B2", - scoringDataStored: true, - }); - } - }); - const fetcher = createGitHubFetcher(prFileRepo, prFileContentRepo, prRepo); - - (fetcher as any).getTokenForRepo = (): Promise => - Promise.resolve("token"); - (fetcher as any).fetchMergeBaseSha = (): Promise => - Promise.resolve(null); - (fetcher as any).fetchAllPrFiles = (): Promise => - Promise.resolve([ - { - filename: "src/app.ts", - status: "modified", - additions: 3, - deletions: 1, - changes: 4, - }, - ]); - (fetcher as any).githubFetch = (): Promise<{ - ok: true; - json: () => Promise>; - }> => - Promise.resolve({ - ok: true, - json: () => - Promise.resolve({ - data: { - repository: { - base0: { text: "old", byteSize: 3, isBinary: false }, - head0: { text: "new", byteSize: 3, isBinary: false }, - }, - }, - }), - }); - - const result = await fetcher.fetchAndStorePrFiles("owner/repo", 7, { - headSha: "H1", - baseSha: "B1", - }); - - assert.deepEqual(result, { status: "stale" }); - assert.equal(prRepo.row?.headSha, "H2"); - assert.equal(prRepo.row?.baseSha, "B2"); - assert.equal(prRepo.row?.scoringDataStored, true); - assert.equal(prFileRepo.deletes.length, 1); - assert.equal(prFileContentRepo.deletes.length, 1); - assert.equal(prFileRepo.upserts.length, 1); - assert.equal(prFileContentRepo.upserts.length, 0); -}); From bd32a185ab7d4d601aa9fcda91cd1951f1bc8a96 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 11 May 2026 01:26:31 +0200 Subject: [PATCH 5/5] Slim stale PR file completion fix --- packages/das/src/queue/fetch.processor.ts | 90 +++---- .../das/src/webhook/github-fetcher.service.ts | 236 +++--------------- 2 files changed, 69 insertions(+), 257 deletions(-) diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 3ba8cc6..27860d2 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -4,10 +4,7 @@ import { InjectRepository } from "@nestjs/typeorm"; import { IsNull, Repository } from "typeorm"; import { Job, Queue } from "bullmq"; import { Issue, PullRequest } from "../entities"; -import { - GitHubFetcherService, - PrFilesGeneration, -} from "../webhook/github-fetcher.service"; +import { GitHubFetcherService } from "../webhook/github-fetcher.service"; import { FETCH_QUEUE, FETCH_JOBS, @@ -32,6 +29,11 @@ export interface BackfillRepoJobData { days?: number; } +interface PrFilesGeneration { + headSha: string | null; + baseSha: string | null; +} + type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData; @Processor(FETCH_QUEUE, { concurrency: 5 }) @@ -105,19 +107,12 @@ export class FetchProcessor extends WorkerHost { const { repoFullName, prNumber } = data; this.logger.log(`Fetching PR files for ${repoFullName}#${prNumber}`); - const generation = await this.resolvePrFilesGeneration(data); - if (!generation) return; - - const result = await this.fetcher.fetchAndStorePrFiles( - repoFullName, - prNumber, - generation, - ); + const generation = { + headSha: data.expectedHeadSha ?? null, + baseSha: data.expectedBaseSha ?? null, + }; - if (result.status === "stale") { - await this.handleStalePrFilesJob(repoFullName, prNumber); - return; - } + await this.fetcher.fetchAndStorePrFiles(repoFullName, prNumber); const updateResult = await this.prRepo.update( this.prGenerationCriteria(repoFullName, prNumber, generation), @@ -158,23 +153,11 @@ export class FetchProcessor extends WorkerHost { }, ); - const expectedHeadSha = headSha ?? null; - const expectedBaseSha = baseSha ?? null; - await this.fetchQueue.add( - FETCH_JOBS.PR_FILES, - { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, - { - jobId: prFilesJobId( - repoFullName, - prNumber, - expectedHeadSha, - expectedBaseSha, - ), - removeOnComplete: true, - removeOnFail: 50, - attempts: 3, - backoff: { type: "exponential", delay: 5000 }, - }, + await this.enqueuePrFilesJob( + repoFullName, + prNumber, + headSha ?? null, + baseSha ?? null, ); } @@ -183,31 +166,6 @@ export class FetchProcessor extends WorkerHost { this.logger.log(`Backfilled issues from ${repoFullName}`); } - private async resolvePrFilesGeneration( - data: PrFilesJobData, - ): Promise { - if ( - data.expectedHeadSha !== undefined || - data.expectedBaseSha !== undefined - ) { - return { - headSha: data.expectedHeadSha ?? null, - baseSha: data.expectedBaseSha ?? null, - }; - } - - const pr = await this.prRepo.findOneBy({ - repoFullName: data.repoFullName, - prNumber: data.prNumber, - }); - if (!pr) return null; - - return { - headSha: pr.headSha ?? null, - baseSha: pr.baseSha ?? null, - }; - } - private async handleStalePrFilesJob( repoFullName: string, prNumber: number, @@ -220,8 +178,20 @@ export class FetchProcessor extends WorkerHost { const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); if (!pr) return; - const expectedHeadSha = pr.headSha ?? null; - const expectedBaseSha = pr.baseSha ?? null; + await this.enqueuePrFilesJob( + repoFullName, + prNumber, + pr.headSha ?? null, + pr.baseSha ?? null, + ); + } + + private async enqueuePrFilesJob( + repoFullName: string, + prNumber: number, + expectedHeadSha: string | null, + expectedBaseSha: string | null, + ): Promise { await this.fetchQueue.add( FETCH_JOBS.PR_FILES, { repoFullName, prNumber, expectedHeadSha, expectedBaseSha }, diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index f4d2953..60ee916 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; -import { EntityManager, IsNull, Repository } from "typeorm"; +import { Repository } from "typeorm"; import { readFileSync } from "fs"; import { sign } from "jsonwebtoken"; import { @@ -26,15 +26,6 @@ const MAX_FILE_SIZE_BYTES = 1_000_000; // Starting batch size for batched GraphQL file-content requests. Halves on failure. const GRAPHQL_FILES_BATCH_SIZE = 50; -export interface PrFilesGeneration { - headSha: string | null; - baseSha: string | null; -} - -export interface PrFilesFetchResult { - status: "stored" | "stale"; -} - const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); @@ -337,8 +328,7 @@ export class GitHubFetcherService implements OnModuleInit { async fetchAndStorePrFiles( repoFullName: string, prNumber: number, - expectedGeneration?: PrFilesGeneration, - ): Promise { + ): Promise { const [owner, repo] = repoFullName.split("/"); const token = await this.getTokenForRepo(repoFullName); @@ -347,14 +337,6 @@ export class GitHubFetcherService implements OnModuleInit { throw new Error(`PR ${repoFullName}#${prNumber} not found in DB`); } - const generation = expectedGeneration ?? { - headSha: pr.headSha ?? null, - baseSha: pr.baseSha ?? null, - }; - if (!this.matchesPrFilesGeneration(pr, generation)) { - return { status: "stale" }; - } - // Fetch and store the merge-base SHA. Needed for correct tree-diff // scoring — differs from baseSha when base branch has advanced. Recompute // on every fetch: a stored value can go stale when head advances via @@ -367,13 +349,7 @@ export class GitHubFetcherService implements OnModuleInit { pr.headSha, ); if (mergeBaseSha) { - const result = await this.prRepo.update( - this.prFilesGenerationCriteria(repoFullName, prNumber, generation), - { mergeBaseSha }, - ); - if (!result.affected) { - return { status: "stale" }; - } + await this.prRepo.update({ repoFullName, prNumber }, { mergeBaseSha }); pr.mergeBaseSha = mergeBaseSha; } } @@ -381,51 +357,25 @@ export class GitHubFetcherService implements OnModuleInit { // 1. Fetch file list via REST const files = await this.fetchAllPrFiles(owner, repo, prNumber, token); - if ( - !(await this.isCurrentPrFilesGeneration( - repoFullName, - prNumber, - generation, - )) - ) { - return { status: "stale" }; - } - - // Clear stale data and replace file metadata only while the PR row still - // matches this job's generation. Without the row lock, an older active job - // can delete rows written by a newer completed generation. - const filesStored = await this.writeIfCurrentPrFilesGeneration( - repoFullName, - prNumber, - generation, - async (manager) => { - const prFileRepo = manager.getRepository(PrFile); - const prFileContentRepo = manager.getRepository(PrFileContent); + // Clear any stale data for this PR (e.g. after a synchronize event) + await this.prFileRepo.delete({ repoFullName, prNumber }); + await this.prFileContentRepo.delete({ repoFullName, prNumber }); - await prFileRepo.delete({ repoFullName, prNumber }); - await prFileContentRepo.delete({ repoFullName, prNumber }); - - // 2. Upsert file metadata - for (const file of files) { - await prFileRepo.upsert( - { - repoFullName, - prNumber, - filename: file.filename, - previousFilename: file.previous_filename ?? null, - status: file.status, - additions: file.additions ?? 0, - deletions: file.deletions ?? 0, - changes: file.changes ?? 0, - }, - ["repoFullName", "prNumber", "filename"], - ); - } - }, - ); - - if (!filesStored) { - return { status: "stale" }; + // 2. Upsert file metadata + for (const file of files) { + await this.prFileRepo.upsert( + { + repoFullName, + prNumber, + filename: file.filename, + previousFilename: file.previous_filename ?? null, + status: file.status, + additions: file.additions ?? 0, + deletions: file.deletions ?? 0, + changes: file.changes ?? 0, + }, + ["repoFullName", "prNumber", "filename"], + ); } // 3. Fetch file contents in batches (base + head in one GraphQL call each) @@ -433,7 +383,7 @@ export class GitHubFetcherService implements OnModuleInit { this.logger.warn( `PR ${repoFullName}#${prNumber} has no head SHA — skipping content fetch`, ); - return { status: "stored" }; + return; } // Prefer merge-base SHA (true common ancestor) over base SHA for @@ -441,7 +391,7 @@ export class GitHubFetcherService implements OnModuleInit { // merge-base couldn't be resolved. const baseForContents = pr.mergeBaseSha ?? pr.baseSha; - const contentsResult = await this.fetchAndStoreBatchedContents( + await this.fetchAndStoreBatchedContents( repoFullName, prNumber, files, @@ -450,23 +400,7 @@ export class GitHubFetcherService implements OnModuleInit { token, pr.headSha, baseForContents, - generation, ); - if (contentsResult.status === "stale") { - return contentsResult; - } - - if ( - !(await this.isCurrentPrFilesGeneration( - repoFullName, - prNumber, - generation, - )) - ) { - return { status: "stale" }; - } - - return { status: "stored" }; } private async fetchAllPrFiles( @@ -542,28 +476,18 @@ export class GitHubFetcherService implements OnModuleInit { token: string, headSha: string, baseSha: string | null, - generation: PrFilesGeneration, - ): Promise { + ): Promise { // Only fetch contents for files that have a meaningful version to fetch const scored = files.filter((f) => f.status !== "removed"); - if (scored.length === 0) return { status: "stored" }; + if (scored.length === 0) return; let batchSize = GRAPHQL_FILES_BATCH_SIZE; const minBatchSize = 5; for (let i = 0; i < scored.length; ) { const batch = scored.slice(i, i + batchSize); - if ( - !(await this.isCurrentPrFilesGeneration( - repoFullName, - prNumber, - generation, - )) - ) { - return { status: "stale" }; - } try { - const result = await this.fetchContentBatch( + await this.fetchContentBatch( repoFullName, prNumber, batch, @@ -572,9 +496,7 @@ export class GitHubFetcherService implements OnModuleInit { token, headSha, baseSha, - generation, ); - if (result.status === "stale") return result; i += batch.length; } catch (err) { if (batchSize > minBatchSize) { @@ -592,8 +514,6 @@ export class GitHubFetcherService implements OnModuleInit { } } } - - return { status: "stored" }; } private async fetchContentBatch( @@ -605,8 +525,7 @@ export class GitHubFetcherService implements OnModuleInit { token: string, headSha: string, baseSha: string | null, - generation: PrFilesGeneration, - ): Promise { + ): Promise { const fields: string[] = []; for (let i = 0; i < batch.length; i++) { const file = batch[i]; @@ -659,15 +578,6 @@ export class GitHubFetcherService implements OnModuleInit { } const repoData = body.data?.repository ?? {}; - const contents: Array<{ - repoFullName: string; - prNumber: number; - filename: string; - baseContent: string | null; - headContent: string | null; - isBinary: boolean; - byteSize: number | null; - }> = []; for (let i = 0; i < batch.length; i++) { const file = batch[i]; @@ -681,87 +591,19 @@ export class GitHubFetcherService implements OnModuleInit { const baseContent = this.extractBlobText(baseBlob); const byteSize = headBlob?.byteSize ?? baseBlob?.byteSize ?? null; - contents.push({ - repoFullName, - prNumber, - filename: file.filename, - baseContent, - headContent, - isBinary, - byteSize, - }); + await this.prFileContentRepo.upsert( + { + repoFullName, + prNumber, + filename: file.filename, + baseContent, + headContent, + isBinary, + byteSize, + }, + ["repoFullName", "prNumber", "filename"], + ); } - - const contentsStored = await this.writeIfCurrentPrFilesGeneration( - repoFullName, - prNumber, - generation, - async (manager) => { - const prFileContentRepo = manager.getRepository(PrFileContent); - for (const content of contents) { - await prFileContentRepo.upsert(content, [ - "repoFullName", - "prNumber", - "filename", - ]); - } - }, - ); - - return contentsStored ? { status: "stored" } : { status: "stale" }; - } - - private async writeIfCurrentPrFilesGeneration( - repoFullName: string, - prNumber: number, - generation: PrFilesGeneration, - write: (manager: EntityManager) => Promise, - ): Promise { - return this.prRepo.manager.transaction(async (manager) => { - const pr = await manager.getRepository(PullRequest).findOne({ - where: { repoFullName, prNumber }, - lock: { mode: "pessimistic_write" }, - }); - - if (!pr || !this.matchesPrFilesGeneration(pr, generation)) { - return false; - } - - await write(manager); - return true; - }); - } - - private matchesPrFilesGeneration( - pr: PullRequest, - generation: PrFilesGeneration, - ): boolean { - return ( - (pr.headSha ?? null) === generation.headSha && - (pr.baseSha ?? null) === generation.baseSha - ); - } - - private async isCurrentPrFilesGeneration( - repoFullName: string, - prNumber: number, - generation: PrFilesGeneration, - ): Promise { - const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); - return !!pr && this.matchesPrFilesGeneration(pr, generation); - } - - private prFilesGenerationCriteria( - repoFullName: string, - prNumber: number, - generation: PrFilesGeneration, - ): Record { - return { - repoFullName, - prNumber, - headSha: generation.headSha ?? IsNull(), - baseSha: generation.baseSha ?? IsNull(), - }; } private extractBlobText(blob: any): string | null {