Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/ehr-transfer-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ dependencies {
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:dynamodb'

implementation 'software.amazon.sns:sns-extended-client:2.1.0'
implementation 'com.amazonaws:amazon-sqs-java-extended-client-lib:2.1.2'
implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:2.1.4'
implementation 'software.amazon.payloadoffloading:payloadoffloading-common:2.1.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.nhs.prm.repo.ehrtransferservice.configuration;

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import jakarta.annotation.PostConstruct;
import jakarta.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
Expand All @@ -9,13 +11,22 @@
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.Projection;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
Expand All @@ -28,16 +39,30 @@
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.payloadoffloading.S3BackedPayloadStore;
import software.amazon.payloadoffloading.S3Dao;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static jakarta.jms.Session.CLIENT_ACKNOWLEDGE;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.*;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.INBOUND_CONVERSATION_ID;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.LAYER;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.NHS_NUMBER;
import static uk.nhs.prm.repo.ehrtransferservice.database.enumeration.TransferTableAttribute.OUTBOUND_CONVERSATION_ID;

@TestConfiguration
public class LocalStackAwsConfig {
private static final Region LOCALSTACK_REGION = Region.EU_WEST_2;

private static final StaticCredentialsProvider LOCALSTACK_CREDENTIALS =
StaticCredentialsProvider.create(AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"));

@Autowired
private S3Client s3Client;
Expand Down Expand Up @@ -110,21 +135,7 @@ public class LocalStackAwsConfig {
private static final long DYNAMO_WRITE_CAPACITY_UNITS = 5L;

@Bean
public static SqsClient sqsClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) throws URISyntaxException {
return SqsClient.builder()
.credentialsProvider(() -> AwsBasicCredentials.create("LSIAQAAAAAAVNCBMPNSG", "LSIAQAAAAAAVNCBMPNSG"))
.endpointOverride(new URI(localstackUrl))
.region(Region.of(region))
.build();
}

@Bean
public JmsListenerContainerFactory<?> myFactory(
ConnectionFactory connectionFactory
) {
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);
factory.setConnectionFactory(connectionFactory);
Expand All @@ -140,78 +151,72 @@ public ConnectionFactory connectionFactory() {
return activeMQConnectionFactory;
}

private String failoverUrl() {
return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption);
}

@Bean
public static S3Client s3Client(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) {
public static SnsClient snsClient(@Value("${localstack.url}") String localstackUrl) {
return SnsClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(LOCALSTACK_REGION)
.credentialsProvider(LOCALSTACK_CREDENTIALS)
.build();
}

@Bean
public static SqsClient sqsClient(@Value("${localstack.url}") String localstackUrl){
return SqsClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(LOCALSTACK_REGION)
.credentialsProvider(LOCALSTACK_CREDENTIALS)
.build();
}

@Bean
public static S3Client s3Client(@Value("${localstack.url}") String localstackUrl) {
return S3Client.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() {
@Override
public String accessKeyId() {
return "LSIAQAAAAAAVNCBMPNSG";
}

@Override
public String secretAccessKey() {
return "LSIAQAAAAAAVNCBMPNSG";
}
}))
.region(LOCALSTACK_REGION)
.credentialsProvider(LOCALSTACK_CREDENTIALS)
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build())
.build();
}

private String failoverUrl() {
return String.format("failover:(%s,%s)%s", amqEndpoint1, amqEndpoint2, randomOption);
@Bean
public static AmazonSNSExtendedClient snsExtendedClient(
SnsClient snsClient,
S3Client s3Client,
@Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName
) {
return new AmazonSNSExtendedClient(
snsClient,
new SNSExtendedClientConfiguration().withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName),
new S3BackedPayloadStore(new S3Dao(s3Client), sqsLargeMessageBucketName)
);
}

@Bean
public static SnsClient snsClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
public static AmazonSQSExtendedClient sqsExtendedClient(
SqsClient sqsClient,
S3Client s3Client,
@Value("${aws.sqsLargeMessageBucketName}") String sqsLargeMessageBucketName
) {
return SnsClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.credentialsProvider(StaticCredentialsProvider.create(new AwsCredentials() {
@Override
public String accessKeyId() {
return "LSIAQAAAAAAVNCBMPNSG";
}

@Override
public String secretAccessKey() {
return "LSIAQAAAAAAVNCBMPNSG";
}
}))
.build();
return new AmazonSQSExtendedClient(
sqsClient,
new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, sqsLargeMessageBucketName, true));
}

