Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ module "mesh_acknowledge" {
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
SSM_MESH_PREFIX = local.ssm_mesh_prefix
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ module "mesh_download" {
log_subscription_role_arn = local.acct.log_subscription_role_arn

lambda_env_vars = {
DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads"
DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download"
ENVIRONMENT = var.environment
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
PII_BUCKET = module.s3bucket_pii_data.bucket
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads"
DUPLICATE_DOWNLOAD_METRIC_NAME = "mesh-duplicate-downloads"
DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download"
ENVIRONMENT = var.environment
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
PII_BUCKET = module.s3bucket_pii_data.bucket
SSM_MESH_PREFIX = local.ssm_mesh_prefix
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ module "mesh_poll" {
MAXIMUM_RUNTIME_MILLISECONDS = "240000" # 4 minutes (Lambda has 5 min timeout)
POLLING_METRIC_NAME = "mesh-poll-successful-polls"
POLLING_METRIC_NAMESPACE = "dl-mesh-poll"
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
SSM_MESH_PREFIX = local.ssm_mesh_prefix
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ module "report_sender" {
EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url
EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn
MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket
SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}"
SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}"
SSM_MESH_PREFIX = local.ssm_mesh_prefix
SSM_SENDERS_PREFIX = local.ssm_senders_prefix
USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
"""Tests for DocumentStore"""
import pytest
from unittest.mock import Mock
from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError
from botocore.exceptions import ClientError
from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError, DocumentAlreadyExistsError


def make_client_error(code):
"""Helper to build a botocore ClientError with a given error code"""
return ClientError(
{'Error': {'Code': code, 'Message': 'test'}},
'PutObject'
)


class TestDocumentStore:
Expand All @@ -21,20 +30,41 @@ def test_store_document_success(self):
store = DocumentStore(config)

result = store.store_document(
sender_id='SENDER_001',
message_reference='ref_123',
sender_id='SENDER-001',
message_reference='ref-123',
mesh_message_id='mesh-456',
content=b'test content'
)

assert result == 'document-reference/SENDER_001_ref_123'
assert result == 'document-reference/SENDER-001/ref-123_mesh-456'
mock_s3_client.put_object.assert_called_once_with(
Bucket='test-pii-bucket',
Key='document-reference/SENDER_001_ref_123',
Body=b'test content'
Key='document-reference/SENDER-001/ref-123_mesh-456',
Body=b'test content',
IfNoneMatch='*'
)

def test_store_document_s3_failure_raises_error(self):
"""Raises IntermediaryBodyStoreError when S3 put_object fails"""
"""Raises IntermediaryBodyStoreError when S3 put_object fails with a non-HTTP error"""
mock_s3_client = Mock()
mock_s3_client.put_object.side_effect = make_client_error('InternalError')

config = Mock()
config.s3_client = mock_s3_client
config.transactional_data_bucket = 'test-pii-bucket'

store = DocumentStore(config)

with pytest.raises(IntermediaryBodyStoreError):
store.store_document(
sender_id='SENDER-001',
message_reference='ref-123',
mesh_message_id='mesh-456',
content=b'test content'
)

def test_store_document_raises_error_on_non_200_response(self):
"""Raises IntermediaryBodyStoreError when S3 returns a non-200 HTTP status"""
mock_s3_client = Mock()
mock_s3_client.put_object.return_value = {
'ResponseMetadata': {'HTTPStatusCode': 500}
Expand All @@ -48,7 +78,27 @@ def test_store_document_s3_failure_raises_error(self):

with pytest.raises(IntermediaryBodyStoreError):
store.store_document(
sender_id='SENDER_001',
message_reference='ref_123',
sender_id='SENDER-001',
message_reference='ref-123',
mesh_message_id='mesh-456',
content=b'test content'
)

def test_store_document_precondition_failed_raises_document_already_exists(self):
"""Raises DocumentAlreadyExistsError when S3 returns PreconditionFailed (object already exists)"""
mock_s3_client = Mock()
mock_s3_client.put_object.side_effect = make_client_error('PreconditionFailed')

config = Mock()
config.s3_client = mock_s3_client
config.transactional_data_bucket = 'test-pii-bucket'

store = DocumentStore(config)

with pytest.raises(DocumentAlreadyExistsError, match='document-reference/SENDER-001/ref-123_mesh-456'):
store.store_document(
sender_id='SENDER-001',
message_reference='ref-123',
mesh_message_id='mesh-456',
content=b'test content'
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def setup_mocks():
mock_config.mesh_client = Mock()

mock_processor = Mock()
mock_processor.process_sqs_message = Mock()
mock_processor.process_sqs_message = Mock(return_value='downloaded')

return (
mock_context,
Expand Down Expand Up @@ -149,9 +149,9 @@ def test_handler_partial_batch_failure(self, mock_processor_class, mock_config_c

# Make second message fail
mock_processor.process_sqs_message.side_effect = [
None,
'downloaded',
Exception("Test error"),
None
'downloaded'
]

event = create_sqs_event(num_records=3)
Expand Down
85 changes: 70 additions & 15 deletions lambdas/mesh-download/mesh_download/__tests__/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime, timezone
from pydantic import ValidationError
from mesh_download.errors import MeshMessageNotFound
from mesh_download.document_store import DocumentAlreadyExistsError


def setup_mocks():
Expand All @@ -19,6 +20,7 @@ def setup_mocks():
# Set up default config attributes
config.mesh_client = Mock()
config.download_metric = Mock()
config.duplicate_download_metric = Mock()
config.s3_client = Mock()
config.environment = 'development'
config.transactional_data_bucket = 'test-pii-bucket'
Expand Down Expand Up @@ -48,9 +50,9 @@ def create_valid_cloud_event():
'traceparent': '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01',
'dataschema': 'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-received-data.schema.json',
'data': {
'meshMessageId': 'test_message_123',
'senderId': 'TEST_SENDER',
'messageReference': 'ref_001'
'meshMessageId': 'test-message-123',
'senderId': 'TEST-SENDER',
'messageReference': 'ref-001'
}
}

Expand Down Expand Up @@ -99,6 +101,7 @@ def test_processor_initialization_calls_mesh_handshake(self):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand All @@ -115,7 +118,7 @@ def test_process_sqs_message_success(self, mock_datetime):
fixed_time = datetime(2025, 11, 19, 15, 30, 45, tzinfo=timezone.utc)
mock_datetime.now.return_value = fixed_time

document_store.store_document.return_value = 'document-reference/SENDER_001_ref_001'
document_store.store_document.return_value = 'document-reference/SENDER-001/ref-001_test-message-123'

event_publisher.send_events.return_value = []

Expand All @@ -124,6 +127,7 @@ def test_process_sqs_message_success(self, mock_datetime):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand All @@ -133,15 +137,17 @@ def test_process_sqs_message_success(self, mock_datetime):

sqs_record = create_sqs_record()

processor.process_sqs_message(sqs_record)
outcome = processor.process_sqs_message(sqs_record)

config.mesh_client.retrieve_message.assert_called_once_with('test_message_123')
assert outcome == 'downloaded'
config.mesh_client.retrieve_message.assert_called_once_with('test-message-123')

mesh_message.read.assert_called_once()

document_store.store_document.assert_called_once_with(
sender_id='TEST_SENDER',
message_reference='ref_001',
sender_id='TEST-SENDER',
message_reference='ref-001',
mesh_message_id='test-message-123',
content=b'Test message content'
)

Expand Down Expand Up @@ -172,9 +178,9 @@ def test_process_sqs_message_success(self, mock_datetime):

# Verify CloudEvent data payload
event_data = published_event['data']
assert event_data['senderId'] == 'TEST_SENDER'
assert event_data['messageReference'] == 'ref_001'
assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER_001_ref_001'
assert event_data['senderId'] == 'TEST-SENDER'
assert event_data['messageReference'] == 'ref-001'
assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER-001/ref-001_test-message-123'
assert set(event_data.keys()) == {'senderId', 'messageReference', 'messageUri', 'meshMessageId'}

def test_process_sqs_message_validation_failure(self):
Expand All @@ -188,6 +194,7 @@ def test_process_sqs_message_validation_failure(self):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand All @@ -212,6 +219,7 @@ def test_process_sqs_message_missing_mesh_message_id(self):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand Down Expand Up @@ -239,17 +247,18 @@ def test_download_and_store_message_not_found(self):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)

config.mesh_client.retrieve_message.return_value = None
sqs_record = create_sqs_record()

with pytest.raises(MeshMessageNotFound, match="MESH message with ID test_message_123 not found"):
with pytest.raises(MeshMessageNotFound, match="MESH message with ID test-message-123 not found"):
processor.process_sqs_message(sqs_record)

config.mesh_client.retrieve_message.assert_called_once_with('test_message_123')
config.mesh_client.retrieve_message.assert_called_once_with('test-message-123')
document_store.store_document.assert_not_called()
event_publisher.send_events.assert_not_called()
config.download_metric.record.assert_not_called()
Expand All @@ -269,6 +278,7 @@ def test_document_store_failure_prevents_ack_and_raises(self):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand Down Expand Up @@ -304,6 +314,7 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand All @@ -312,8 +323,9 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime):
config.mesh_client.retrieve_message.return_value = mesh_message
sqs_record = create_sqs_record()

processor.process_sqs_message(sqs_record)
outcome = processor.process_sqs_message(sqs_record)

assert outcome == 'downloaded'
# Verify event was published with PII bucket in URI
event_publisher.send_events.assert_called_once()
published_events = event_publisher.send_events.call_args[0][0]
Expand Down Expand Up @@ -342,6 +354,7 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)
Expand All @@ -350,10 +363,52 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime):
config.mesh_client.retrieve_message.return_value = mesh_message
sqs_record = create_sqs_record()

processor.process_sqs_message(sqs_record)
outcome = processor.process_sqs_message(sqs_record)

assert outcome == 'downloaded'
event_publisher.send_events.assert_called_once()
published_events = event_publisher.send_events.call_args[0][0]
assert len(published_events) == 1
message_uri = published_events[0]['data']['messageUri']
assert message_uri.startswith('s3://test-pii-bucket/')

def test_duplicate_delivery_skips_publish_and_acknowledge(self):
"""When S3 signals the object already exists, processor logs a warning, skips publishing and metric, but still acknowledges"""
from mesh_download.processor import MeshDownloadProcessor

config, log, event_publisher, document_store = setup_mocks()
bound_logger = Mock()
log.bind.return_value = bound_logger

document_store.store_document.side_effect = DocumentAlreadyExistsError(
"Document already exists for key: document-reference/TEST-SENDER/ref-001_mesh-123"
)

processor = MeshDownloadProcessor(
config=config,
log=log,
mesh_client=config.mesh_client,
download_metric=config.download_metric,
duplicate_download_metric=config.duplicate_download_metric,
document_store=document_store,
event_publisher=event_publisher
)

mesh_message = create_mesh_message()
config.mesh_client.retrieve_message.return_value = mesh_message
sqs_record = create_sqs_record()

# Should complete without raising
outcome = processor.process_sqs_message(sqs_record)

assert outcome == 'skipped'
bound_logger.warning.assert_called_once()
warning_msg = bound_logger.warning.call_args[0][0]
assert "already stored" in warning_msg
config.duplicate_download_metric.record.assert_called_once()

event_publisher.send_events.assert_not_called()
config.download_metric.record.assert_not_called()

# Acknowledge should still be called to remove message from MESH inbox
mesh_message.acknowledge.assert_called_once()
Loading
Loading