diff --git a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf index 7bb3595fb..388934831 100644 --- a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf +++ b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf @@ -35,7 +35,8 @@ module "supplier_allocator" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = merge(local.common_lambda_env_vars, { - UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url + UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url, + IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.idempotency.name }) } @@ -110,6 +111,7 @@ data "aws_iam_policy_document" "supplier_allocator_lambda" { resources = [ aws_dynamodb_table.supplier-quotas.arn, + aws_dynamodb_table.idempotency.arn, "${aws_dynamodb_table.supplier-quotas.arn}/index/*" ] } diff --git a/lambdas/supplier-allocator/package.json b/lambdas/supplier-allocator/package.json index e0368a4dc..5f8fb9312 100644 --- a/lambdas/supplier-allocator/package.json +++ b/lambdas/supplier-allocator/package.json @@ -1,5 +1,6 @@ { "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1044.0", diff --git a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts index 88d04eab5..316fc91c7 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts @@ -4,6 +4,7 @@ describe("createDependenciesContainer", () => { const env = { SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }; beforeEach(() => { @@ -27,6 +28,10 @@ describe("createDependenciesContainer", () => { SupplierQuotasRepository: jest.fn(), })); + jest.mock("@aws-lambda-powertools/idempotency/dynamodb", () => ({ + DynamoDBPersistenceLayer: jest.fn(), + })); + // Env jest.mock("../env", () => ({ envVars: env })); }); @@ -40,6 +45,9 @@ describe("createDependenciesContainer", () => { const { SupplierQuotasRepository } = jest.requireMock( "@internal/datastore", ); + const { DynamoDBPersistenceLayer } = jest.requireMock( + "@aws-lambda-powertools/idempotency/dynamodb", + ); // eslint-disable-next-line @typescript-eslint/no-require-imports const { createDependenciesContainer } = require("../deps"); const deps: Deps = createDependenciesContainer(); @@ -54,6 +62,11 @@ describe("createDependenciesContainer", () => { expect(supplierQuotasRepoCtorArgs[1]).toEqual({ supplierQuotasTableName: "SupplierQuotasTable", }); + expect(DynamoDBPersistenceLayer).toHaveBeenCalledTimes(1); + const idempotencyLayerCtorArgs = DynamoDBPersistenceLayer.mock.calls[0][0]; + expect(idempotencyLayerCtorArgs).toEqual({ + tableName: "IdempotencyTable", + }); expect(deps.env).toEqual(env); }); }); diff --git a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts index 1f4da34cb..6274a6186 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts @@ -16,12 +16,14 @@ describe("lambdaEnv", () => { it("should load all environment variables successfully", () => { process.env.SUPPLIER_CONFIG_TABLE_NAME = "SupplierConfigTable"; process.env.SUPPLIER_QUOTAS_TABLE_NAME = "SupplierQuotasTable"; + process.env.IDEMPOTENCY_TABLE_NAME = "IdempotencyTable"; const { envVars } = require("../env"); expect(envVars).toEqual({ SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }); }); }); diff --git a/lambdas/supplier-allocator/src/config/deps.ts b/lambdas/supplier-allocator/src/config/deps.ts index 5f58a00e0..e937d64ef 100644 --- a/lambdas/supplier-allocator/src/config/deps.ts +++ b/lambdas/supplier-allocator/src/config/deps.ts @@ -1,5 +1,6 @@ import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb"; +import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb"; import { SQSClient } from "@aws-sdk/client-sqs"; import { Logger } from "pino"; import { createLogger } from "@internal/helpers"; @@ -12,6 +13,7 @@ import { EnvVars, envVars } from "./env"; export type Deps = { supplierConfigRepo: SupplierConfigRepository; supplierQuotasRepo: SupplierQuotasRepository; + idempotencyLayer: DynamoDBPersistenceLayer; logger: Logger; env: EnvVars; sqsClient: SQSClient; @@ -22,11 +24,7 @@ function createDocumentClient(): DynamoDBDocumentClient { return DynamoDBDocumentClient.from(ddbClient); } -function createSupplierConfigRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierConfigRepository { +function createSupplierConfigRepository(): SupplierConfigRepository { const config = { supplierConfigTableName: envVars.SUPPLIER_CONFIG_TABLE_NAME, }; @@ -34,11 +32,7 @@ function createSupplierConfigRepository( return new SupplierConfigRepository(createDocumentClient(), config); } -function createSupplierQuotasRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierQuotasRepository { +function createSupplierQuotasRepository(): SupplierQuotasRepository { const config = { supplierQuotasTableName: envVars.SUPPLIER_QUOTAS_TABLE_NAME, }; @@ -46,12 +40,19 @@ function createSupplierQuotasRepository( return new SupplierQuotasRepository(createDocumentClient(), config); } +function createIdempotencyLayer(): DynamoDBPersistenceLayer { + return new DynamoDBPersistenceLayer({ + tableName: envVars.IDEMPOTENCY_TABLE_NAME, + }); +} + export function createDependenciesContainer(): Deps { const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL }); return { - supplierConfigRepo: createSupplierConfigRepository(log, envVars), - supplierQuotasRepo: createSupplierQuotasRepository(log, envVars), + supplierConfigRepo: createSupplierConfigRepository(), + supplierQuotasRepo: createSupplierQuotasRepository(), + idempotencyLayer: createIdempotencyLayer(), logger: log, env: envVars, sqsClient: new SQSClient({}), diff --git a/lambdas/supplier-allocator/src/config/env.ts b/lambdas/supplier-allocator/src/config/env.ts index a155e4dbc..7d99ef0df 100644 --- a/lambdas/supplier-allocator/src/config/env.ts +++ b/lambdas/supplier-allocator/src/config/env.ts @@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({ SUPPLIER_CONFIG_TABLE_NAME: z.string(), SUPPLIER_QUOTAS_TABLE_NAME: z.string(), PINO_LOG_LEVEL: z.coerce.string().optional(), + IDEMPOTENCY_TABLE_NAME: z.string(), }); export type EnvVars = z.infer; diff --git a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts index 39ae14c84..cc02087cd 100644 --- a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts +++ b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts @@ -7,15 +7,11 @@ import { $LetterStatusChangeEvent, LetterStatusChangeEvent, } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events"; -import { - SupplierConfigRepository, - SupplierQuotasRepository, -} from "@internal/datastore"; +import { makeIdempotent } from "@aws-lambda-powertools/idempotency"; import createSupplierAllocatorHandler from "../allocate-handler"; import * as supplierConfig from "../../services/supplier-config"; import * as supplierQuotas from "../../services/supplier-quotas"; import * as allocationConfig from "../allocation-config"; - import { Deps } from "../../config/deps"; import packageJson from "../../../package.json"; @@ -28,6 +24,14 @@ jest.mock("../../services/supplier-config"); jest.mock("../../services/supplier-quotas"); jest.mock("../allocation-config"); +jest.mock("@aws-lambda-powertools/idempotency", () => { + const original = jest.requireActual("@aws-lambda-powertools/idempotency"); + return { + ...original, + makeIdempotent: jest.fn((fn, _) => fn), + }; +}); + function createSQSEvent(records: SQSRecord[]): SQSEvent { return { Records: records, @@ -185,16 +189,15 @@ function setupDefaultMocks() { } describe("createSupplierAllocatorHandler", () => { - let mockSqsClient: jest.Mocked; - let mockedDeps: jest.Mocked; - let mockedSupplierConfigRepo: jest.Mocked; - let mockedSupplierQuotasRepo: jest.Mocked; - beforeEach(() => { - mockSqsClient = { - send: jest.fn(), - } as unknown as jest.Mocked; - - mockedSupplierConfigRepo = { + const mockedDeps: jest.Mocked = { + logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, + env: { + SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", + SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", + }, + sqsClient: { send: jest.fn() } as unknown as SQSClient, + supplierConfigRepo: { ddbClient: {} as any, config: {} as any, getLetterVariant: jest.fn(), @@ -203,27 +206,18 @@ describe("createSupplierAllocatorHandler", () => { getSuppliersDetails: jest.fn(), getSupplierPacksForPackSpecification: jest.fn(), getPackSpecification: jest.fn(), - }; - - mockedSupplierQuotasRepo = { + }, + supplierQuotasRepo: { ddbClient: {} as any, config: {} as any, getOverallAllocation: jest.fn(), updateOverallAllocation: jest.fn(), getDailyAllocation: jest.fn(), updateDailyAllocation: jest.fn(), - }; + }, + } as unknown as Deps; - mockedDeps = { - logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, - env: { - SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", - SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", - }, - sqsClient: mockSqsClient, - supplierConfigRepo: mockedSupplierConfigRepo, - supplierQuotasRepo: mockedSupplierQuotasRepo, - }; + beforeEach(() => { jest.clearAllMocks(); }); @@ -244,8 +238,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -281,8 +275,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -315,8 +309,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.allocationDetails.supplierSpec).toEqual({ supplierId: "supplier1", @@ -361,7 +355,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.letterEvent.data.domainId).toBe("letter-test"); }); @@ -410,7 +404,7 @@ describe("createSupplierAllocatorHandler", () => { if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("returns batch failure for invalid JSON", async () => { @@ -480,7 +474,7 @@ describe("createSupplierAllocatorHandler", () => { process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; const sqsError = new Error("SQS send failed"); - (mockSqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); + (mockedDeps.sqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); const handler = createSupplierAllocatorHandler(mockedDeps); const result = await handler(evt, {} as any, {} as any); @@ -513,7 +507,7 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(1); expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg"); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("sends correct queue URL in SQS message command", async () => { @@ -529,7 +523,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall.input.QueueUrl).toBe(queueUrl); }); @@ -557,8 +551,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -667,8 +661,9 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock + .calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -711,8 +706,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -771,4 +766,22 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); expect(allocationConfig.selectSupplierByFactor).toHaveBeenCalledTimes(2); }); + + test("does not process a message more than once due to idempotency wrapper", async () => { + const preparedEvent = createPreparedV2Event(); + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + setupDefaultMocks(); + (makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => "supplier1"); + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + expect(makeIdempotent).toHaveBeenCalledTimes(1); + expect(mockedDeps.sqsClient.send).not.toHaveBeenCalled(); + }); }); diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index ec3cc106d..65e8c3582 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -1,4 +1,4 @@ -import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; +import { Context, SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; import { SendMessageCommand } from "@aws-sdk/client-sqs"; import { LetterVariant, @@ -13,6 +13,10 @@ import { buildEMFObject, formatGroupId, } from "@internal/helpers"; +import { + IdempotencyConfig, + makeIdempotent, +} from "@aws-lambda-powertools/idempotency"; import { getVariantDetails, getVolumeGroupDetails, @@ -29,6 +33,10 @@ import { import { Deps } from "../config/deps"; import { PreparedEventSchema, PreparedEvents, SupplierDetails } from "./types"; +const idempotencyConfig = new IdempotencyConfig({ + eventKeyJmesPath: "data.domainId", +}); + function parseQueueMessage(queueMessage: string): PreparedEvents { const result = PreparedEventSchema.safeParse(queueMessage); @@ -225,20 +233,21 @@ function emitDataMetrics( } function incrementAllocation( - volumeGroupAllocation: VolumeGroupAllocation, + volumeGroupAllocations: VolumeGroupAllocation, volumeGroupId: string, supplierId: string, allocation: number, deps: Deps, ) { - const groupAllocations = volumeGroupAllocation.get(volumeGroupId) ?? {}; + const groupAllocations = volumeGroupAllocations.get(volumeGroupId) ?? {}; groupAllocations[supplierId] = (groupAllocations[supplierId] ?? 0) + allocation; - volumeGroupAllocation.set(volumeGroupId, groupAllocations); + volumeGroupAllocations.set(volumeGroupId, groupAllocations); deps.logger.info({ description: "Updated allocations for volume group and supplier", volumeGroupId, groupAllocations, + setVolumeGroupAllocations: volumeGroupAllocations.get(volumeGroupId), }); } @@ -258,13 +267,109 @@ async function saveAllocations( } } +type SupplierAllocationResult = { + supplier: string; + priority: string; +}; + +async function processSupplierAllocation( + letterEvent: PreparedEvents, + deps: Deps, + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, +): Promise { + const supplierDetails: SupplierDetails = await getSupplierFromConfig( + letterEvent, + deps, + ); + deps.logger.info({ + description: "Resolved supplier details from config", + supplierDetails, + }); + const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; + + const supplier = supplierSpec.supplierId; + const priority = String(supplierSpec.priority); + + if (supplierDetails.allocationDetails.allocationStatus.status === "PENDING") { + incrementMetric(perAllocationSuccess, supplier, priority); + emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); + + incrementAllocation( + volumeGroupAllocations, + supplierDetails.volumeGroupId, + supplier, + 1, + deps, + ); + } else { + incrementMetric(perAllocationFailure, supplier, priority); + } + + // Send to allocated letters queue + const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; + if (!queueUrl) { + throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); + } + + const queueMessage = { + letterEvent, + allocationDetails: supplierDetails.allocationDetails, + }; + + deps.logger.info({ + description: "Sending message to upsert letter queue", + msg: queueMessage, + url: queueUrl, + }); + + await deps.sqsClient.send( + new SendMessageCommand({ + QueueUrl: queueUrl, + MessageBody: JSON.stringify(queueMessage), + }), + ); + return { + supplier, + priority, + }; +} + export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { - return async (event: SQSEvent) => { + const getSupplierIdempotently = ( + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, + ) => { + return makeIdempotent( + (letterEvent: PreparedEvents, depsInner: Deps) => + processSupplierAllocation( + letterEvent, + depsInner, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ), + { + persistenceStore: deps.idempotencyLayer, + config: idempotencyConfig, + }, + ); + }; + return async (event: SQSEvent, context: Context) => { const batchItemFailures: SQSBatchItemFailure[] = []; const perAllocationSuccess: AllocationMetrics = new Map(); const perAllocationFailure: AllocationMetrics = new Map(); const volumeGroupAllocations: VolumeGroupAllocation = new Map(); + // create an idempotent function bound to this handler's batchItemFailures + const boundGetSupplierIdempotently = getSupplierIdempotently( + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ); + const tasks = event.Records.map(async (record) => { let supplier = "unknown"; let priority = "unknown"; @@ -275,65 +380,17 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { deps.logger.info({ description: "Extracted letter event", messageId: record.messageId, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, }); - const supplierDetails: SupplierDetails = await getSupplierFromConfig( - letterEvent, - deps, - ); - - deps.logger.info({ - description: "Resolved supplier details from config", - supplierDetails, - }); - const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; - - supplier = supplierSpec.supplierId; - priority = String(supplierSpec.priority); - - if ( - supplierDetails.allocationDetails.allocationStatus.status === - "PENDING" - ) { - incrementMetric(perAllocationSuccess, supplier, priority); - - incrementAllocation( - volumeGroupAllocations, - supplierDetails.volumeGroupId, - supplier, - 1, - deps, - ); - } else { - incrementMetric(perAllocationFailure, supplier, priority); - } - - // Send to allocated letters queue - const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; - if (!queueUrl) { - throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); - } - - const queueMessage = { - letterEvent, - allocationDetails: supplierDetails.allocationDetails, - }; - - deps.logger.info({ - description: "Sending message to upsert letter queue", - msg: queueMessage, - url: queueUrl, - }); + idempotencyConfig.registerLambdaContext(context); - await deps.sqsClient.send( - new SendMessageCommand({ - QueueUrl: queueUrl, - MessageBody: JSON.stringify(queueMessage), - }), - ); + const supplierAllocationResult: SupplierAllocationResult = + await boundGetSupplierIdempotently(letterEvent, deps); - incrementMetric(perAllocationSuccess, supplier, priority); - emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); + supplier = supplierAllocationResult.supplier; + priority = supplierAllocationResult.priority; } catch (error) { deps.logger.error({ description: "Error processing allocation of record", diff --git a/package-lock.json b/package-lock.json index b997674e0..f06ef4ac0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -248,6 +248,7 @@ "name": "nhs-notify-supplier-api-allocate-letter", "version": "0.0.1", "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1044.0", diff --git a/tests/component-tests/allocation-tests/supplier-allocation.spec.ts b/tests/component-tests/allocation-tests/supplier-allocation.spec.ts new file mode 100644 index 000000000..37e7ab0dc --- /dev/null +++ b/tests/component-tests/allocation-tests/supplier-allocation.spec.ts @@ -0,0 +1,122 @@ +import { expect, test } from "@playwright/test"; +import { + getTotalAllocationForVolumeGroup, + getTotalDailyAllocation, +} from "tests/helpers/supplier-quotas-helper"; +import { + pollSupplierAllocatorForAllocationDetails, + supplierIdFromSupplierAllocatorLog, +} from "tests/helpers/aws-cloudwatch-helper"; +import { logger } from "tests/helpers/pino-logger"; +import { format } from "date-fns"; +import { toZonedTime } from "date-fns-tz"; +import { randomUUID } from "node:crypto"; +import { createPreparedV1Event } from "tests/helpers/event-fixtures"; +import { sendSnsBatchEvent } from "tests/helpers/send-sns-event"; + +test.describe("Supplier Allocation Tests", () => { + test("Verify that successful supplier allocation emits a PENDING event for the allocated supplier", async () => { + test.setTimeout(180_000); // 3 minutes for long running polling + const domainId = randomUUID(); + logger.info(`Testing supplier allocation with domainId: ${domainId}`); + const preparedEvent = createPreparedV1Event({ domainId }); + const response = await sendSnsBatchEvent([ + { id: preparedEvent.id, message: preparedEvent }, + ]); + expect(response.Successful).toHaveLength(1); + + // poll supplier allocator to check if supplier has been allocated + const allocationDetails = + await pollSupplierAllocatorForAllocationDetails(domainId); + + const supplierId = allocationDetails?.supplierSpec?.supplierId; + + const status = allocationDetails?.allocationStatus?.status; + + expect(supplierId).toBeTruthy(); + expect(status).toBe("PENDING"); + + logger.info( + `Supplier ${supplierId} allocated with status ${status} for domainId ${domainId} in supplier allocator lambda`, + ); + }); + + test("Verify that supplier allocator emits a rejected request for an unknown letter variant", async () => { + test.setTimeout(180_000); // 3 minutes for long running polling + const domainId = randomUUID(); + logger.info( + `Testing supplier allocation with unknown letter variant for domainId: ${domainId}`, + ); + const preparedEvent = createPreparedV1Event({ + domainId, + letterVariantId: "unknown-letter-variant", + }); + const response = await sendSnsBatchEvent([ + { id: preparedEvent.id, message: preparedEvent }, + ]); + expect(response.Successful).toHaveLength(1); + const allocationDetails = + await pollSupplierAllocatorForAllocationDetails(domainId); + + const supplierId = allocationDetails?.supplierSpec?.supplierId; + + const status = allocationDetails?.allocationStatus?.status; + + expect(supplierId).toBe("unknown"); + expect(status).toBe("REJECTED"); + }); + + test("Verify that supplier allocations are correctly updated for a volume group", async () => { + test.setTimeout(180_000); + const volumeGroupId = "volumeGroup-test1"; + const originalTotalAllocation = + await getTotalAllocationForVolumeGroup(volumeGroupId); + logger.info( + `Total allocation for volume group ${volumeGroupId}: ${originalTotalAllocation}`, + ); + + const allocationDate = format( + toZonedTime(new Date(), "Europe/London"), + "yyyy-MM-dd", + ); + const originalTotalDailyAllocation = + await getTotalDailyAllocation(allocationDate); + logger.info( + `Total daily allocation for date ${allocationDate}: ${originalTotalDailyAllocation}`, + ); + + // Create 2 messages with same domain id + const domainId = randomUUID(); + + const message1 = createPreparedV1Event({ + domainId, + letterVariantId: "notify-standard-test1", + }); + const message2 = createPreparedV1Event({ + domainId, + letterVariantId: "notify-standard-test1", + }); + + const eventBatch = [message1, message2]; + const response = await sendSnsBatchEvent( + eventBatch.map((event) => ({ id: event.id, message: event })), + ); + expect(response.Successful).toHaveLength(eventBatch.length); + + await supplierIdFromSupplierAllocatorLog(domainId); + + const newTotalAllocation = + await getTotalAllocationForVolumeGroup(volumeGroupId); + logger.info( + `New total allocation for volume group ${volumeGroupId}: ${newTotalAllocation}`, + ); + expect(newTotalAllocation).toBe(originalTotalAllocation + 1); + + const newTotalDailyAllocation = + await getTotalDailyAllocation(allocationDate); + logger.info( + `New total daily allocation for date ${allocationDate}: ${newTotalDailyAllocation}`, + ); + expect(newTotalDailyAllocation).toBe(originalTotalDailyAllocation + 1); + }); +}); diff --git a/tests/component-tests/events-tests/event-subscription.spec.ts b/tests/component-tests/events-tests/event-subscription.spec.ts index 7faf8c9a1..2eb1a3373 100644 --- a/tests/component-tests/events-tests/event-subscription.spec.ts +++ b/tests/component-tests/events-tests/event-subscription.spec.ts @@ -1,6 +1,10 @@ import { expect, test } from "@playwright/test"; import { sendSnsEvent } from "tests/helpers/send-sns-event"; -import { createPreparedV1Event } from "tests/helpers/event-fixtures"; +import { sendSqsEvent } from "tests/helpers/send-sqs-event"; +import { + createPendingAllocatedEvent, + createPreparedV1Event, +} from "tests/helpers/event-fixtures"; import { randomUUID } from "node:crypto"; import { logger } from "tests/helpers/pino-logger"; import { createValidRequestHeaders } from "tests/constants/request-headers"; @@ -91,35 +95,14 @@ test.describe("Event Subscription SNS Tests", () => { ); expect(getLetterResponse.status()).toBe(404); }); - - test("Verify that an error is logged for a duplicate letter id", async () => { + test("Verify that an error is logged for duplicates sent on the sqs queue", async () => { const domainId = randomUUID(); logger.info(`Testing event subscription with domainId: ${domainId}`); - const preparedEvent1 = createPreparedV1Event({ domainId }); - const response1 = await sendSnsEvent(preparedEvent1); - - expect(response1.MessageId).toBeTruthy(); - - // poll supplier allocator to check if supplier has been allocated - const message = await pollSupplierAllocatorLogForResolvedSpec(domainId); - const supplierAllocatorLog = JSON.parse(message) as { - msg?: { allocationDetails?: { supplierSpec?: { supplierId?: string } } }; - }; - const supplierId = - supplierAllocatorLog.msg?.allocationDetails?.supplierSpec?.supplierId; - - logger.info( - `Supplier ${supplierId} allocated for domainId ${domainId} in supplier allocator lambda`, - ); - if (!supplierId) { - throw new Error("supplierId was not found in supplier allocator log"); - } - - const preparedEvent2 = createPreparedV1Event({ domainId }); - const response2 = await sendSnsEvent(preparedEvent2); - expect(response2.MessageId).toBeTruthy(); + const pendingEvent = createPendingAllocatedEvent({ domainId }); + await sendSqsEvent(JSON.stringify(pendingEvent)); + const pendingEventDuplicate = createPendingAllocatedEvent({ domainId }); + await sendSqsEvent(JSON.stringify(pendingEventDuplicate)); - // poll supplier upsert to check if duplicate letter id was processed await pollUpsertLetterLogForWarning("Letter already exists", domainId); }); }); diff --git a/tests/component-tests/letterQueue-tests/queue-operations.spec.ts b/tests/component-tests/letterQueue-tests/queue-operations.spec.ts index 1810f273c..c7ae75bb7 100644 --- a/tests/component-tests/letterQueue-tests/queue-operations.spec.ts +++ b/tests/component-tests/letterQueue-tests/queue-operations.spec.ts @@ -1,11 +1,12 @@ import { expect, test } from "@playwright/test"; import { randomUUID } from "node:crypto"; import { - createPreparedEventBatchWithSameDomainId, + createPendingEventBatchWithSameDomainId, createPreparedV1Event, } from "tests/helpers/event-fixtures"; import { logger } from "tests/helpers/pino-logger"; -import { sendSnsBatchEvent, sendSnsEvent } from "tests/helpers/send-sns-event"; +import { sendSnsEvent } from "tests/helpers/send-sns-event"; +import { sendSqsEventBatch } from "tests/helpers/send-sqs-event"; import { pollUpsertLetterLogForWarning, supplierIdFromSupplierAllocatorLog, @@ -80,27 +81,19 @@ test.describe("Letter Queue Tests", () => { test("Verify if the only one entry is inserted in the letter queue table for a batch of events with the same letterId", async () => { const letterId = randomUUID(); - const eventBatch = createPreparedEventBatchWithSameDomainId({ + const eventBatch = createPendingEventBatchWithSameDomainId({ domainId: letterId, }); logger.info( `Sending batch event with ${eventBatch.length} events ${letterId}`, ); - const response = await sendSnsBatchEvent( - eventBatch.map((event) => ({ - id: event.id, - message: event, - })), - ); - expect(response.Successful).toHaveLength(eventBatch.length); - - const supplierId = await supplierIdFromSupplierAllocatorLog(letterId); + await sendSqsEventBatch(eventBatch.map((event) => JSON.stringify(event))); logger.info( `Verifying duplicate queue inserts are ignored for the batch of events with same letterId ${letterId}`, ); const [letterExists, itemCount] = await checkLetterQueueTable( - supplierId, + "supplier1", letterId, ); expect(letterExists).toBe(true); diff --git a/tests/config/main.config.ts b/tests/config/main.config.ts index ec77660b7..f3d13b0d7 100644 --- a/tests/config/main.config.ts +++ b/tests/config/main.config.ts @@ -32,6 +32,11 @@ const localConfig: PlaywrightTestConfig = { testDir: path.resolve(__dirname, "../component-tests/integration-tests"), testMatch: "**/*.spec.ts", }, + { + name: "allocation-tests", + testDir: path.resolve(__dirname, "../component-tests/allocation-tests"), + testMatch: "**/*.spec.ts", + }, ], }; diff --git a/tests/constants/api-constants.ts b/tests/constants/api-constants.ts index 608fb75d8..e1cda7268 100644 --- a/tests/constants/api-constants.ts +++ b/tests/constants/api-constants.ts @@ -16,3 +16,5 @@ export const EVENT_SUBSCRIPTION_TOPIC_ARN = process.env.EVENT_SUBSCRIPTION_TOPIC_ARN ?? `arn:aws:sns:${AWS_REGION}:${AWS_ACCOUNT_ID}:${EVENT_SUBSCRIPTION_TOPIC_NAME}`; export const LETTERQUEUE_TABLENAME = `nhs-${envName}-supapi-letter-queue`; +export const SUPPLIER_QUOTAS_TABLENAME = `nhs-${envName}-supapi-supplier-quotas`; +export const UPSERT_LETTERS_QUEUE_URL = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/nhs-${envName}-supapi-letter-updates-queue`; diff --git a/tests/helpers/aws-cloudwatch-helper.ts b/tests/helpers/aws-cloudwatch-helper.ts index facd450c4..3a398066e 100644 --- a/tests/helpers/aws-cloudwatch-helper.ts +++ b/tests/helpers/aws-cloudwatch-helper.ts @@ -97,12 +97,10 @@ export async function pollUpsertLetterLogForWarning( export async function supplierIdFromSupplierAllocatorLog( domainId: string, ): Promise { - const message = await pollSupplierAllocatorLogForResolvedSpec(domainId); - const supplierAllocatorLog = JSON.parse(message) as { - msg?: { allocationDetails?: { supplierSpec?: { supplierId?: string } } }; - }; - const supplierId = - supplierAllocatorLog.msg?.allocationDetails?.supplierSpec?.supplierId; + const allocationDetails = + await pollSupplierAllocatorForAllocationDetails(domainId); + + const supplierId = allocationDetails?.supplierSpec?.supplierId; logger.info( `Supplier ${supplierId} allocated for domainId ${domainId} in supplier allocator lambda`, @@ -113,3 +111,18 @@ export async function supplierIdFromSupplierAllocatorLog( } return supplierId; } + +export async function pollSupplierAllocatorForAllocationDetails( + domainId: string, +) { + const message = await pollSupplierAllocatorLogForResolvedSpec(domainId); + const supplierAllocatorLog = JSON.parse(message) as { + msg?: { + allocationDetails?: { + supplierSpec?: { supplierId?: string }; + allocationStatus?: { status?: string }; + }; + }; + }; + return supplierAllocatorLog.msg?.allocationDetails; +} diff --git a/tests/helpers/event-fixtures.ts b/tests/helpers/event-fixtures.ts index 4e71f6f2e..3d82004d6 100644 --- a/tests/helpers/event-fixtures.ts +++ b/tests/helpers/event-fixtures.ts @@ -39,12 +39,34 @@ export function createPreparedV1Event(overrides: Record = {}) { }; } -export function createPreparedEventBatchWithSameDomainId( +export function createPendingEventBatchWithSameDomainId( overrides: Record = {}, ) { return [ - createPreparedV1Event(overrides), - createPreparedV1Event(overrides), - createPreparedV1Event(overrides), + createPendingAllocatedEvent(overrides), + createPendingAllocatedEvent(overrides), + createPendingAllocatedEvent(overrides), ]; } + +export function createPendingAllocatedEvent( + overrides: Record = {}, +) { + const letterEvent = createPreparedV1Event(overrides); + const allocationDetails = { + supplierSpec: { + supplierId: "supplier1", + specId: "spec1", + priority: 1, + billingId: "billing1", + }, + allocationStatus: { + status: "PENDING", + }, + }; + + return { + letterEvent, + allocationDetails, + }; +} diff --git a/tests/helpers/send-sqs-event.ts b/tests/helpers/send-sqs-event.ts new file mode 100644 index 000000000..c7789bc5e --- /dev/null +++ b/tests/helpers/send-sqs-event.ts @@ -0,0 +1,29 @@ +import { + SQSClient, + SendMessageBatchCommand, + SendMessageCommand, +} from "@aws-sdk/client-sqs"; +import { UPSERT_LETTERS_QUEUE_URL } from "tests/constants/api-constants"; + +const sqsClient = new SQSClient({}); + +export async function sendSqsEvent(messageBody: string): Promise { + await sqsClient.send( + new SendMessageCommand({ + QueueUrl: UPSERT_LETTERS_QUEUE_URL, + MessageBody: messageBody, + }), + ); +} + +export async function sendSqsEventBatch(messages: string[]): Promise { + await sqsClient.send( + new SendMessageBatchCommand({ + QueueUrl: UPSERT_LETTERS_QUEUE_URL, + Entries: messages.map((message, index) => ({ + Id: `message-${index}`, + MessageBody: message, + })), + }), + ); +} diff --git a/tests/helpers/supplier-quotas-helper.ts b/tests/helpers/supplier-quotas-helper.ts new file mode 100644 index 000000000..680e9d920 --- /dev/null +++ b/tests/helpers/supplier-quotas-helper.ts @@ -0,0 +1,105 @@ +import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; +import { DynamoDBDocumentClient, GetCommand } from "@aws-sdk/lib-dynamodb"; +import z from "zod"; +import { SUPPLIER_QUOTAS_TABLENAME } from "../constants/api-constants"; +import { logger } from "./pino-logger"; + +const ddb = new DynamoDBClient({}); +const docClient = DynamoDBDocumentClient.from(ddb); + +export const overallAllocationSchema = z.object({ + id: z.string(), + volumeGroup: z.string(), + allocations: z.record(z.string(), z.number()), +}); + +export const dailyAllocationSchema = z.object({ + id: z.string(), + date: z.string(), + allocations: z.record(z.string(), z.number()), +}); + +export async function getTotalAllocationForVolumeGroup( + volumeGroupId: string, +): Promise { + try { + const result = await docClient.send( + new GetCommand({ + TableName: SUPPLIER_QUOTAS_TABLENAME, + Key: { pk: "ENTITY#overall-allocation", sk: `ID#${volumeGroupId}` }, + }), + ); + logger.info(`Selecting from table name: ${SUPPLIER_QUOTAS_TABLENAME}`); + + if (!result.Item) { + logger.warn( + `No overall allocation found for volume group ${volumeGroupId}`, + ); + return 0; // Default to 0 if no allocation record exists + } + // Strip DynamoDB keys before parsing + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { pk, sk, ...item } = result.Item; + const overallAllocation = overallAllocationSchema.parse(item); + + logger.info( + `Fetched overall allocation for volume group ${volumeGroupId}: ${JSON.stringify(overallAllocation)}`, + ); + const { allocations } = overallAllocation; + + const totalAllocation = Object.values(allocations).reduce( + (sum, allocation) => sum + allocation, + 0, + ); + logger.info( + `Fetched overall allocation for volume group ${volumeGroupId}: ${totalAllocation}`, + ); + return totalAllocation; + } catch (error) { + logger.error( + `Error fetching overall allocation for volume group ${volumeGroupId}: ${error}`, + ); + throw error; + } +} +export async function getTotalDailyAllocation( + allocationDate: string, +): Promise { + try { + const result = await docClient.send( + new GetCommand({ + TableName: SUPPLIER_QUOTAS_TABLENAME, + Key: { pk: "ENTITY#daily-allocation", sk: `ID#${allocationDate}` }, + }), + ); + logger.info(`Selecting from table name: ${SUPPLIER_QUOTAS_TABLENAME}`); + + if (!result.Item) { + logger.warn(`No daily allocation found for date ${allocationDate}`); + return 0; // Default to 0 if no allocation record exists + } + // Strip DynamoDB keys before parsing + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { pk, sk, ...item } = result.Item; + const dailyAllocation = dailyAllocationSchema.parse(item); + + logger.info( + `Fetched daily allocation for date ${allocationDate}: ${JSON.stringify(dailyAllocation)}`, + ); + const { allocations } = dailyAllocation; + + const totalAllocation = Object.values(allocations).reduce( + (sum, allocation) => sum + allocation, + 0, + ); + logger.info( + `Fetched daily allocation for date ${allocationDate}: ${totalAllocation}`, + ); + return totalAllocation; + } catch (error) { + logger.error( + `Error fetching daily allocation for date ${allocationDate}: ${error}`, + ); + throw error; + } +}