diff --git a/infrastructure/terraform/components/api/locals.tf b/infrastructure/terraform/components/api/locals.tf index adc6947a6..ea75ab9fd 100644 --- a/infrastructure/terraform/components/api/locals.tf +++ b/infrastructure/terraform/components/api/locals.tf @@ -20,16 +20,19 @@ locals { destination_arn = "arn:aws:logs:${var.region}:${var.shared_infra_account_id}:destination:nhs-main-obs-firehose-logs" common_lambda_env_vars = { - APIM_CORRELATION_HEADER = "nhsd-correlation-id", - DOWNLOAD_URL_TTL_SECONDS = 60 - EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters" - LETTER_TTL_HOURS = 12960, # 18 months * 30 days * 24 hours - LETTERS_TABLE_NAME = aws_dynamodb_table.letters.name, - MI_TABLE_NAME = aws_dynamodb_table.mi.name, - MI_TTL_HOURS = 2160 # 90 days * 24 hours - SNS_TOPIC_ARN = "${module.eventsub.sns_topic.arn}", - SUPPLIER_CONFIG_TABLE_NAME = aws_dynamodb_table.supplier-configuration.name - SUPPLIER_ID_HEADER = "nhsd-supplier-id", + APIM_CORRELATION_HEADER = "nhsd-correlation-id", + DOWNLOAD_URL_TTL_SECONDS = 60 + EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters" + LETTER_TTL_HOURS = 12960, # 18 months * 30 days * 24 hours + LETTER_QUEUE_TABLE_NAME = aws_dynamodb_table.letter_queue.name, + LETTER_QUEUE_TTL_HOURS = 168 # 7 days * 24 hours + LETTER_QUEUE_VISIBILITY_TIMEOUT = 300, # 5 minutes * 60 seconds + LETTERS_TABLE_NAME = aws_dynamodb_table.letters.name, + MI_TABLE_NAME = aws_dynamodb_table.mi.name, + MI_TTL_HOURS = 2160 # 90 days * 24 hours + SNS_TOPIC_ARN = "${module.eventsub.sns_topic.arn}", + SUPPLIER_CONFIG_TABLE_NAME = aws_dynamodb_table.supplier-configuration.name + SUPPLIER_ID_HEADER = "nhsd-supplier-id", } core_pdf_bucket_arn = "arn:aws:s3:::comms-${var.core_account_id}-eu-west-2-${var.core_environment}-api-stg-pdf-pipeline" diff --git a/infrastructure/terraform/components/api/module_lambda_get_letters.tf b/infrastructure/terraform/components/api/module_lambda_get_letters.tf index 6bb3ca216..f0bbd2a0d 100644 --- a/infrastructure/terraform/components/api/module_lambda_get_letters.tf +++ b/infrastructure/terraform/components/api/module_lambda_get_letters.tf @@ -63,11 +63,12 @@ data "aws_iam_policy_document" "get_letters_lambda" { "dynamodb:GetItem", "dynamodb:Query", "dynamodb:Scan", + "dynamodb:UpdateItem", ] resources = [ - aws_dynamodb_table.letters.arn, - "${aws_dynamodb_table.letters.arn}/index/supplierStatus-index" + aws_dynamodb_table.letter_queue.arn, + "${aws_dynamodb_table.letter_queue.arn}/index/queueSortOrder-index" ] } } diff --git a/internal/datastore/src/__test__/letter-queue-repository.test.ts b/internal/datastore/src/__test__/letter-queue-repository.test.ts index 629d8aefd..9038987ed 100644 --- a/internal/datastore/src/__test__/letter-queue-repository.test.ts +++ b/internal/datastore/src/__test__/letter-queue-repository.test.ts @@ -51,6 +51,7 @@ describe("LetterQueueRepository", () => { afterEach(async () => { await deleteTables(db); jest.useRealTimers(); + jest.restoreAllMocks(); }); afterAll(async () => { @@ -149,6 +150,196 @@ describe("LetterQueueRepository", () => { ).rejects.toThrow("Cannot do operations on a non-existent table"); }); }); + + describe("getLetters", () => { + it("filters by supplierId", async () => { + await letterQueueRepository.putLetter(createLetter()); + + const letters = await letterQueueRepository.getLetters("supplier2", 1); + + expect(letters).toHaveLength(0); + }); + + it("filters by visibilityTimestamp", async () => { + const pendingLetter = createLetter(); + await letterQueueRepository.putLetter(createLetter()); + await letterQueueRepository.updateVisibilityTimestamp( + pendingLetter, + new Date(Date.now() + 600_000), + ); + + const letters = await letterQueueRepository.getLetters("supplier1", 1); + + expect(letters).toHaveLength(0); + }); + + it("returns letters in timestamp order", async () => { + jest.useFakeTimers().setSystemTime(new Date()); + await letterQueueRepository.putLetter( + createLetter({ letterId: "first-letter" }), + ); + jest.advanceTimersByTime(1); + await letterQueueRepository.putLetter( + createLetter({ letterId: "second-letter" }), + ); + jest.advanceTimersByTime(1); + await letterQueueRepository.putLetter( + createLetter({ letterId: "third-letter" }), + ); + jest.advanceTimersByTime(1); + await letterQueueRepository.putLetter( + createLetter({ letterId: "fourth-letter" }), + ); + jest.advanceTimersByTime(1); + await letterQueueRepository.putLetter( + createLetter({ letterId: "fifth-letter" }), + ); + jest.advanceTimersByTime(1); + + const letters = await letterQueueRepository.getLetters("supplier1", 5); + + expect(letters[0].letterId).toBe("first-letter"); + expect(letters[1].letterId).toBe("second-letter"); + expect(letters[2].letterId).toBe("third-letter"); + expect(letters[3].letterId).toBe("fourth-letter"); + expect(letters[4].letterId).toBe("fifth-letter"); + }); + + it("limits results to the supplied number", async () => { + await letterQueueRepository.putLetter( + createLetter({ letterId: "first-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "second-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "third-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "fourth-letter" }), + ); + + const letters = await letterQueueRepository.getLetters("supplier1", 3); + + expect(letters).toHaveLength(3); + expect(letters[2].letterId).toBe("third-letter"); + }); + + it("applies the limit after filtering on supplier", async () => { + await letterQueueRepository.putLetter( + createLetter({ letterId: "first-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "second-letter", supplierId: "supplier2" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "third-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "fourth-letter" }), + ); + + const letters = await letterQueueRepository.getLetters("supplier1", 3); + + expect(letters).toHaveLength(3); + expect(letters[2].letterId).toBe("fourth-letter"); + }); + + it("applies the limit after filtering on visibilityTimestamp", async () => { + await letterQueueRepository.putLetter( + createLetter({ letterId: "first-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "second-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "third-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "fourth-letter" }), + ); + await letterQueueRepository.updateVisibilityTimestamp( + createLetter({ letterId: "second-letter" }), + new Date(Date.now() + 600_000), + ); + + const letters = await letterQueueRepository.getLetters("supplier1", 3); + + expect(letters).toHaveLength(3); + expect(letters[2].letterId).toBe("fourth-letter"); + }); + + it("paginates through multiple DynamoDB pages to reach the limit", async () => { + await letterQueueRepository.putLetter( + createLetter({ letterId: "first-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "second-letter" }), + ); + await letterQueueRepository.putLetter( + createLetter({ letterId: "third-letter" }), + ); + + const pagedRepository = new LetterQueueRepository(db.docClient, logger, { + ...db.config, + queryPageSize: 1, + }); + + const letters = await pagedRepository.getLetters("supplier1", 3); + + expect(letters).toHaveLength(3); + expect(letters[0].letterId).toBe("first-letter"); + expect(letters[1].letterId).toBe("second-letter"); + expect(letters[2].letterId).toBe("third-letter"); + }); + + it("returns an empty array if no items found", async () => { + const letters = await letterQueueRepository.getLetters("supplier1", 3); + + expect(letters).toHaveLength(0); + }); + }); + + describe("updateVisibilityTimestamp", () => { + it("updates the visibilityTimestamp on an existing letter", async () => { + const pendingLetter = + await letterQueueRepository.putLetter(createLetter()); + + await letterQueueRepository.updateVisibilityTimestamp( + pendingLetter, + new Date("2026-03-04T13:15:45.000Z"), + ); + + const letter = await getLetter(db, "supplier1", "letter1"); + expect(letter?.visibilityTimestamp).toBe("2026-03-04T13:15:45.000Z"); + }); + + it("does nothing when the letter does not exist", async () => { + await letterQueueRepository.updateVisibilityTimestamp( + createLetter(), + new Date(), + ); + + expect(await letterExists(db, "supplier1", "letter1")).toBe(false); + }); + + it("rethrows errors from DynamoDB when updating the letter", async () => { + const misconfiguredRepository = new LetterQueueRepository( + db.docClient, + logger, + { + ...db.config, + letterQueueTableName: "nonexistent-table", + }, + ); + await expect( + misconfiguredRepository.updateVisibilityTimestamp( + createLetter(), + new Date(), + ), + ).rejects.toThrow("Cannot do operations on a non-existent table"); + }); + }); }); async function getLetter(db: DBContext, supplierId: string, letterId: string) { diff --git a/internal/datastore/src/__test__/letter-repository.test.ts b/internal/datastore/src/__test__/letter-repository.test.ts index dddd08879..c9eb5b126 100644 --- a/internal/datastore/src/__test__/letter-repository.test.ts +++ b/internal/datastore/src/__test__/letter-repository.test.ts @@ -1,5 +1,4 @@ import { Logger } from "pino"; -import { PutCommand } from "@aws-sdk/lib-dynamodb"; import { DBContext, createTables, @@ -8,7 +7,7 @@ import { } from "./db"; import { LetterRepository } from "../letter-repository"; import { InsertLetter, Letter, UpdateLetter } from "../types"; -import { LogStream, createTestLogger } from "./logs"; +import { createTestLogger } from "./logs"; import { LetterAlreadyExistsError } from "../letter-already-exists-error"; function createLetter( @@ -46,7 +45,6 @@ jest.setTimeout(30_000); describe("LetterRepository", () => { let db: DBContext; let letterRepository: LetterRepository; - let logStream: LogStream; let logger: Logger; beforeAll(async () => { @@ -55,7 +53,7 @@ describe("LetterRepository", () => { beforeEach(async () => { await createTables(db); - ({ logStream, logger } = createTestLogger()); + ({ logger } = createTestLogger()); letterRepository = new LetterRepository(db.docClient, logger, db.config); }); @@ -294,208 +292,6 @@ describe("LetterRepository", () => { expect(changedLetter.reasonCode).toBe("R01"); }); - test("should return a list of letters matching status", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - await letterRepository.putLetter(createLetter("supplier1", "letter2")); - await letterRepository.putLetter( - createLetter("supplier1", "letter3", "DELIVERED"), - ); - - const pendingLetters = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(pendingLetters.letters).toHaveLength(2); - expect(pendingLetters.letters.map((l) => l.id)).toEqual([ - "letter1", - "letter2", - ]); - - const deliveredLetters = await letterRepository.getLettersByStatus( - "supplier1", - "DELIVERED", - ); - expect(deliveredLetters.letters).toHaveLength(1); - expect(deliveredLetters.letters[0].id).toBe("letter3"); - }); - - test("letter list should change when letter status is updated", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - await letterRepository.putLetter(createLetter("supplier1", "letter2")); - - const pendingLetters = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(pendingLetters.letters).toHaveLength(2); - - const updateLetter: UpdateLetter = { - id: "letter1", - eventId: "event1", - supplierId: "supplier1", - status: "DELIVERED", - }; - await letterRepository.updateLetterStatus(updateLetter); - const remainingLetters = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(remainingLetters.letters).toHaveLength(1); - expect(remainingLetters.letters[0].id).toBe("letter2"); - - const updatedLetters = await letterRepository.getLettersByStatus( - "supplier1", - "DELIVERED", - ); - expect(updatedLetters.letters).toHaveLength(1); - expect(updatedLetters.letters[0].id).toBe("letter1"); - }); - - test("letter list should support pagination", async () => { - for (let i = 1; i <= 99; i++) { - const paddedId = i.toString().padStart(3, "0"); - await letterRepository.putLetter( - createLetter("supplier1", `letter${paddedId}`), - ); - } - const firstPage = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(firstPage.letters).toHaveLength(50); // Default page size is 50 - expect(firstPage.lastEvaluatedKey).toBeDefined(); - expect(firstPage.letters[0].id).toBe("letter001"); - expect(firstPage.letters[49].id).toBe("letter050"); - - const secondPage = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - { - exclusiveStartKey: firstPage.lastEvaluatedKey, - }, - ); - expect(secondPage.letters).toHaveLength(49); - expect(secondPage.lastEvaluatedKey).toBeUndefined(); // No more pages - expect(secondPage.letters[0].id).toBe("letter051"); - expect(secondPage.letters[48].id).toBe("letter099"); - }); - - test("letter list should return empty when no letters match status", async () => { - await letterRepository.putLetter( - createLetter("supplier1", "letter1", "ACCEPTED"), - ); - const page = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(page.letters).toHaveLength(0); - expect(page.lastEvaluatedKey).toBeUndefined(); - }); - - test("letter list should warn about invalid data", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - await db.docClient.send( - new PutCommand({ - TableName: db.config.lettersTableName, - Item: { - supplierId: "supplier1", - id: "invalid-letter", - // specificationId: 'specification1', // Missing required field - groupId: "group1", - url: "s3://bucket/invalid-letter.pdf", - status: "PENDING", - supplierStatus: "supplier1#PENDING", - supplierStatusSk: new Date().toISOString(), - createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), - }, - }), - ); - - const pendingLetters = await letterRepository.getLettersByStatus( - "supplier1", - "PENDING", - ); - expect(pendingLetters.letters).toHaveLength(1); - expect(pendingLetters.letters[0].id).toBe("letter1"); - - expect( - logStream.logs.some((log) => log.includes("Invalid letter data:")), - ).toBe(true); - - expect( - logStream.logs.some( - (log) => - log.includes("specificationId") && - log.includes("Invalid input: expected string"), - ), - ).toBe(true); - }); - - test("should return all letters for a supplier status", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - await letterRepository.putLetter(createLetter("supplier1", "letter2")); - await letterRepository.putLetter(createLetter("supplier1", "letter3")); - await letterRepository.putLetter( - createLetter("supplier1", "letter4", "REJECTED"), - ); - await letterRepository.putLetter(createLetter("supplier2", "letter1")); - await letterRepository.putLetter(createLetter("supplier2", "letter2")); - - const letters = await letterRepository.getLettersBySupplier( - "supplier1", - "PENDING", - 10, - ); - expect(letters).toEqual([ - { - id: "letter1", - specificationId: "specification1", - groupId: "group1", - status: "PENDING", - }, - { - id: "letter2", - specificationId: "specification1", - groupId: "group1", - status: "PENDING", - }, - { - id: "letter3", - specificationId: "specification1", - groupId: "group1", - status: "PENDING", - }, - ]); - }); - - test("should return empty if no letters exist for a supplier", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - await letterRepository.putLetter(createLetter("supplier1", "letter2")); - - const letters = await letterRepository.getLettersBySupplier( - "supplier2", - "PENDING", - 10, - ); - expect(letters).toEqual([]); - }); - - test("should return empty if query result Items is null", async () => { - await letterRepository.putLetter(createLetter("supplier1", "letter1")); - - const mockSend = jest.fn().mockResolvedValue({ Items: null }); - const mockDdbClient = { send: mockSend } as any; - const repo = new LetterRepository( - mockDdbClient, - { debug: jest.fn() } as any, - { lettersTableName: "letters", lettersTtlHours: 1 }, - ); - - const letters = await repo.getLettersBySupplier("supplier1", "PENDING", 10); - expect(letters).toEqual([]); - }); - test("should batch write letters to the database", async () => { const before = Date.now(); @@ -571,71 +367,4 @@ describe("LetterRepository", () => { ]), ).rejects.toThrow("Cannot do operations on a non-existent table"); }); - - test("should paginate through multiple pages when fetching letters by supplier", async () => { - const mockSend = jest - .fn() - // first call returns 30 items with a LastEvaluatedKey - .mockResolvedValueOnce({ - Items: Array.from({ length: 30 }, (_, i) => ({ - id: `letter${(i + 1).toString().padStart(3, "0")}`, - status: "PENDING", - specificationId: "specification1", - groupId: "group1", - })), - LastEvaluatedKey: { id: "letter030", supplierId: "supplier1" }, - }) - // second call returns remaining 20 items without LastEvaluatedKey - .mockResolvedValueOnce({ - Items: Array.from({ length: 20 }, (_, i) => ({ - id: `letter${(i + 31).toString().padStart(3, "0")}`, - status: "PENDING", - specificationId: "specification1", - groupId: "group1", - })), - LastEvaluatedKey: undefined, - }); - - const mockDdbClient = { send: mockSend } as any; - const repo = new LetterRepository(mockDdbClient, logger, db.config); - - // request 50 letters - should require 2 DynamoDB queries due to mocked pagination - const letters = await repo.getLettersBySupplier("supplier1", "PENDING", 50); - - // verify all 50 letters were returned - expect(letters).toHaveLength(50); - - // verify two send calls were made (2 pages) - expect(mockSend).toHaveBeenCalledTimes(2); - - // verify the second call included the ExclusiveStartKey from first response - const secondCallInput = mockSend.mock.calls[1][0].input; - expect(secondCallInput.ExclusiveStartKey).toEqual({ - id: "letter030", - supplierId: "supplier1", - }); - }); - - test("should respect limit when fewer items available than requested", async () => { - // create only 10 letters - for (let i = 1; i <= 10; i++) { - await letterRepository.putLetter( - createLetter( - "supplier1", - `letter${i.toString().padStart(2, "0")}`, - "PENDING", - ), - ); - } - - // request 50 letters but only 10 exist - const letters = await letterRepository.getLettersBySupplier( - "supplier1", - "PENDING", - 50, - ); - - expect(letters).toHaveLength(10); - expect(letters.every((l) => l.status === "PENDING")).toBe(true); - }); }); diff --git a/internal/datastore/src/letter-queue-repository.ts b/internal/datastore/src/letter-queue-repository.ts index e30eadcfc..370e255ea 100644 --- a/internal/datastore/src/letter-queue-repository.ts +++ b/internal/datastore/src/letter-queue-repository.ts @@ -2,11 +2,15 @@ import { DeleteCommand, DynamoDBDocumentClient, PutCommand, + QueryCommand, + UpdateCommand, } from "@aws-sdk/lib-dynamodb"; import { Logger } from "pino"; +import z from "zod"; import { InsertPendingLetter, PendingLetter, + PendingLetterBase, PendingLetterSchema, } from "./types"; import { LetterAlreadyExistsError } from "./letter-already-exists-error"; @@ -15,6 +19,8 @@ import { LetterDoesNotExistError } from "./letter-does-not-exist-error"; type LetterQueueRepositoryConfig = { letterQueueTableName: string; letterQueueTtlHours: number; + /** Maximum number of items to fetch per DynamoDB page. Defaults to 100. */ + queryPageSize?: number; }; export default class LetterQueueRepository { @@ -86,4 +92,69 @@ export default class LetterQueueRepository { throw error; } } + + async getLetters( + supplierId: string, + limit: number, + ): Promise { + const letters: PendingLetter[] = []; + let lastEvaluatedKey: Record | undefined; + + do { + const result = await this.ddbClient.send( + new QueryCommand({ + TableName: this.config.letterQueueTableName, + IndexName: "queueSortOrder-index", + KeyConditionExpression: "supplierId = :supplierId", + FilterExpression: "visibilityTimestamp < :now", + ExpressionAttributeValues: { + ":supplierId": supplierId, + ":now": new Date().toISOString(), + }, + // 1000 is a compromise - a smaller number might result in a lot of round trips, a larger one might + // entail fetching and then throwing away a lot of data + Limit: this.config.queryPageSize ?? 1000, + ExclusiveStartKey: lastEvaluatedKey, + }), + ); + + const page = z.array(PendingLetterSchema).parse(result.Items); + letters.push(...page); + + lastEvaluatedKey = result.LastEvaluatedKey; + } while (lastEvaluatedKey !== undefined && letters.length < limit); + + return letters.slice(0, limit); + } + + async updateVisibilityTimestamp( + pendingLetter: PendingLetterBase, + timestamp: Date, + ): Promise { + try { + await this.ddbClient.send( + new UpdateCommand({ + TableName: this.config.letterQueueTableName, + Key: { + supplierId: pendingLetter.supplierId, + letterId: pendingLetter.letterId, + }, + UpdateExpression: "SET visibilityTimestamp = :ts", + ConditionExpression: "attribute_exists(letterId)", + ExpressionAttributeValues: { + ":ts": timestamp.toISOString(), + }, + }), + ); + } catch (error) { + if ( + error instanceof Error && + error.name === "ConditionalCheckFailedException" + ) { + // Letter has been deleted from queue as no longer pending - just ignore it + return; + } + throw error; + } + } } diff --git a/internal/datastore/src/letter-repository.ts b/internal/datastore/src/letter-repository.ts index 72b9be486..3af7932c3 100644 --- a/internal/datastore/src/letter-repository.ts +++ b/internal/datastore/src/letter-repository.ts @@ -3,21 +3,12 @@ import { DynamoDBDocumentClient, GetCommand, PutCommand, - QueryCommand, UpdateCommand, UpdateCommandOutput, } from "@aws-sdk/lib-dynamodb"; import { ConditionalCheckFailedException } from "@aws-sdk/client-dynamodb"; import { Logger } from "pino"; -import { z } from "zod"; -import { - InsertLetter, - Letter, - LetterBase, - LetterSchema, - LetterSchemaBase, - UpdateLetter, -} from "./types"; +import { InsertLetter, Letter, LetterSchema, UpdateLetter } from "./types"; import { LetterAlreadyExistsError } from "./letter-already-exists-error"; export type PagingOptions = Partial<{ @@ -25,10 +16,6 @@ export type PagingOptions = Partial<{ pageSize: number; }>; -const defaultPagingOptions = { - pageSize: 50, -}; - export type LetterRepositoryConfig = { lettersTableName: string; lettersTtlHours: number; @@ -123,46 +110,6 @@ export class LetterRepository { return LetterSchema.parse(result.Item); } - async getLettersByStatus( - supplierId: string, - status: Letter["status"], - options?: PagingOptions, - ): Promise<{ - letters: Letter[]; - lastEvaluatedKey?: Record; - }> { - const extendedOptions = { ...defaultPagingOptions, ...options }; - - const result = await this.ddbClient.send( - new QueryCommand({ - TableName: this.config.lettersTableName, - IndexName: "supplierStatus-index", - KeyConditionExpression: "supplierStatus = :supplierStatus", - ExpressionAttributeValues: { - ":supplierStatus": `${supplierId}#${status}`, - }, - Limit: extendedOptions.pageSize, - ExclusiveStartKey: extendedOptions.exclusiveStartKey, - }), - ); - - // Items is an empty array if no items match the query - const letters = result - .Items!.map((item) => LetterSchema.safeParse(item)) - .filter((letterItem) => { - if (!letterItem.success) { - this.log.warn(`Invalid letter data: ${letterItem.error}`); - } - return letterItem.success; - }) - .map((successLetterItem) => successLetterItem.data); - - return { - letters, - lastEvaluatedKey: result.LastEvaluatedKey, - }; - } - async updateLetterStatus( letterToUpdate: UpdateLetter, ): Promise { @@ -238,45 +185,4 @@ export class LetterRepository { } return { updateExpression, expressionAttributeValues }; } - - async getLettersBySupplier( - supplierId: string, - status: string, - limit: number, - ): Promise { - const items: Record[] = []; - let ExclusiveStartKey: Record | undefined; - const supplierStatus = `${supplierId}#${status}`; - let res; - - do { - const remaining = limit - items.length; - - res = await this.ddbClient.send( - new QueryCommand({ - TableName: this.config.lettersTableName, - IndexName: "supplierStatus-index", - KeyConditionExpression: "supplierStatus = :supplierStatus", - ExpressionAttributeNames: { - "#status": "status", // reserved keyword - }, - ExpressionAttributeValues: { - ":supplierStatus": supplierStatus, - }, - ProjectionExpression: - "id, #status, specificationId, groupId, reasonCode, reasonText", - Limit: remaining, // limit is a per-page cap - ExclusiveStartKey, - }), - ); - - if (res.Items?.length) { - items.push(...res.Items); - } - - ExclusiveStartKey = res.LastEvaluatedKey; - } while (res.LastEvaluatedKey && items.length < limit); - - return z.array(LetterSchemaBase).parse(items); - } } diff --git a/internal/datastore/src/types.ts b/internal/datastore/src/types.ts index 2c5078d05..730f91177 100644 --- a/internal/datastore/src/types.ts +++ b/internal/datastore/src/types.ts @@ -78,24 +78,28 @@ export type UpdateLetter = { reasonText?: string; }; -export const PendingLetterSchema = z.object({ +export const PendingLetterSchemaBase = z.object({ supplierId: idRef(SupplierSchema, "id"), letterId: idRef(LetterSchema, "id"), + specificationId: z.string(), + groupId: z.string(), +}); + +export const PendingLetterSchema = PendingLetterSchemaBase.extend({ queueTimestamp: z.string(), visibilityTimestamp: z.string(), queueSortOrderSk: z.string().describe("Secondary index SK"), - specificationId: z.string(), - groupId: z.string(), priority: z.int().min(0).max(99).optional(), ttl: z.int(), }); -export type PendingLetter = z.infer; +export const InsertPendingLetter = PendingLetterSchemaBase.extend({ + priority: z.int().min(0).max(99).optional(), +}); -export type InsertPendingLetter = Omit< - PendingLetter, - "ttl" | "queueTimestamp" | "visibilityTimestamp" | "queueSortOrderSk" ->; +export type PendingLetter = z.infer; +export type PendingLetterBase = z.infer; +export type InsertPendingLetter = z.infer; export const MISchemaBase = z.object({ id: z.string(), diff --git a/lambdas/api-handler/src/config/__tests__/deps.test.ts b/lambdas/api-handler/src/config/__tests__/deps.test.ts index 269bda443..dae06f5d9 100644 --- a/lambdas/api-handler/src/config/__tests__/deps.test.ts +++ b/lambdas/api-handler/src/config/__tests__/deps.test.ts @@ -4,11 +4,14 @@ describe("createDependenciesContainer", () => { const env = { LETTERS_TABLE_NAME: "LettersTable", LETTER_TTL_HOURS: 12_960, + LETTER_QUEUE_TABLE_NAME: "LetterQueueTable", + LETTER_QUEUE_TTL_HOURS: 168, MI_TABLE_NAME: "MITable", MI_TTL_HOURS: 2160, SUPPLIER_ID_HEADER: "nhsd-supplier-id", APIM_CORRELATION_HEADER: "nhsd-correlation-id", DOWNLOAD_URL_TTL_SECONDS: 60, + LETTER_QUEUE_VISIBILITY_TIMEOUT: 600, }; beforeEach(() => { @@ -36,6 +39,7 @@ describe("createDependenciesContainer", () => { // Repo client jest.mock("@internal/datastore", () => ({ LetterRepository: jest.fn(), + LetterQueueRepository: jest.fn(), MIRepository: jest.fn(), DBHealthcheck: jest.fn(), })); @@ -49,9 +53,8 @@ describe("createDependenciesContainer", () => { const { S3Client } = jest.requireMock("@aws-sdk/client-s3"); const { SQSClient } = jest.requireMock("@aws-sdk/client-sqs"); const { createLogger } = jest.requireMock("@internal/helpers"); - const { LetterRepository, MIRepository } = jest.requireMock( - "@internal/datastore", - ); + const { LetterQueueRepository, LetterRepository, MIRepository } = + jest.requireMock("@internal/datastore"); // allow re-import of deps to leverage mocks // eslint-disable-next-line @typescript-eslint/no-require-imports @@ -71,6 +74,13 @@ describe("createDependenciesContainer", () => { lettersTtlHours: 12_960, }); + expect(LetterQueueRepository).toHaveBeenCalledTimes(1); + const letterQueueRepoCtorArgs = LetterQueueRepository.mock.calls[0]; + expect(letterQueueRepoCtorArgs[2]).toEqual({ + letterQueueTableName: "LetterQueueTable", + letterQueueTtlHours: 168, + }); + expect(MIRepository).toHaveBeenCalledTimes(1); const miRepoCtorArgs = MIRepository.mock.calls[0]; expect(miRepoCtorArgs[2]).toEqual({ diff --git a/lambdas/api-handler/src/config/__tests__/env.test.ts b/lambdas/api-handler/src/config/__tests__/env.test.ts index 6b52a3474..50bf53f4e 100644 --- a/lambdas/api-handler/src/config/__tests__/env.test.ts +++ b/lambdas/api-handler/src/config/__tests__/env.test.ts @@ -19,10 +19,13 @@ describe("lambdaEnv", () => { process.env.SUPPLIER_ID_HEADER = "nhsd-supplier-id"; process.env.APIM_CORRELATION_HEADER = "nhsd-correlation-id"; process.env.LETTERS_TABLE_NAME = "letters-table"; + process.env.LETTER_QUEUE_TABLE_NAME = "letter-queue-table"; process.env.MI_TABLE_NAME = "mi-table"; process.env.LETTER_TTL_HOURS = "12960"; + process.env.LETTER_QUEUE_TTL_HOURS = "240"; process.env.MI_TTL_HOURS = "2160"; process.env.DOWNLOAD_URL_TTL_SECONDS = "60"; + process.env.LETTER_QUEUE_VISIBILITY_TIMEOUT = "600"; process.env.MAX_LIMIT = "2500"; process.env.QUEUE_URL = "url"; process.env.EVENT_SOURCE = "supplier-api"; @@ -34,10 +37,13 @@ describe("lambdaEnv", () => { SUPPLIER_ID_HEADER: "nhsd-supplier-id", APIM_CORRELATION_HEADER: "nhsd-correlation-id", LETTERS_TABLE_NAME: "letters-table", + LETTER_QUEUE_TABLE_NAME: "letter-queue-table", MI_TABLE_NAME: "mi-table", LETTER_TTL_HOURS: 12_960, + LETTER_QUEUE_TTL_HOURS: 240, MI_TTL_HOURS: 2160, DOWNLOAD_URL_TTL_SECONDS: 60, + LETTER_QUEUE_VISIBILITY_TIMEOUT: 600, MAX_LIMIT: 2500, QUEUE_URL: "url", EVENT_SOURCE: "supplier-api", @@ -61,8 +67,11 @@ describe("lambdaEnv", () => { process.env.SUPPLIER_ID_HEADER = "nhsd-supplier-id"; process.env.APIM_CORRELATION_HEADER = "nhsd-correlation-id"; process.env.LETTERS_TABLE_NAME = "letters-table"; + process.env.LETTER_QUEUE_TABLE_NAME = "letter-queue-table"; process.env.MI_TABLE_NAME = "mi-table"; process.env.LETTER_TTL_HOURS = "12960"; + process.env.LETTER_QUEUE_TTL_HOURS = "240"; + process.env.LETTER_QUEUE_VISIBILITY_TIMEOUT = "600"; process.env.MI_TTL_HOURS = "2160"; process.env.DOWNLOAD_URL_TTL_SECONDS = "60"; process.env.EVENT_SOURCE = "supplier-api"; @@ -74,8 +83,11 @@ describe("lambdaEnv", () => { SUPPLIER_ID_HEADER: "nhsd-supplier-id", APIM_CORRELATION_HEADER: "nhsd-correlation-id", LETTERS_TABLE_NAME: "letters-table", + LETTER_QUEUE_TABLE_NAME: "letter-queue-table", MI_TABLE_NAME: "mi-table", LETTER_TTL_HOURS: 12_960, + LETTER_QUEUE_TTL_HOURS: 240, + LETTER_QUEUE_VISIBILITY_TIMEOUT: 600, MI_TTL_HOURS: 2160, DOWNLOAD_URL_TTL_SECONDS: 60, MAX_LIMIT: undefined, diff --git a/lambdas/api-handler/src/config/deps.ts b/lambdas/api-handler/src/config/deps.ts index ff4a1020f..427d883fa 100644 --- a/lambdas/api-handler/src/config/deps.ts +++ b/lambdas/api-handler/src/config/deps.ts @@ -6,6 +6,7 @@ import { SNSClient } from "@aws-sdk/client-sns"; import { Logger } from "pino"; import { DBHealthcheck, + LetterQueueRepository, LetterRepository, MIRepository, } from "@internal/datastore"; @@ -17,6 +18,7 @@ export type Deps = { sqsClient: SQSClient; snsClient: SNSClient; letterRepo: LetterRepository; + letterQueueRepo: LetterQueueRepository; miRepo: MIRepository; dbHealthcheck: DBHealthcheck; logger: Logger; @@ -40,6 +42,18 @@ function createLetterRepository( return new LetterRepository(createDocumentClient(), log, config); } +function createLetterQueueRepository( + log: Logger, + environment: EnvVars, +): LetterQueueRepository { + const config = { + letterQueueTableName: environment.LETTER_QUEUE_TABLE_NAME, + letterQueueTtlHours: environment.LETTER_QUEUE_TTL_HOURS, + }; + + return new LetterQueueRepository(createDocumentClient(), log, config); +} + function createDBHealthcheck(environment: EnvVars): DBHealthcheck { const config = { lettersTableName: environment.LETTERS_TABLE_NAME, @@ -66,6 +80,7 @@ export function createDependenciesContainer(): Deps { sqsClient: new SQSClient(), snsClient: new SNSClient(), letterRepo: createLetterRepository(log, envVars), + letterQueueRepo: createLetterQueueRepository(log, envVars), miRepo: createMIRepository(log, envVars), dbHealthcheck: createDBHealthcheck(envVars), logger: log, diff --git a/lambdas/api-handler/src/config/env.ts b/lambdas/api-handler/src/config/env.ts index be22d3182..a7402eee6 100644 --- a/lambdas/api-handler/src/config/env.ts +++ b/lambdas/api-handler/src/config/env.ts @@ -4,10 +4,13 @@ const EnvVarsSchema = z.object({ SUPPLIER_ID_HEADER: z.string(), APIM_CORRELATION_HEADER: z.string(), LETTERS_TABLE_NAME: z.string(), + LETTER_QUEUE_TABLE_NAME: z.string(), + LETTER_QUEUE_TTL_HOURS: z.coerce.number().int(), MI_TABLE_NAME: z.string(), LETTER_TTL_HOURS: z.coerce.number().int(), MI_TTL_HOURS: z.coerce.number().int(), DOWNLOAD_URL_TTL_SECONDS: z.coerce.number().int(), + LETTER_QUEUE_VISIBILITY_TIMEOUT: z.coerce.number().int(), MAX_LIMIT: z.coerce.number().int().optional(), QUEUE_URL: z.coerce.string().optional(), PINO_LOG_LEVEL: z.coerce.string().optional(), diff --git a/lambdas/api-handler/src/handlers/__tests__/get-letters.test.ts b/lambdas/api-handler/src/handlers/__tests__/get-letters.test.ts index 564e4b830..43f0c3716 100644 --- a/lambdas/api-handler/src/handlers/__tests__/get-letters.test.ts +++ b/lambdas/api-handler/src/handlers/__tests__/get-letters.test.ts @@ -3,7 +3,6 @@ import type { APIGatewayProxyResult, Context } from "aws-lambda"; import { mockDeep } from "jest-mock-extended"; import { S3Client } from "@aws-sdk/client-s3"; import pino from "pino"; -import { LetterRepository } from "@internal/datastore/src"; import { processError } from "../../mappers/error-mapper"; import * as letterService from "../../services/letter-operations"; @@ -28,13 +27,10 @@ jest.mock("../../services/letter-operations"); describe("API Lambda handler", () => { const mockedDeps: jest.Mocked = { s3Client: {} as unknown as S3Client, - letterRepo: {} as unknown as LetterRepository, logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger, env: { SUPPLIER_ID_HEADER: "nhsd-supplier-id", APIM_CORRELATION_HEADER: "nhsd-correlation-id", - LETTERS_TABLE_NAME: "LETTERS_TABLE_NAME", - LETTER_TTL_HOURS: 12_960, DOWNLOAD_URL_TTL_SECONDS: 60, MAX_LIMIT: 2500, } as unknown as EnvVars, @@ -45,8 +41,8 @@ describe("API Lambda handler", () => { }); it("returns 200 OK with basic paginated resources", async () => { - const mockedGetLetters = letterService.getLettersForSupplier as jest.Mock; - mockedGetLetters.mockResolvedValue([ + const mockGetPendingLetters = letterService.getPendingLetters as jest.Mock; + mockGetPendingLetters.mockResolvedValue([ { id: "l1", specificationId: "s1", @@ -83,11 +79,11 @@ describe("API Lambda handler", () => { const getLettersHandler = createGetLettersHandler(mockedDeps); const result = await getLettersHandler(event, context, callback); - expect(mockedGetLetters).toHaveBeenCalledWith( + expect(mockGetPendingLetters).toHaveBeenCalledWith( "supplier1", - "PENDING", mockedDeps.env.MAX_LIMIT, - mockedDeps.letterRepo, + mockedDeps.letterQueueRepo, + mockedDeps.env.LETTER_QUEUE_VISIBILITY_TIMEOUT, ); const expected = { @@ -129,8 +125,8 @@ describe("API Lambda handler", () => { }); it("returns 200 OK with a valid limit", async () => { - const mockedGetLetters = letterService.getLettersForSupplier as jest.Mock; - mockedGetLetters.mockResolvedValue([ + const mockGetPendingLetters = letterService.getPendingLetters as jest.Mock; + mockGetPendingLetters.mockResolvedValue([ { id: "l1", specificationId: "s1", @@ -154,11 +150,11 @@ describe("API Lambda handler", () => { const getLettersHandler = createGetLettersHandler(mockedDeps); const result = await getLettersHandler(event, context, callback); - expect(mockedGetLetters).toHaveBeenCalledWith( + expect(mockGetPendingLetters).toHaveBeenCalledWith( "supplier1", - "PENDING", 50, - mockedDeps.letterRepo, + mockedDeps.letterQueueRepo, + mockedDeps.env.LETTER_QUEUE_VISIBILITY_TIMEOUT, ); const expected = { diff --git a/lambdas/api-handler/src/handlers/get-letters.ts b/lambdas/api-handler/src/handlers/get-letters.ts index 6ad0d57f3..d2b98bd04 100644 --- a/lambdas/api-handler/src/handlers/get-letters.ts +++ b/lambdas/api-handler/src/handlers/get-letters.ts @@ -5,7 +5,7 @@ import { import { Logger } from "pino"; import { MetricsLogger, metricScope } from "aws-embedded-metrics"; import { MetricStatus, emitForSingleSupplier } from "@internal/helpers"; -import { getLettersForSupplier } from "../services/letter-operations"; +import { getPendingLetters } from "../services/letter-operations"; import { extractCommonIds } from "../utils/common-ids"; import { requireEnvVar } from "../utils/validation"; import { ApiErrorDetail } from "../contracts/errors"; @@ -14,9 +14,6 @@ import ValidationError from "../errors/validation-error"; import { mapToGetLettersResponse } from "../mappers/letter-mapper"; import type { Deps } from "../config/deps"; -// The endpoint should only return pending letters for now -const status = "PENDING"; - function validateLimitParamOnly( queryStringParameters: APIGatewayProxyEventQueryStringParameters | null, logger: Logger, @@ -110,11 +107,11 @@ export default function createGetLettersHandler( deps.logger, ); - const letters = await getLettersForSupplier( + const letters = await getPendingLetters( supplierId, - status, limitNumber, - deps.letterRepo, + deps.letterQueueRepo, + deps.env.LETTER_QUEUE_VISIBILITY_TIMEOUT, ); const response = mapToGetLettersResponse(letters); @@ -123,7 +120,6 @@ export default function createGetLettersHandler( description: "Pending letters successfully fetched", supplierId, limitNumber, - status, lettersCount: letters.length, correlationId: commonIds.value.correlationId, }); diff --git a/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts b/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts index be69387e1..fa80ca2d3 100644 --- a/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts +++ b/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts @@ -7,7 +7,7 @@ import { enqueueLetterUpdateRequests, getLetterById, getLetterDataUrl, - getLettersForSupplier, + getPendingLetters, } from "../letter-operations"; import { UpdateLetterCommand } from "../../contracts/letters"; import { Deps } from "../../config/deps"; @@ -45,34 +45,53 @@ function makeLetter(id: string, status: Letter["status"]): Letter { }; } -describe("getLetterIdsForSupplier", () => { +afterEach(async () => { + jest.useRealTimers(); +}); + +describe("getPendingLetters", () => { beforeEach(() => { jest.clearAllMocks(); }); - - it("returns letter IDs from the repository", async () => { + it("returns letters from the letter queue repository", async () => { + jest.useFakeTimers().setSystemTime(new Date("2026-03-04T13:15:45.000Z")); const mockRepo = { - getLettersBySupplier: jest.fn().mockResolvedValue([ - { id: "id1", status: "PENDING", specificationId: "s1" }, - { id: "id2", status: "PENDING", specificationId: "s1" }, + getLetters: jest.fn().mockResolvedValue([ + { + supplierId: "supplier1", + letterId: "id1", + specificationId: "s1", + groupId: "g1", + }, + { + supplierId: "supplier1", + letterId: "id2", + specificationId: "s1", + groupId: "g1", + }, ]), + updateVisibilityTimestamp: jest.fn(), }; - - const result = await getLettersForSupplier( + const result = await getPendingLetters( "supplier1", - "PENDING", 10, mockRepo as any, + 600, ); - - expect(mockRepo.getLettersBySupplier).toHaveBeenCalledWith( - "supplier1", - "PENDING", - 10, + expect(mockRepo.getLetters).toHaveBeenCalledWith("supplier1", 10); + expect(mockRepo.updateVisibilityTimestamp).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ supplierId: "supplier1", letterId: "id1" }), + new Date("2026-03-04T13:25:45.000Z"), + ); + expect(mockRepo.updateVisibilityTimestamp).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ supplierId: "supplier1", letterId: "id2" }), + new Date("2026-03-04T13:25:45.000Z"), ); expect(result).toEqual([ - { id: "id1", status: "PENDING", specificationId: "s1" }, - { id: "id2", status: "PENDING", specificationId: "s1" }, + { id: "id1", status: "PENDING", specificationId: "s1", groupId: "g1" }, + { id: "id2", status: "PENDING", specificationId: "s1", groupId: "g1" }, ]); }); }); diff --git a/lambdas/api-handler/src/services/letter-operations.ts b/lambdas/api-handler/src/services/letter-operations.ts index dc2de921d..e1361d2d9 100644 --- a/lambdas/api-handler/src/services/letter-operations.ts +++ b/lambdas/api-handler/src/services/letter-operations.ts @@ -1,4 +1,9 @@ -import { LetterBase, LetterRepository } from "@internal/datastore"; +import { + LetterBase, + LetterQueueRepository, + LetterRepository, + PendingLetterBase, +} from "@internal/datastore"; import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3"; import { SendMessageBatchCommand } from "@aws-sdk/client-sqs"; @@ -31,13 +36,37 @@ async function getDownloadUrl( return getSignedUrl(s3Client, command, { expiresIn: expiry }); } -export const getLettersForSupplier = async ( +function mapPendingLetterToLetterBase(pending: PendingLetterBase): LetterBase { + return { + id: pending.letterId, + status: "PENDING", + specificationId: pending.specificationId, + groupId: pending.groupId, + }; +} + +export const getPendingLetters = async ( supplierId: string, - status: string, + limit: number, - letterRepo: LetterRepository, + letterQueueRepo: LetterQueueRepository, + visibilityTimeout: number, ): Promise => { - return letterRepo.getLettersBySupplier(supplierId, status, limit); + const CONCURRENCY = 5; + + const pendingLetters = await letterQueueRepo.getLetters(supplierId, limit); + const timestamp = new Date(Date.now() + visibilityTimeout * 1000); + + for (let i = 0; i < pendingLetters.length; i += CONCURRENCY) { + const window = pendingLetters.slice(i, i + CONCURRENCY); + await Promise.all( + window.map((letter) => + letterQueueRepo.updateVisibilityTimestamp(letter, timestamp), + ), + ); + } + + return pendingLetters.map((letter) => mapPendingLetterToLetterBase(letter)); }; export const getLetterById = async (