From b2966f09e442929a11473f798fb9093c41028b37 Mon Sep 17 00:00:00 2001 From: AndyFlintAnswerDigital Date: Tue, 31 Mar 2026 18:30:28 +0100 Subject: [PATCH] [PRM-666] REVERTED - Gone back to AWS SDK v1 due to extended client --- services/ehr-transfer-service/build.gradle | 25 ++- .../configuration/LocalStackAwsConfig.java | 204 ++++++++++-------- .../config/S3ClientSpringConfiguration.java | 7 + .../config/SnsClientSpringConfiguration.java | 11 +- .../config/SnsExtendedClient.java | 22 ++ .../config/SqsClientSpringConfiguration.java | 8 + .../config/SqsExtendedClient.java | 24 +++ .../SqsListenerSpringConfiguration.java | 21 +- .../NegativeAcknowledgementListener.java | 8 +- .../listeners/S3ExtendedMessageListener.java | 6 +- .../ehrtransferservice/logging/Tracer.java | 4 +- .../message_publishers/MessagePublisher.java | 31 ++- .../parsers/S3ExtendedMessageFetcher.java | 4 +- .../RepoIncomingEventListener.java | 8 +- .../ehrtransferservice/config/TracerTest.java | 2 +- .../config/UpdateableTraceContextTest.java | 3 +- .../RepoIncomingEventListenerTest.java | 2 +- 17 files changed, 250 insertions(+), 140 deletions(-) create mode 100644 services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClient.java create mode 100644 services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClient.java diff --git a/services/ehr-transfer-service/build.gradle b/services/ehr-transfer-service/build.gradle index 43346e78..82ea6a01 100644 --- a/services/ehr-transfer-service/build.gradle +++ b/services/ehr-transfer-service/build.gradle @@ -70,9 +70,28 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:dynamodb' - implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:2.1.2' - implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:2.1.4' - implementation 'software.amazon.payloadoffloading:payloadoffloading-common:2.1.0' + // OUT OF DATE - REINTRODUCED TO ALLOW SNS/SQS extended client libraries to be used without needing to upgrade the AWS SDK to v2. + // Once we upgrade amazon-sqs-java-messaging-lib to v2 we can remove aws-java-sdk-core and sqs. + implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:1.2.2' + implementation 'software.amazon.sns:sns-extended-client:1.1.3' + implementation 'software.amazon.payloadoffloading:payloadoffloading-common:1.1.1' + + /** + * TODO FYI to future developers - Here be dragons. + * When Amazon released the v2 AWS SDK, they did not release a v2 version of the SNS and SQS extended libraries + * The system receives EHR cores/fragments of up to 5MB in size. These are too large to place on an SQS queue + * which has a hard limit of 256KB. The system architecture relies on the SQS extended client library to offload + * large messages to S3 and place a reference on the queue. (See the 'handlers' package) + * Either + * A (UNLIKELY) - Amazon Release a v2 version of the SQS extended client library and we can upgrade to it and + * remove the v1 AWS SDK dependencies + * B - We implement our own offloading mechanism to remove the dependency on the v1 AWS SDK and the SQS extended client library. + * This would be a non-trivial amount of work and would require significant testing to ensure we don't lose messages or have issues with message visibility timeouts etc. + * C - We remove the 'internal queues' of this service and the S3 buckets that SQS offloads to + */ + implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.1.2' + implementation 'com.amazonaws:aws-java-sdk-core:1.12.368' + implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.533' compileOnly 'org.projectlombok:lombok:1.18.42' annotationProcessor 'org.projectlombok:lombok' diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java index f3532f63..23267372 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java @@ -1,5 +1,16 @@ package uk.nhs.prm.repo.ehrtransferservice.configuration; +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSClientBuilder; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import jakarta.annotation.PostConstruct; import jakarta.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; @@ -25,15 +36,16 @@ import software.amazon.awssdk.services.sns.model.CreateTopicResponse; import software.amazon.awssdk.services.sns.model.SubscribeRequest; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.payloadoffloading.S3BackedPayloadStore; +import software.amazon.payloadoffloading.S3Dao; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; import java.net.URI; import java.net.URISyntaxException; import java.util.*; -import static jakarta.jms.Session.CLIENT_ACKNOWLEDGE; +import static javax.jms.Session.CLIENT_ACKNOWLEDGE; import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.*; @TestConfiguration @@ -43,7 +55,7 @@ public class LocalStackAwsConfig { private S3Client s3Client; @Autowired - private SqsClient sqsClient; + private AmazonSQSAsync amazonSQSAsync; @Autowired private DynamoDbClient dynamoDbClient; @@ -110,24 +122,17 @@ public class LocalStackAwsConfig { private static final long DYNAMO_WRITE_CAPACITY_UNITS = 5L; @Bean - public static SqsClient sqsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) throws URISyntaxException { + public static SqsClient sqsClient(@Value("${localstack.url}") String localstackUrl) throws URISyntaxException { return SqsClient.builder() - .credentialsProvider(() -> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")) + .credentialsProvider((()-> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))) .endpointOverride(new URI(localstackUrl)) - .region(Region.of(region)) .build(); } @Bean - public JmsListenerContainerFactory myFactory( - ConnectionFactory connectionFactory - ) { + public JmsListenerContainerFactory myFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE); - factory.setConnectionFactory(connectionFactory); return factory; } @@ -141,13 +146,22 @@ public ConnectionFactory connectionFactory() { } @Bean - public static S3Client s3Client( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public AmazonS3 amazonS3(@Value("${localstack.url}") String localstackUrl) { + return AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2")) + .build(); + } + + // TODO: this S3Client bean is used to setup large message bucket only. + // the real dependency used in code is the one above (AmazonS3 / v1). + // Therefore: find a way to create the bucket - setting GrantFullControl using + // the class above, then get rid of this S3Client / v2. + @Bean + public static S3Client s3Client(@Value("${localstack.url}") String localstackUrl) { return S3Client.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) + .region(Region.EU_WEST_2) .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { @Override public String accessKeyId() { @@ -171,13 +185,36 @@ private String failoverUrl() { } @Bean - public static SnsClient snsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static AmazonSQSAsync amazonSQSAsync(@Value("${localstack.url}") String localstackUrl) { + return AmazonSQSAsyncClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2")) + .build(); + } + + @Bean + public static AmazonSQSExtendedClient s3SupportedSqsClient(AmazonSQSAsync sqsClient, AmazonS3 amazonS3, @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName) { + return new AmazonSQSExtendedClient(sqsClient, new ExtendedClientConfiguration().withPayloadSupportEnabled(amazonS3, sqsLargeMessageBucketName, true)); + } + + @Bean + public static AmazonSNS amazonSNS(@Value("${localstack.url}") String localstackUrl) { + return AmazonSNSClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2")) + .build(); + } + + @Bean + public static AmazonSNSExtendedClient s3SupportedSnsClient(AmazonSNS amazonSNS, AmazonS3 amazonS3, @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName) { + return new AmazonSNSExtendedClient(amazonSNS, new SNSExtendedClientConfiguration(), new S3BackedPayloadStore(new S3Dao(amazonS3), sqsLargeMessageBucketName)); + } + + @Bean + public static SnsClient snsClient(@Value("${localstack.url}") String localstackUrl) { return SnsClient.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) + .region(Region.EU_WEST_2) .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { @Override public String accessKeyId() { @@ -193,13 +230,10 @@ public String secretAccessKey() { } @Bean - public static DynamoDbClient dynamoDbClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static DynamoDbClient dynamoDbClient(@Value("${localstack.url}") String localstackUrl) { return DynamoDbClient.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) + .region(Region.EU_WEST_2) .credentialsProvider( StaticCredentialsProvider.create(new AwsCredentials() { @Override @@ -223,28 +257,51 @@ public void setupTestQueuesAndTopics() { } private void setUpQueueAndTopics() { - createQueue(repoIncomingQueueName); - - createSnsTopic("test_splunk_uploader_topic"); - - createQueueAndSnsReceiverSubscription(transferCompleteQueueName, "test_transfer_complete_topic"); - createQueueAndSnsReceiverSubscription(largeEhrQueueName, "test_large_ehr_topic"); - createQueueAndSnsReceiverSubscription(positiveAcksQueueName, "test_positive_acks_topic"); - createQueueAndSnsReceiverSubscription(parsingDlqQueueName, "test_dlq_topic"); - createQueueAndSnsReceiverSubscription(ehrCompleteQueueName, "test_ehr_complete_topic"); - createQueueAndSnsReceiverSubscription(nackInternalQueueName, "test_negative_acks_topic"); - createQueueAndSnsReceiverSubscription("ehr_in_unhandled_queue", "test_ehr_in_unhandled_topic"); - - createQueueAndObservabilityQueueAndSnsReceiverSubscriptions( - largeMessageFragmentsQueueName, - largeMessageFragmentsObservabilityQueueName, - "test_large_message_fragments_topic"); - - createQueueAndObservabilityQueueAndSnsReceiverSubscriptions( - smallEhrQueueName, - smallEhrObservabilityQueueName, - "test_small_ehr_topic" - ); + amazonSQSAsync.createQueue(repoIncomingQueueName); + + var fragmentQueue = amazonSQSAsync.createQueue(largeMessageFragmentsQueueName); + var fragmentsTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_large_message_fragments_topic").build()); + createSnsTestReceiverSubscription(fragmentsTopic, getQueueArn(fragmentQueue.getQueueUrl())); + + var transferCompleteQueue = amazonSQSAsync.createQueue(transferCompleteQueueName); + var transferCompleteTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_transfer_complete_topic").build()); + createSnsTestReceiverSubscription(transferCompleteTopic, getQueueArn(transferCompleteQueue.getQueueUrl())); + + var fragmentObservabilityQueue = amazonSQSAsync.createQueue(largeMessageFragmentsObservabilityQueueName); + createSnsTestReceiverSubscription(fragmentsTopic, getQueueArn(fragmentObservabilityQueue.getQueueUrl())); + + var smallEhrQueue = amazonSQSAsync.createQueue(smallEhrQueueName); + var smallEhrTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_small_ehr_topic").build()); + createSnsTestReceiverSubscription(smallEhrTopic, getQueueArn(smallEhrQueue.getQueueUrl())); + + var smallEhrObservabilityQueue = amazonSQSAsync.createQueue(smallEhrObservabilityQueueName); + createSnsTestReceiverSubscription(smallEhrTopic, getQueueArn(smallEhrObservabilityQueue.getQueueUrl())); + + var largeEhrQueue = amazonSQSAsync.createQueue(largeEhrQueueName); + var largeEhrTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_large_ehr_topic").build()); + createSnsTestReceiverSubscription(largeEhrTopic, getQueueArn(largeEhrQueue.getQueueUrl())); + + var positiveAcksTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_positive_acks_topic").build()); + var positiveAcksQueue = amazonSQSAsync.createQueue(positiveAcksQueueName); + createSnsTestReceiverSubscription(positiveAcksTopic, getQueueArn(positiveAcksQueue.getQueueUrl())); + + var parsingDlqTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_dlq_topic").build()); + var parsingDlqQueue = amazonSQSAsync.createQueue(parsingDlqQueueName); + createSnsTestReceiverSubscription(parsingDlqTopic, getQueueArn(parsingDlqQueue.getQueueUrl())); + + var ehrCompleteQueue = amazonSQSAsync.createQueue(ehrCompleteQueueName); + var ehrCompleteTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_ehr_complete_topic").build()); + createSnsTestReceiverSubscription(ehrCompleteTopic, getQueueArn(ehrCompleteQueue.getQueueUrl())); + + var nackTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_negative_acks_topic").build()); + var nackQueue = amazonSQSAsync.createQueue(nackInternalQueueName); + createSnsTestReceiverSubscription(nackTopic, getQueueArn(nackQueue.getQueueUrl())); + + var ehrInUnhandledTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_ehr_in_unhandled_topic").build()); + var ehrInUnhandledObservabilityQueue = amazonSQSAsync.createQueue("ehr_in_unhandled_queue"); + createSnsTestReceiverSubscription(ehrInUnhandledTopic, getQueueArn(ehrInUnhandledObservabilityQueue.getQueueUrl())); + + snsClient.createTopic(CreateTopicRequest.builder().name("test_splunk_uploader_topic").build()); } private void setupS3Bucket() { @@ -361,42 +418,6 @@ private void deleteDynamoTable(DynamoDbWaiter waiter, DescribeTableRequest table waiter.waitUntilTableNotExists(tableRequest); } - private void createQueueAndSnsReceiverSubscription(String queueName, String snsName) { - String queueUrl = createQueue(queueName); - CreateTopicResponse topic = createSnsTopic(snsName); - createSnsTestReceiverSubscription(topic, getQueueArn(queueUrl)); - } - - private void createQueueAndObservabilityQueueAndSnsReceiverSubscriptions( - String queueName, - String observabilityQueueName, - String snsName - ) { - String queueUrl = createQueue(queueName); - String observabilityQueueUrl = createQueue(observabilityQueueName); - - CreateTopicResponse topic = createSnsTopic(snsName); - - createSnsTestReceiverSubscription(topic, getQueueArn(queueUrl)); - createSnsTestReceiverSubscription(topic, getQueueArn(observabilityQueueUrl)); - } - - private String createQueue(String queueName) { - CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() - .queueName(queueName) - .build(); - - return sqsClient.createQueue(createQueueRequest).queueUrl(); - } - - private CreateTopicResponse createSnsTopic(String snsName) { - CreateTopicRequest createTopicRequest = CreateTopicRequest.builder() - .name(snsName) - .build(); - - return snsClient.createTopic(createTopicRequest); - } - private void createSnsTestReceiverSubscription(CreateTopicResponse topic, String queueArn) { final Map attributes = new HashMap<>(); attributes.put("RawMessageDelivery", "True"); @@ -410,10 +431,7 @@ private void createSnsTestReceiverSubscription(CreateTopicResponse topic, String snsClient.subscribe(subscribeRequest); } private String getQueueArn(String queueUrl) { - GetQueueAttributesRequest request = GetQueueAttributesRequest.builder() - .queueUrl(queueUrl) - .attributeNames(QueueAttributeName.QUEUE_ARN) - .build(); - return sqsClient.getQueueAttributes(request).attributes().get(QueueAttributeName.QUEUE_ARN); + var queueAttributes = amazonSQSAsync.getQueueAttributes(queueUrl, List.of("QueueArn")); + return queueAttributes.getAttributes().get("QueueArn"); } } diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/S3ClientSpringConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/S3ClientSpringConfiguration.java index ac15734b..5b0a3b5d 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/S3ClientSpringConfiguration.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/S3ClientSpringConfiguration.java @@ -1,5 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.config; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -24,4 +26,9 @@ public S3Client amazonS3Client() { .credentialsProvider(awsCredentialsProvider) .build(); } + + @Bean + public AmazonS3 amazonS3ClientForSnsExtended() { + return AmazonS3ClientBuilder.standard().withRegion(awsRegion).build(); + } } \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsClientSpringConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsClientSpringConfiguration.java index edc4c922..3045a0b6 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsClientSpringConfiguration.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsClientSpringConfiguration.java @@ -1,5 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.config; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.AmazonSNSClientBuilder; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -17,7 +19,14 @@ public class SnsClientSpringConfiguration { private AwsCredentialsProvider awsCredentialsProvider; @Bean - public SnsClient snsClient() { + public AmazonSNS snsClient() { + return AmazonSNSClientBuilder.standard() + .withRegion(awsRegion) + .build(); + } + + @Bean + public SnsClient snsClientV2() { return SnsClient.builder() .region(Region.of(awsRegion)) .credentialsProvider(awsCredentialsProvider) diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClient.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClient.java new file mode 100644 index 00000000..adf1cd59 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClient.java @@ -0,0 +1,22 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.sns.AmazonSNS; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; + +@Configuration +public class SnsExtendedClient { + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSNSExtendedClient s3SupportedSnsClient(AmazonSNS snsClient, AmazonS3 s3) { + var snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, bucketName); + return new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); + } +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsClientSpringConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsClientSpringConfiguration.java index 2144e47b..200b26a3 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsClientSpringConfiguration.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsClientSpringConfiguration.java @@ -1,11 +1,19 @@ package uk.nhs.prm.repo.ehrtransferservice.config; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import software.amazon.awssdk.services.sqs.SqsClient; @Configuration public class SqsClientSpringConfiguration { + + @Bean + public AmazonSQSAsync amazonSQSAsync() { + return AmazonSQSAsyncClientBuilder.defaultClient(); + } + @Bean public SqsClient sqsClient() { return SqsClient.create(); diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClient.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClient.java new file mode 100644 index 00000000..c1806644 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClient.java @@ -0,0 +1,24 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@RequiredArgsConstructor +public class SqsExtendedClient { + + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSQSExtendedClient s3SupportedSqsClient(AmazonSQSAsync sqsClient, AmazonS3 s3) { + var extendedClientConfiguration = new ExtendedClientConfiguration().withPayloadSupportEnabled(s3, bucketName, true); + return new AmazonSQSExtendedClient(sqsClient, extendedClientConfiguration); + } +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsListenerSpringConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsListenerSpringConfiguration.java index 88a0ac3c..1911404f 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsListenerSpringConfiguration.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsListenerSpringConfiguration.java @@ -4,7 +4,8 @@ import com.amazon.sqs.javamessaging.SQSConnection; import com.amazon.sqs.javamessaging.SQSConnectionFactory; import com.amazon.sqs.javamessaging.SQSSession; -import software.amazon.awssdk.services.sqs.SqsClient; +import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -25,8 +26,9 @@ import uk.nhs.prm.repo.ehrtransferservice.repo_incoming.RepoIncomingEventParser; import uk.nhs.prm.repo.ehrtransferservice.repo_incoming.RepoIncomingService; -import jakarta.jms.JMSException; -import jakarta.jms.Session; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; @Configuration @RequiredArgsConstructor @@ -58,8 +60,13 @@ public class SqsListenerSpringConfiguration { private String negativeAckQueueName; @Bean - public SQSConnection createConnection(SqsClient sqsClient) throws JMSException { - SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqsClient); + public AmazonSQSAsync amazonSQSAsync() { + return AmazonSQSAsyncClientBuilder.defaultClient(); + } + + @Bean + public SQSConnection createConnection(AmazonSQSAsync amazonSQSAsync) throws JMSException { + var connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSAsync); return connectionFactory.createConnection(); } @@ -68,7 +75,7 @@ public Session createRepoIncomingQueueListener(SQSConnection connection) throws Session session = getSession(connection); log.info("repo incoming queue name : {}", repoIncomingQueueName); - var incomingQueueConsumer = session.createConsumer(session.createQueue(repoIncomingQueueName)); + MessageConsumer incomingQueueConsumer = session.createConsumer(session.createQueue(repoIncomingQueueName)); incomingQueueConsumer.setMessageListener(new RepoIncomingEventListener(tracer, repoIncomingService, repoIncomingEventParser)); connection.start(); @@ -122,4 +129,4 @@ private Session createS3ExtendedSqsListener(SQSConnection connection, private Session getSession(SQSConnection connection) throws JMSException { return connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); } -} +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/NegativeAcknowledgementListener.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/NegativeAcknowledgementListener.java index 55ed23d2..b84af8d1 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/NegativeAcknowledgementListener.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/NegativeAcknowledgementListener.java @@ -1,8 +1,6 @@ package uk.nhs.prm.repo.ehrtransferservice.listeners; -import jakarta.jms.Message; -import jakarta.jms.MessageListener; -import jakarta.jms.TextMessage; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; @@ -10,6 +8,10 @@ import uk.nhs.prm.repo.ehrtransferservice.models.ack.Acknowledgement; import uk.nhs.prm.repo.ehrtransferservice.parsers.Parser; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + @Slf4j @RequiredArgsConstructor public class NegativeAcknowledgementListener implements MessageListener { diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/S3ExtendedMessageListener.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/S3ExtendedMessageListener.java index 36a02af0..832f63b1 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/S3ExtendedMessageListener.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/listeners/S3ExtendedMessageListener.java @@ -1,6 +1,5 @@ package uk.nhs.prm.repo.ehrtransferservice.listeners; -import jakarta.jms.JMSException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; @@ -9,8 +8,9 @@ import uk.nhs.prm.repo.ehrtransferservice.handlers.MessageHandler; import uk.nhs.prm.repo.ehrtransferservice.parsers.S3ExtendedMessageFetcher; -import jakarta.jms.Message; -import jakarta.jms.MessageListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; @Slf4j @RequiredArgsConstructor diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/logging/Tracer.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/logging/Tracer.java index 2c684fb0..bc51f05c 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/logging/Tracer.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/logging/Tracer.java @@ -1,12 +1,12 @@ package uk.nhs.prm.repo.ehrtransferservice.logging; -import jakarta.jms.JMSException; -import jakarta.jms.Message; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.context.annotation.Configuration; +import javax.jms.JMSException; +import javax.jms.Message; import static uk.nhs.prm.repo.ehrtransferservice.logging.TraceKey.CONVERSATION_ID; import static uk.nhs.prm.repo.ehrtransferservice.logging.TraceKey.TRACE_ID; diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java index a8afaefd..10574892 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java @@ -1,13 +1,12 @@ package uk.nhs.prm.repo.ehrtransferservice.message_publishers; +import com.amazonaws.services.sns.model.MessageAttributeValue; +import com.amazonaws.services.sns.model.PublishRequest; import com.google.gson.Gson; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import software.amazon.awssdk.services.sns.SnsClient; -import software.amazon.awssdk.services.sns.model.MessageAttributeValue; -import software.amazon.awssdk.services.sns.model.PublishRequest; -import software.amazon.awssdk.services.sns.model.PublishResponse; +import software.amazon.sns.AmazonSNSExtendedClient; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; import java.util.HashMap; @@ -17,7 +16,7 @@ @Slf4j @RequiredArgsConstructor public class MessagePublisher { - private final SnsClient snsClient; + private final AmazonSNSExtendedClient snsClient; private final Tracer tracer; public void sendMessage(String topicArn, String message) { @@ -31,15 +30,14 @@ public void sendMessage(String topicArn, String message, Map att attributes.forEach((key, value) -> messageAttributes.put(key, getMessageAttributeValue(value))); } - PublishRequest request = PublishRequest.builder() - .message(message) - .messageAttributes(messageAttributes) - .topicArn(topicArn) - .build(); + PublishRequest request = new PublishRequest() + .withMessage(message) + .withMessageAttributes(messageAttributes) + .withTopicArn(topicArn); - PublishResponse response = snsClient.publish(request); - String[] topicAttributes = topicArn.split(":"); - log.info("PUBLISHED: message to {} topic. Published SNS message id: {}", topicAttributes[topicAttributes.length - 1], response.messageId()); + var result = snsClient.publish(request); + var topicAttributes = topicArn.split(":"); + log.info("PUBLISHED: message to {} topic. Published SNS message id: {}", topicAttributes[topicAttributes.length - 1], result.getMessageId()); } public void sendJsonMessage(String topicArn, Object message, Map attributes) { @@ -48,9 +46,6 @@ public void sendJsonMessage(String topicArn, Object message, Map } private MessageAttributeValue getMessageAttributeValue(String attributeValue) { - return MessageAttributeValue.builder() - .dataType("String") - .stringValue(attributeValue) - .build(); + return new MessageAttributeValue().withDataType("String").withStringValue(attributeValue); } -} +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/parsers/S3ExtendedMessageFetcher.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/parsers/S3ExtendedMessageFetcher.java index a700a60a..56633061 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/parsers/S3ExtendedMessageFetcher.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/parsers/S3ExtendedMessageFetcher.java @@ -10,8 +10,8 @@ import uk.nhs.prm.repo.ehrtransferservice.models.S3PointerMessage; import uk.nhs.prm.repo.ehrtransferservice.models.enums.Status; -import jakarta.jms.Message; -import jakarta.jms.TextMessage; +import javax.jms.Message; +import javax.jms.TextMessage; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListener.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListener.java index 541f3423..0d3d59a5 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListener.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListener.java @@ -7,10 +7,10 @@ import uk.nhs.prm.repo.ehrtransferservice.exceptions.acknowledgement.EhrCompleteAcknowledgementFailedException; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; -import jakarta.jms.JMSException; -import jakarta.jms.Message; -import jakarta.jms.MessageListener; -import jakarta.jms.TextMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; @Slf4j @RequiredArgsConstructor diff --git a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/TracerTest.java b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/TracerTest.java index 1d6cb1b4..3e007166 100644 --- a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/TracerTest.java +++ b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/TracerTest.java @@ -7,7 +7,7 @@ import uk.nhs.prm.repo.ehrtransferservice.logging.UpdateableTraceContext; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; -import jakarta.jms.JMSException; +import javax.jms.JMSException; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; diff --git a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/UpdateableTraceContextTest.java b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/UpdateableTraceContextTest.java index 2ad52148..d263d6ae 100644 --- a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/UpdateableTraceContextTest.java +++ b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/config/UpdateableTraceContextTest.java @@ -5,11 +5,10 @@ import org.slf4j.MDC; import uk.nhs.prm.repo.ehrtransferservice.logging.UpdateableTraceContext; -import jakarta.jms.JMSException; +import javax.jms.JMSException; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.spy; import static uk.nhs.prm.repo.ehrtransferservice.logging.TraceKey.CONVERSATION_ID; import static uk.nhs.prm.repo.ehrtransferservice.logging.TraceKey.TRACE_ID; diff --git a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListenerTest.java b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListenerTest.java index 6f80c1f9..7e8dc4e2 100644 --- a/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListenerTest.java +++ b/services/ehr-transfer-service/src/test/java/uk/nhs/prm/repo/ehrtransferservice/repo_incoming/RepoIncomingEventListenerTest.java @@ -20,7 +20,7 @@ import uk.nhs.prm.repo.ehrtransferservice.exceptions.timeout.TimeoutExceededException; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; -import jakarta.jms.Message; +import javax.jms.Message; import java.util.UUID; import java.util.stream.Stream;