Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions services/ehr-transfer-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,28 @@ dependencies {
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:dynamodb'

implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:2.1.2'
implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:2.1.4'
implementation 'software.amazon.payloadoffloading:payloadoffloading-common:2.1.0'
// OUT OF DATE - REINTRODUCED TO ALLOW SNS/SQS extended client libraries to be used without needing to upgrade the AWS SDK to v2.
// Once we upgrade amazon-sqs-java-messaging-lib to v2 we can remove aws-java-sdk-core and sqs.
implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:1.2.2'
implementation 'software.amazon.sns:sns-extended-client:1.1.3'
implementation 'software.amazon.payloadoffloading:payloadoffloading-common:1.1.1'

/**
* TODO FYI to future developers - Here be dragons.
* When Amazon released the v2 AWS SDK, they did not release a v2 version of the SNS and SQS extended libraries
* The system receives EHR cores/fragments of up to 5MB in size. These are too large to place on an SQS queue
* which has a hard limit of 256KB. The system architecture relies on the SQS extended client library to offload
* large messages to S3 and place a reference on the queue. (See the 'handlers' package)
* Either
* A (UNLIKELY) - Amazon Release a v2 version of the SQS extended client library and we can upgrade to it and
* remove the v1 AWS SDK dependencies
* B - We implement our own offloading mechanism to remove the dependency on the v1 AWS SDK and the SQS extended client library.
* This would be a non-trivial amount of work and would require significant testing to ensure we don't lose messages or have issues with message visibility timeouts etc.
* C - We remove the 'internal queues' of this service and the S3 buckets that SQS offloads to
*/
implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.1.2'
implementation 'com.amazonaws:aws-java-sdk-core:1.12.368'
implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.533'

compileOnly 'org.projectlombok:lombok:1.18.42'
annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package uk.nhs.prm.repo.ehrtransferservice.configuration;

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
Expand All @@ -25,15 +36,16 @@
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.payloadoffloading.S3BackedPayloadStore;
import software.amazon.payloadoffloading.S3Dao;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;

import static jakarta.jms.Session.CLIENT_ACKNOWLEDGE;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.*;

@TestConfiguration
Expand All @@ -43,7 +55,7 @@ public class LocalStackAwsConfig {
private S3Client s3Client;

@Autowired
private SqsClient sqsClient;
private AmazonSQSAsync amazonSQSAsync;

@Autowired
private DynamoDbClient dynamoDbClient;
Expand Down Expand Up @@ -110,24 +122,17 @@ public class LocalStackAwsConfig {
private static final long DYNAMO_WRITE_CAPACITY_UNITS = 5L;

@Bean
public static SqsClient sqsClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) throws URISyntaxException {
public static SqsClient sqsClient(@Value("${localstack.url}") String localstackUrl) throws URISyntaxException {
return SqsClient.builder()
.credentialsProvider(() -> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))
.credentialsProvider((()-> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need an extra bracket?

.endpointOverride(new URI(localstackUrl))
.region(Region.of(region))
.build();
}

@Bean
public JmsListenerContainerFactory<?> myFactory(
ConnectionFactory connectionFactory
) {
public JmsListenerContainerFactory<?> myFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);
factory.setConnectionFactory(connectionFactory);
return factory;
}

Expand All @@ -141,13 +146,22 @@ public ConnectionFactory connectionFactory() {
}

@Bean
public static S3Client s3Client(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) {
public AmazonS3 amazonS3(@Value("${localstack.url}") String localstackUrl) {
return AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2"))
.build();
}

// TODO: this S3Client bean is used to setup large message bucket only.
// the real dependency used in code is the one above (AmazonS3 / v1).
// Therefore: find a way to create the bucket - setting GrantFullControl using
// the class above, then get rid of this S3Client / v2.
@Bean
public static S3Client s3Client(@Value("${localstack.url}") String localstackUrl) {
return S3Client.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.region(Region.EU_WEST_2)
.credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() {
@Override
public String accessKeyId() {
Expand All @@ -171,13 +185,36 @@ private String failoverUrl() {
}

@Bean
public static SnsClient snsClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) {
public static AmazonSQSAsync amazonSQSAsync(@Value("${localstack.url}") String localstackUrl) {
return AmazonSQSAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2"))
.build();
}

@Bean
public static AmazonSQSExtendedClient s3SupportedSqsClient(AmazonSQSAsync sqsClient, AmazonS3 amazonS3, @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName) {
return new AmazonSQSExtendedClient(sqsClient, new ExtendedClientConfiguration().withPayloadSupportEnabled(amazonS3, sqsLargeMessageBucketName, true));
}

@Bean
public static AmazonSNS amazonSNS(@Value("${localstack.url}") String localstackUrl) {
return AmazonSNSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstackUrl, "eu-west-2"))
.build();
}

@Bean
public static AmazonSNSExtendedClient s3SupportedSnsClient(AmazonSNS amazonSNS, AmazonS3 amazonS3, @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName) {
return new AmazonSNSExtendedClient(amazonSNS, new SNSExtendedClientConfiguration(), new S3BackedPayloadStore(new S3Dao(amazonS3), sqsLargeMessageBucketName));
}

@Bean
public static SnsClient snsClient(@Value("${localstack.url}") String localstackUrl) {
return SnsClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.region(Region.EU_WEST_2)
.credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() {
@Override
public String accessKeyId() {
Expand All @@ -193,13 +230,10 @@ public String secretAccessKey() {
}

@Bean
public static DynamoDbClient dynamoDbClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) {
public static DynamoDbClient dynamoDbClient(@Value("${localstack.url}") String localstackUrl) {
return DynamoDbClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.region(Region.EU_WEST_2)
.credentialsProvider(
StaticCredentialsProvider.create(new AwsCredentials() {
@Override
Expand All @@ -223,28 +257,51 @@ public void setupTestQueuesAndTopics() {
}

private void setUpQueueAndTopics() {
createQueue(repoIncomingQueueName);

createSnsTopic("test_splunk_uploader_topic");

createQueueAndSnsReceiverSubscription(transferCompleteQueueName, "test_transfer_complete_topic");
createQueueAndSnsReceiverSubscription(largeEhrQueueName, "test_large_ehr_topic");
createQueueAndSnsReceiverSubscription(positiveAcksQueueName, "test_positive_acks_topic");
createQueueAndSnsReceiverSubscription(parsingDlqQueueName, "test_dlq_topic");
createQueueAndSnsReceiverSubscription(ehrCompleteQueueName, "test_ehr_complete_topic");
createQueueAndSnsReceiverSubscription(nackInternalQueueName, "test_negative_acks_topic");
createQueueAndSnsReceiverSubscription("ehr_in_unhandled_queue", "test_ehr_in_unhandled_topic");

createQueueAndObservabilityQueueAndSnsReceiverSubscriptions(
largeMessageFragmentsQueueName,
largeMessageFragmentsObservabilityQueueName,
"test_large_message_fragments_topic");

createQueueAndObservabilityQueueAndSnsReceiverSubscriptions(
smallEhrQueueName,
smallEhrObservabilityQueueName,
"test_small_ehr_topic"
);
amazonSQSAsync.createQueue(repoIncomingQueueName);

var fragmentQueue = amazonSQSAsync.createQueue(largeMessageFragmentsQueueName);
var fragmentsTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_large_message_fragments_topic").build());
createSnsTestReceiverSubscription(fragmentsTopic, getQueueArn(fragmentQueue.getQueueUrl()));

var transferCompleteQueue = amazonSQSAsync.createQueue(transferCompleteQueueName);
var transferCompleteTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_transfer_complete_topic").build());
createSnsTestReceiverSubscription(transferCompleteTopic, getQueueArn(transferCompleteQueue.getQueueUrl()));

var fragmentObservabilityQueue = amazonSQSAsync.createQueue(largeMessageFragmentsObservabilityQueueName);
createSnsTestReceiverSubscription(fragmentsTopic, getQueueArn(fragmentObservabilityQueue.getQueueUrl()));

var smallEhrQueue = amazonSQSAsync.createQueue(smallEhrQueueName);
var smallEhrTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_small_ehr_topic").build());
createSnsTestReceiverSubscription(smallEhrTopic, getQueueArn(smallEhrQueue.getQueueUrl()));

var smallEhrObservabilityQueue = amazonSQSAsync.createQueue(smallEhrObservabilityQueueName);
createSnsTestReceiverSubscription(smallEhrTopic, getQueueArn(smallEhrObservabilityQueue.getQueueUrl()));

var largeEhrQueue = amazonSQSAsync.createQueue(largeEhrQueueName);
var largeEhrTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_large_ehr_topic").build());
createSnsTestReceiverSubscription(largeEhrTopic, getQueueArn(largeEhrQueue.getQueueUrl()));

var positiveAcksTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_positive_acks_topic").build());
var positiveAcksQueue = amazonSQSAsync.createQueue(positiveAcksQueueName);
createSnsTestReceiverSubscription(positiveAcksTopic, getQueueArn(positiveAcksQueue.getQueueUrl()));

var parsingDlqTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_dlq_topic").build());
var parsingDlqQueue = amazonSQSAsync.createQueue(parsingDlqQueueName);
createSnsTestReceiverSubscription(parsingDlqTopic, getQueueArn(parsingDlqQueue.getQueueUrl()));

var ehrCompleteQueue = amazonSQSAsync.createQueue(ehrCompleteQueueName);
var ehrCompleteTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_ehr_complete_topic").build());
createSnsTestReceiverSubscription(ehrCompleteTopic, getQueueArn(ehrCompleteQueue.getQueueUrl()));

var nackTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_negative_acks_topic").build());
var nackQueue = amazonSQSAsync.createQueue(nackInternalQueueName);
createSnsTestReceiverSubscription(nackTopic, getQueueArn(nackQueue.getQueueUrl()));

var ehrInUnhandledTopic = snsClient.createTopic(CreateTopicRequest.builder().name("test_ehr_in_unhandled_topic").build());
var ehrInUnhandledObservabilityQueue = amazonSQSAsync.createQueue("ehr_in_unhandled_queue");
createSnsTestReceiverSubscription(ehrInUnhandledTopic, getQueueArn(ehrInUnhandledObservabilityQueue.getQueueUrl()));

snsClient.createTopic(CreateTopicRequest.builder().name("test_splunk_uploader_topic").build());
}

private void setupS3Bucket() {
Expand Down Expand Up @@ -361,42 +418,6 @@ private void deleteDynamoTable(DynamoDbWaiter waiter, DescribeTableRequest table
waiter.waitUntilTableNotExists(tableRequest);
}

private void createQueueAndSnsReceiverSubscription(String queueName, String snsName) {
String queueUrl = createQueue(queueName);
CreateTopicResponse topic = createSnsTopic(snsName);
createSnsTestReceiverSubscription(topic, getQueueArn(queueUrl));
}

private void createQueueAndObservabilityQueueAndSnsReceiverSubscriptions(
String queueName,
String observabilityQueueName,
String snsName
) {
String queueUrl = createQueue(queueName);
String observabilityQueueUrl = createQueue(observabilityQueueName);

CreateTopicResponse topic = createSnsTopic(snsName);

createSnsTestReceiverSubscription(topic, getQueueArn(queueUrl));
createSnsTestReceiverSubscription(topic, getQueueArn(observabilityQueueUrl));
}

private String createQueue(String queueName) {
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName(queueName)
.build();

return sqsClient.createQueue(createQueueRequest).queueUrl();
}

private CreateTopicResponse createSnsTopic(String snsName) {
CreateTopicRequest createTopicRequest = CreateTopicRequest.builder()
.name(snsName)
.build();

return snsClient.createTopic(createTopicRequest);
}

private void createSnsTestReceiverSubscription(CreateTopicResponse topic, String queueArn) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("RawMessageDelivery", "True");
Expand All @@ -410,10 +431,7 @@ private void createSnsTestReceiverSubscription(CreateTopicResponse topic, String
snsClient.subscribe(subscribeRequest);
}
private String getQueueArn(String queueUrl) {
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build();
return sqsClient.getQueueAttributes(request).attributes().get(QueueAttributeName.QUEUE_ARN);
var queueAttributes = amazonSQSAsync.getQueueAttributes(queueUrl, List.of("QueueArn"));
return queueAttributes.getAttributes().get("QueueArn");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.nhs.prm.repo.ehrtransferservice.config;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -24,4 +26,9 @@ public S3Client amazonS3Client() {
.credentialsProvider(awsCredentialsProvider)
.build();
}

@Bean
public AmazonS3 amazonS3ClientForSnsExtended() {
return AmazonS3ClientBuilder.standard().withRegion(awsRegion).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.nhs.prm.repo.ehrtransferservice.config;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -17,7 +19,14 @@ public class SnsClientSpringConfiguration {
private AwsCredentialsProvider awsCredentialsProvider;

@Bean
public SnsClient snsClient() {
public AmazonSNS snsClient() {
return AmazonSNSClientBuilder.standard()
.withRegion(awsRegion)
.build();
}

@Bean
public SnsClient snsClientV2() {
return SnsClient.builder()
.region(Region.of(awsRegion))
.credentialsProvider(awsCredentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package uk.nhs.prm.repo.ehrtransferservice.config;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;

@Configuration
public class SnsExtendedClient {
@Value("${aws.sqsLargeMessageBucketName}")
private String bucketName;

@Bean
public AmazonSNSExtendedClient s3SupportedSnsClient(AmazonSNS snsClient, AmazonS3 s3) {
var snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var?

.withPayloadSupportEnabled(s3, bucketName);
return new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration);
}
}
Loading
Loading