@Bean
public static DynamoDbClient dynamoDbClient(
@Value("${localstack.url}") String localstackUrl,
@Value("${aws.region}") String region
) {
public static DynamoDbClient dynamoDbClient(@Value("${localstack.url}") String localstackUrl) {
return DynamoDbClient.builder()
.endpointOverride(URI.create(localstackUrl))
.region(Region.of(region))
.credentialsProvider(
StaticCredentialsProvider.create(new AwsCredentials() {
@Override
public String accessKeyId() {
return "LSIAQAAAAAAVNCBMPNSG";
}

@Override
public String secretAccessKey() {
return "LSIAQAAAAAAVNCBMPNSG";
}
}))
.region(LOCALSTACK_REGION)
.credentialsProvider(LOCALSTACK_CREDENTIALS)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.nhs.prm.repo.ehrtransferservice.handler;

import org.springframework.beans.factory.annotation.Qualifier;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class NegativeAcknowledgmentHandlingIntegrationTest {
TransferService transferService;

@Autowired
@Qualifier("sqsClient")
private SqsClient sqs;

@Value("${activemq.inboundQueue}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.nhs.prm.repo.ehrtransferservice.parsers;

import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import software.amazon.awssdk.services.sqs.SqsClient;
Expand Down Expand Up @@ -50,6 +51,7 @@
@ContextConfiguration(classes = LocalStackAwsConfig.class)
public class ParserBrokerIntegrationTest {
@Autowired
@Qualifier("sqsClient")
private SqsClient sqs;

@Autowired
Expand All @@ -58,9 +60,6 @@ public class ParserBrokerIntegrationTest {
@Autowired
TransferTrackerDbUtility transferTrackerDbUtility;

@MockitoBean
private EhrRepoService ehrRepoService;

@Value("${activemq.inboundQueue}")
private String inboundQueue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;


public final class TestDataLoaderUtility {
private TestDataLoaderUtility() { }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
environment=${NHS_ENVIRONMENT:local}
log_level=DEBUG
aws.region=${AWS_REGION:eu-west-2}
aws.defaultRegion=eu-west-2
aws.sessionToken=${AWS_SESSION_TOKEN:dummy-session-token}
localstack.url=${LOCALSTACK_URL:http://localhost:4566}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package uk.nhs.prm.repo.ehrtransferservice.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;

@Configuration
public class SnsExtendedClientConfiguration {
@Value("${aws.sqsLargeMessageBucketName}")
private String bucketName;

@Bean
public AmazonSNSExtendedClient snsExtendedClient(SnsClient snsClient, S3Client s3) {
var snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
.withPayloadSupportEnabled(s3, bucketName);
return new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.nhs.prm.repo.ehrtransferservice.config;

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;

@Configuration
@RequiredArgsConstructor
public class SqsExtendedClientConfiguration {
@Value("${aws.sqsLargeMessageBucketName}")
private String bucketName;

@Bean
public AmazonSQSExtendedClient sqsExtendedClient(SqsClient sqsClient, S3Client s3) {
var extendedClientConfiguration = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3, bucketName, true);
return new AmazonSQSExtendedClient(sqsClient, extendedClientConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.sns.AmazonSNSExtendedClient;
import uk.nhs.prm.repo.ehrtransferservice.logging.Tracer;

import java.util.HashMap;
Expand All @@ -17,7 +17,7 @@
@Slf4j
@RequiredArgsConstructor
public class MessagePublisher {
private final SnsClient snsClient;
private final AmazonSNSExtendedClient snsExtendedClient;
private final Tracer tracer;

public void sendMessage(String topicArn, String message) {
Expand All @@ -27,7 +27,7 @@ public void sendMessage(String topicArn, String message) {
public void sendMessage(String topicArn, String message, Map<String, String> attributes) {
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("traceId", getMessageAttributeValue(tracer.getTraceId()));
if (attributes != null && attributes.size() > 0) {
if (attributes != null && !attributes.isEmpty()) {
attributes.forEach((key, value) -> messageAttributes.put(key, getMessageAttributeValue(value)));
}

Expand All @@ -37,7 +37,7 @@ public void sendMessage(String topicArn, String message, Map<String, String> att
.topicArn(topicArn)
.build();

PublishResponse response = snsClient.publish(request);
PublishResponse response = snsExtendedClient.publish(request);
String[] topicAttributes = topicArn.split(":");
log.info("PUBLISHED: message to {} topic. Published SNS message id: {}", topicAttributes[topicAttributes.length - 1], response.messageId());
}
Expand Down
Loading