diff --git a/services/ehr-transfer-service/build.gradle b/services/ehr-transfer-service/build.gradle index 43346e78..c87d1570 100644 --- a/services/ehr-transfer-service/build.gradle +++ b/services/ehr-transfer-service/build.gradle @@ -70,6 +70,7 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:dynamodb' + implementation 'software.amazon.sns:sns-extended-client:2.1.0' implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:2.1.2' implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:2.1.4' implementation 'software.amazon.payloadoffloading:payloadoffloading-common:2.1.0' diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java index f3532f63..465c819c 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/configuration/LocalStackAwsConfig.java @@ -1,5 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.configuration; +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import jakarta.annotation.PostConstruct; import jakarta.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; @@ -9,13 +11,22 @@ import org.springframework.context.annotation.Bean; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.Projection; +import software.amazon.awssdk.services.dynamodb.model.ProjectionType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; -import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -28,16 +39,30 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.payloadoffloading.S3BackedPayloadStore; +import software.amazon.payloadoffloading.S3Dao; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; import java.net.URI; -import java.net.URISyntaxException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import static jakarta.jms.Session.CLIENT_ACKNOWLEDGE; -import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.*; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.INBOUND_CONVERSATION_ID; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.LAYER; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.NHS_NUMBER; +import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.OUTBOUND_CONVERSATION_ID; @TestConfiguration public class LocalStackAwsConfig { + private static final Region LOCALSTACK_REGION = Region.EU_WEST_2; + + private static final StaticCredentialsProvider LOCALSTACK_CREDENTIALS = + StaticCredentialsProvider.create(AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")); @Autowired private S3Client s3Client; @@ -110,21 +135,7 @@ public class LocalStackAwsConfig { private static final long DYNAMO_WRITE_CAPACITY_UNITS = 5L; @Bean - public static SqsClient sqsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) throws URISyntaxException { - return SqsClient.builder() - .credentialsProvider(() -> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG")) - .endpointOverride(new URI(localstackUrl)) - .region(Region.of(region)) - .build(); - } - - @Bean - public JmsListenerContainerFactory myFactory( - ConnectionFactory connectionFactory - ) { + public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE); factory.setConnectionFactory(connectionFactory); @@ -140,25 +151,34 @@ public ConnectionFactory connectionFactory() { return activeMQConnectionFactory; } + private String failoverUrl() { + return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption); + } + @Bean - public static S3Client s3Client( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static SnsClient snsClient(@Value("${localstack.url}") String localstackUrl) { + return SnsClient.builder() + .endpointOverride(URI.create(localstackUrl)) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) + .build(); + } + + @Bean + public static SqsClient sqsClient(@Value("${localstack.url}") String localstackUrl){ + return SqsClient.builder() + .endpointOverride(URI.create(localstackUrl)) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) + .build(); + } + + @Bean + public static S3Client s3Client(@Value("${localstack.url}") String localstackUrl) { return S3Client.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) .serviceConfiguration( S3Configuration.builder() .pathStyleAccessEnabled(true) @@ -166,52 +186,37 @@ public String secretAccessKey() { .build(); } - private String failoverUrl() { - return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption); + @Bean + public static AmazonSNSExtendedClient snsExtendedClient( + SnsClient snsClient, + S3Client s3Client, + @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName + ) { + return new AmazonSNSExtendedClient( + snsClient, + new SNSExtendedClientConfiguration().withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName), + new S3BackedPayloadStore(new S3Dao(s3Client), sqsLargeMessageBucketName) + ); } @Bean - public static SnsClient snsClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region + public static AmazonSQSExtendedClient sqsExtendedClient( + SqsClient sqsClient, + S3Client s3Client, + @Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName ) { - return SnsClient.builder() - .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) - .build(); + return new AmazonSQSExtendedClient( + sqsClient, + new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName, true)); } @Bean - public static DynamoDbClient dynamoDbClient( - @Value("${localstack.url}") String localstackUrl, - @Value("${aws.region}") String region - ) { + public static DynamoDbClient dynamoDbClient(@Value("${localstack.url}") String localstackUrl) { return DynamoDbClient.builder() .endpointOverride(URI.create(localstackUrl)) - .region(Region.of(region)) - .credentialsProvider( - StaticCredentialsProvider.create(new AwsCredentials() { - @Override - public String accessKeyId() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - - @Override - public String secretAccessKey() { - return "LSIAQAAAAAAVNCBMPNSG"; - } - })) + .region(LOCALSTACK_REGION) + .credentialsProvider(LOCALSTACK_CREDENTIALS) .build(); } diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java index 26a4143d..17f828f3 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/handler/NegativeAcknowledgmentHandlingIntegrationTest.java @@ -1,5 +1,6 @@ package uk.nhs.prm.repo.ehrtransferservice.handler; +import org.springframework.beans.factory.annotation.Qualifier; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +39,7 @@ public class NegativeAcknowledgmentHandlingIntegrationTest { TransferService transferService; @Autowired + @Qualifier("sqsClient") private SqsClient sqs; @Value("${activemq.inboundQueue}") diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java index 0ae30258..dde0f998 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/parsers/ParserBrokerIntegrationTest.java @@ -1,6 +1,7 @@ package uk.nhs.prm.repo.ehrtransferservice.parsers; import org.junit.jupiter.api.BeforeEach; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.bean.override.mockito.MockitoBean; import software.amazon.awssdk.services.sqs.SqsClient; @@ -50,6 +51,7 @@ @ContextConfiguration(classes = LocalStackAwsConfig.class) public class ParserBrokerIntegrationTest { @Autowired + @Qualifier("sqsClient") private SqsClient sqs; @Autowired @@ -58,9 +60,6 @@ public class ParserBrokerIntegrationTest { @Autowired TransferTrackerDbUtility transferTrackerDbUtility; - @MockitoBean - private EhrRepoService ehrRepoService; - @Value("${activemq.inboundQueue}") private String inboundQueue; diff --git a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java index 14cfbe80..40a02144 100644 --- a/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java +++ b/services/ehr-transfer-service/src/integration/java/uk/nhs/prm/repo/ehrtransferservice/utils/TestDataLoaderUtility.java @@ -7,7 +7,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; - public final class TestDataLoaderUtility { private TestDataLoaderUtility() { } diff --git a/services/ehr-transfer-service/src/integration/resources/application-test.properties b/services/ehr-transfer-service/src/integration/resources/application-test.properties index 091ae30f..02fcceb5 100644 --- a/services/ehr-transfer-service/src/integration/resources/application-test.properties +++ b/services/ehr-transfer-service/src/integration/resources/application-test.properties @@ -1,6 +1,5 @@ environment=${NHS_ENVIRONMENT:local} log_level=DEBUG -aws.region=${AWS_REGION:eu-west-2} aws.defaultRegion=eu-west-2 aws.sessionToken=${AWS_SESSION_TOKEN:dummy-session-token} localstack.url=${LOCALSTACK_URL:http://localhost:4566} diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java new file mode 100644 index 00000000..ad7a77d6 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SnsExtendedClientConfiguration.java @@ -0,0 +1,22 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.sns.AmazonSNSExtendedClient; +import software.amazon.sns.SNSExtendedClientConfiguration; + +@Configuration +public class SnsExtendedClientConfiguration { + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSNSExtendedClient snsExtendedClient(SnsClient snsClient, S3Client s3) { + var snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, bucketName); + return new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); + } +} diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java new file mode 100644 index 00000000..824fcc12 --- /dev/null +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/config/SqsExtendedClientConfiguration.java @@ -0,0 +1,24 @@ +package uk.nhs.prm.repo.ehrtransferservice.config; + +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sqs.SqsClient; + +@Configuration +@RequiredArgsConstructor +public class SqsExtendedClientConfiguration { + @Value("${aws.sqsLargeMessageBucketName}") + private String bucketName; + + @Bean + public AmazonSQSExtendedClient sqsExtendedClient(SqsClient sqsClient, S3Client s3) { + var extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, bucketName, true); + return new AmazonSQSExtendedClient(sqsClient, extendedClientConfiguration); + } +} \ No newline at end of file diff --git a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java index a8afaefd..1e05a4ca 100644 --- a/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java +++ b/services/ehr-transfer-service/src/main/java/uk/nhs/prm/repo/ehrtransferservice/message_publishers/MessagePublisher.java @@ -4,10 +4,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.MessageAttributeValue; import software.amazon.awssdk.services.sns.model.PublishRequest; import software.amazon.awssdk.services.sns.model.PublishResponse; +import software.amazon.sns.AmazonSNSExtendedClient; import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer; import java.util.HashMap; @@ -17,7 +17,7 @@ @Slf4j @RequiredArgsConstructor public class MessagePublisher { - private final SnsClient snsClient; + private final AmazonSNSExtendedClient snsExtendedClient; private final Tracer tracer; public void sendMessage(String topicArn, String message) { @@ -27,7 +27,7 @@ public void sendMessage(String topicArn, String message) { public void sendMessage(String topicArn, String message, Map attributes) { Map messageAttributes = new HashMap<>(); messageAttributes.put("traceId", getMessageAttributeValue(tracer.getTraceId())); - if (attributes != null && attributes.size() > 0) { + if (attributes != null && !attributes.isEmpty()) { attributes.forEach((key, value) -> messageAttributes.put(key, getMessageAttributeValue(value))); } @@ -37,7 +37,7 @@ public void sendMessage(String topicArn, String message, Map att .topicArn(topicArn) .build(); - PublishResponse response = snsClient.publish(request); + PublishResponse response = snsExtendedClient.publish(request); String[] topicAttributes = topicArn.split(":"); log.info("PUBLISHED: message to {} topic. Published SNS message id: {}", topicAttributes[topicAttributes.length - 1], response.messageId()); }