From 78a3e6d4f122315bbfade259c08b442a366f9e1b Mon Sep 17 00:00:00 2001 From: lapenna-bjss Date: Mon, 16 Mar 2026 10:56:51 +0000 Subject: [PATCH 1/7] CCM-15297: Update EventPublisher to handle retries --- utils/py-utils/dl_utils/event_publisher.py | 198 +++++++++++++-------- 1 file changed, 123 insertions(+), 75 deletions(-) diff --git a/utils/py-utils/dl_utils/event_publisher.py b/utils/py-utils/dl_utils/event_publisher.py index b533ccba1..c6bdee43c 100644 --- a/utils/py-utils/dl_utils/event_publisher.py +++ b/utils/py-utils/dl_utils/event_publisher.py @@ -6,15 +6,23 @@ import json import logging +import time from typing import List, Dict, Any, Optional, Literal, Callable from uuid import uuid4 import boto3 +from botocore.config import Config from botocore.exceptions import ClientError from pydantic import ValidationError DlqReason = Literal['INVALID_EVENT', 'EVENTBRIDGE_FAILURE'] MAX_BATCH_SIZE = 10 +MAX_PUBLISHER_RETRIES = 3 +TRANSIENT_ERROR_CODES = { + 'ThrottlingException', + 'InternalFailure', + 'ServiceUnavailable' +} class EventPublisher: @@ -44,7 +52,10 @@ def __init__( self.event_bus_arn = event_bus_arn self.dlq_url = dlq_url self.logger = logger or logging.getLogger(__name__) - self.events_client = events_client or boto3.client('events') + self.events_client = events_client or boto3.client( + 'events', + config=Config(retries={'max_attempts': 3, 'mode': 'standard'}) + ) self.sqs_client = sqs_client or boto3.client('sqs') def _validate_cloud_event(self, event: Dict[str, Any], validator: Callable[..., Any]) -> tuple[bool, Optional[str]]: @@ -54,9 +65,70 @@ def _validate_cloud_event(self, event: Dict[str, Any], validator: Callable[..., try: validator(**event) return (True, None) - except Exception as e: + except ValidationError as e: return (False, str(e)) + def _classify_failed_entries( + self, + response: Dict[str, Any], + events: List[Dict[str, Any]] + ) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + transient = [] + permanent = [] + + for entry, 
event in zip(response.get("Entries", []), events): + error_code = entry.get("ErrorCode") + if not error_code: + continue + + if error_code in TRANSIENT_ERROR_CODES: + transient.append(event) + else: + permanent.append(event) + + return transient, permanent + + def _send_batch_with_retry( + self, batch: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Send a single batch to EventBridge with retries for transient errors. + Returns a list of events that permanently failed. + """ + events_to_retry = batch + + for attempt in range(MAX_PUBLISHER_RETRIES): + entries = [ + { + "Source": event["source"], + "DetailType": event["type"], + "Detail": json.dumps(event), + "EventBusName": self.event_bus_arn, + } + for event in events_to_retry + ] + + try: + response = self.events_client.put_events(Entries=entries) + + transient, permanent = self._classify_failed_entries( + response, events_to_retry + ) + + if not transient: + return permanent + + if attempt == MAX_PUBLISHER_RETRIES - 1: + return transient + permanent + + events_to_retry = transient + time.sleep(2 ** attempt) + + except ClientError: + return events_to_retry + + return events_to_retry + def _send_to_event_bridge(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Send events to EventBridge in batches. 
@@ -82,56 +154,62 @@ def _send_to_event_bridge(self, events: List[Dict[str, Any]]) -> List[Dict[str, } ) - try: - entries = [ - { - 'Source': event['source'], - 'DetailType': event['type'], - 'Detail': json.dumps(event), - 'EventBusName': self.event_bus_arn - } - for event in batch - ] + batch_failures = self._send_batch_with_retry(batch) - response = self.events_client.put_events(Entries=entries) + if batch_failures: + for event in batch_failures: + self.logger.warning( + 'Event failed to send to EventBridge', + extra={'event_id': event.get('id')} + ) + failed_events.extend(batch_failures) - failed_count = response.get('FailedEntryCount', 0) - success_count = len(batch) - failed_count + return failed_events - self.logger.info( - 'EventBridge batch sent', - extra={ - 'batch_size': len(batch), - 'failed_entry_count': failed_count, - 'successful_count': success_count + def _build_dlq_entries( + self, + events: List[Dict[str, Any]], + reason: DlqReason + ) -> tuple[List[Dict[str, Any]], Dict[str, Any]]: + """Build SQS batch entries for the DLQ and a mapping of entry IDs to events""" + id_to_event_map = {} + entries = [] + for event in events: + entry_id = str(uuid4()) + id_to_event_map[entry_id] = event + entries.append({ + 'Id': entry_id, + 'MessageBody': json.dumps(event), + 'MessageAttributes': { + 'DlqReason': { + 'DataType': 'String', + 'StringValue': reason } - ) - - # Track failed entries - if failed_count > 0 and 'Entries' in response: - for idx, entry in enumerate(response['Entries']): - if 'ErrorCode' in entry: - self.logger.warning( - 'Event failed to send to EventBridge', - extra={ - 'error_code': entry.get('ErrorCode'), - 'error_message': entry.get('ErrorMessage'), - 'event_id': batch[idx].get('id') - } - ) - failed_events.append(batch[idx]) + } + }) + return entries, id_to_event_map - except ClientError as error: + def _extract_failed_dlq_events( + self, + response: Dict[str, Any], + id_to_event_map: Dict[str, Any] + ) -> List[Dict[str, Any]]: + 
"""Extract events that failed to send to the DLQ from a send_message_batch response.""" + failed = [] + for failed_entry in response.get('Failed', []): + entry_id = failed_entry.get('Id') + if entry_id and entry_id in id_to_event_map: + failed_event = id_to_event_map[entry_id] self.logger.warning( - 'EventBridge send error', + 'Event failed to send to DLQ', extra={ - 'error': str(error), - 'batch_size': len(batch) + 'error_code': failed_entry.get('Code'), + 'error_message': failed_entry.get('Message'), + 'event_id': failed_event.get('id') } ) - failed_events.extend(batch) - - return failed_events + failed.append(failed_event) + return failed def _send_to_dlq( self, @@ -154,44 +232,14 @@ def _send_to_dlq( for i in range(0, len(events), MAX_BATCH_SIZE): batch = events[i:i + MAX_BATCH_SIZE] - id_to_event_map = {} - - entries = [] - for event in batch: - entry_id = str(uuid4()) - id_to_event_map[entry_id] = event - entries.append({ - 'Id': entry_id, - 'MessageBody': json.dumps(event), - 'MessageAttributes': { - 'DlqReason': { - 'DataType': 'String', - 'StringValue': reason - } - } - }) + entries, id_to_event_map = self._build_dlq_entries(batch, reason) try: response = self.sqs_client.send_message_batch( QueueUrl=self.dlq_url, Entries=entries ) - - # Track failed DLQ sends - if 'Failed' in response: - for failed_entry in response['Failed']: - entry_id = failed_entry.get('Id') - if entry_id and entry_id in id_to_event_map: - failed_event = id_to_event_map[entry_id] - self.logger.warning( - 'Event failed to send to DLQ', - extra={ - 'error_code': failed_entry.get('Code'), - 'error_message': failed_entry.get('Message'), - 'event_id': failed_event.get('id') - } - ) - failed_dlqs.append(failed_event) + failed_dlqs.extend(self._extract_failed_dlq_events(response, id_to_event_map)) except ClientError as error: self.logger.warning( From c9abd2ff0eb774903e97e815cdc517927dac587c Mon Sep 17 00:00:00 2001 From: lapenna-bjss Date: Tue, 24 Mar 2026 10:10:54 +0000 Subject: [PATCH 2/7] 
CCM-15297: Add duplicate metric and update unit tests --- .../dl/module_lambda_mesh_download.tf | 19 ++--- .../__tests__/test_document_store.py | 68 ++++++++++++++--- .../mesh_download/__tests__/test_processor.py | 75 ++++++++++++++++--- lambdas/mesh-download/mesh_download/config.py | 13 ++++ .../mesh_download/document_store.py | 30 ++++++-- .../mesh-download/mesh_download/handler.py | 1 + .../mesh-download/mesh_download/processor.py | 40 ++++++---- 7 files changed, 195 insertions(+), 51 deletions(-) diff --git a/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf b/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf index 9717ac5f9..b1494c73e 100644 --- a/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf +++ b/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf @@ -37,15 +37,16 @@ module "mesh_download" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = { - DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads" - DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download" - ENVIRONMENT = var.environment - EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url - EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn - PII_BUCKET = module.s3bucket_pii_data.bucket - SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" - SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" - USE_MESH_MOCK = var.enable_mock_mesh ? 
"true" : "false" + DOWNLOAD_METRIC_NAME = "mesh-download-successful-downloads" + DUPLICATE_DOWNLOAD_METRIC_NAME = "mesh-duplicate-downloads" + DOWNLOAD_METRIC_NAMESPACE = "dl-mesh-download" + ENVIRONMENT = var.environment + EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url + EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn + PII_BUCKET = module.s3bucket_pii_data.bucket + SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" + SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" + USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false" } } diff --git a/lambdas/mesh-download/mesh_download/__tests__/test_document_store.py b/lambdas/mesh-download/mesh_download/__tests__/test_document_store.py index 9126c3dad..3af8b44f3 100644 --- a/lambdas/mesh-download/mesh_download/__tests__/test_document_store.py +++ b/lambdas/mesh-download/mesh_download/__tests__/test_document_store.py @@ -1,7 +1,16 @@ """Tests for DocumentStore""" import pytest from unittest.mock import Mock -from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError +from botocore.exceptions import ClientError +from mesh_download.document_store import DocumentStore, IntermediaryBodyStoreError, DocumentAlreadyExistsError + + +def make_client_error(code): + """Helper to build a botocore ClientError with a given error code""" + return ClientError( + {'Error': {'Code': code, 'Message': 'test'}}, + 'PutObject' + ) class TestDocumentStore: @@ -21,20 +30,41 @@ def test_store_document_success(self): store = DocumentStore(config) result = store.store_document( - sender_id='SENDER_001', - message_reference='ref_123', + sender_id='SENDER-001', + message_reference='ref-123', + mesh_message_id='mesh-456', content=b'test content' ) - assert result == 'document-reference/SENDER_001_ref_123' + assert result == 'document-reference/SENDER-001/ref-123_mesh-456' mock_s3_client.put_object.assert_called_once_with( Bucket='test-pii-bucket', - 
Key='document-reference/SENDER_001_ref_123', - Body=b'test content' + Key='document-reference/SENDER-001/ref-123_mesh-456', + Body=b'test content', + IfNoneMatch='*' ) def test_store_document_s3_failure_raises_error(self): - """Raises IntermediaryBodyStoreError when S3 put_object fails""" + """Raises IntermediaryBodyStoreError when S3 put_object fails with a non-HTTP error""" + mock_s3_client = Mock() + mock_s3_client.put_object.side_effect = make_client_error('InternalError') + + config = Mock() + config.s3_client = mock_s3_client + config.transactional_data_bucket = 'test-pii-bucket' + + store = DocumentStore(config) + + with pytest.raises(IntermediaryBodyStoreError): + store.store_document( + sender_id='SENDER-001', + message_reference='ref-123', + mesh_message_id='mesh-456', + content=b'test content' + ) + + def test_store_document_raises_error_on_non_200_response(self): + """Raises IntermediaryBodyStoreError when S3 returns a non-200 HTTP status""" mock_s3_client = Mock() mock_s3_client.put_object.return_value = { 'ResponseMetadata': {'HTTPStatusCode': 500} @@ -48,7 +78,27 @@ def test_store_document_s3_failure_raises_error(self): with pytest.raises(IntermediaryBodyStoreError): store.store_document( - sender_id='SENDER_001', - message_reference='ref_123', + sender_id='SENDER-001', + message_reference='ref-123', + mesh_message_id='mesh-456', + content=b'test content' + ) + + def test_store_document_precondition_failed_raises_document_already_exists(self): + """Raises DocumentAlreadyExistsError when S3 returns PreconditionFailed (object already exists)""" + mock_s3_client = Mock() + mock_s3_client.put_object.side_effect = make_client_error('PreconditionFailed') + + config = Mock() + config.s3_client = mock_s3_client + config.transactional_data_bucket = 'test-pii-bucket' + + store = DocumentStore(config) + + with pytest.raises(DocumentAlreadyExistsError, match='document-reference/SENDER-001/ref-123_mesh-456'): + store.store_document( + sender_id='SENDER-001', + 
message_reference='ref-123', + mesh_message_id='mesh-456', content=b'test content' ) diff --git a/lambdas/mesh-download/mesh_download/__tests__/test_processor.py b/lambdas/mesh-download/mesh_download/__tests__/test_processor.py index ecfc997be..4be751058 100644 --- a/lambdas/mesh-download/mesh_download/__tests__/test_processor.py +++ b/lambdas/mesh-download/mesh_download/__tests__/test_processor.py @@ -9,6 +9,7 @@ from datetime import datetime, timezone from pydantic import ValidationError from mesh_download.errors import MeshMessageNotFound +from mesh_download.document_store import DocumentAlreadyExistsError def setup_mocks(): @@ -19,6 +20,7 @@ def setup_mocks(): # Set up default config attributes config.mesh_client = Mock() config.download_metric = Mock() + config.duplicate_download_metric = Mock() config.s3_client = Mock() config.environment = 'development' config.transactional_data_bucket = 'test-pii-bucket' @@ -48,9 +50,9 @@ def create_valid_cloud_event(): 'traceparent': '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01', 'dataschema': 'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-received-data.schema.json', 'data': { - 'meshMessageId': 'test_message_123', - 'senderId': 'TEST_SENDER', - 'messageReference': 'ref_001' + 'meshMessageId': 'test-message-123', + 'senderId': 'TEST-SENDER', + 'messageReference': 'ref-001' } } @@ -99,6 +101,7 @@ def test_processor_initialization_calls_mesh_handshake(self): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -115,7 +118,7 @@ def test_process_sqs_message_success(self, mock_datetime): fixed_time = datetime(2025, 11, 19, 15, 30, 45, tzinfo=timezone.utc) mock_datetime.now.return_value = fixed_time - document_store.store_document.return_value = 'document-reference/SENDER_001_ref_001' + 
document_store.store_document.return_value = 'document-reference/SENDER-001/ref-001_test-message-123' event_publisher.send_events.return_value = [] @@ -124,6 +127,7 @@ def test_process_sqs_message_success(self, mock_datetime): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -135,13 +139,14 @@ def test_process_sqs_message_success(self, mock_datetime): processor.process_sqs_message(sqs_record) - config.mesh_client.retrieve_message.assert_called_once_with('test_message_123') + config.mesh_client.retrieve_message.assert_called_once_with('test-message-123') mesh_message.read.assert_called_once() document_store.store_document.assert_called_once_with( - sender_id='TEST_SENDER', - message_reference='ref_001', + sender_id='TEST-SENDER', + message_reference='ref-001', + mesh_message_id='test-message-123', content=b'Test message content' ) @@ -172,9 +177,9 @@ def test_process_sqs_message_success(self, mock_datetime): # Verify CloudEvent data payload event_data = published_event['data'] - assert event_data['senderId'] == 'TEST_SENDER' - assert event_data['messageReference'] == 'ref_001' - assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER_001_ref_001' + assert event_data['senderId'] == 'TEST-SENDER' + assert event_data['messageReference'] == 'ref-001' + assert event_data['messageUri'] == 's3://test-pii-bucket/document-reference/SENDER-001/ref-001_test-message-123' assert set(event_data.keys()) == {'senderId', 'messageReference', 'messageUri', 'meshMessageId'} def test_process_sqs_message_validation_failure(self): @@ -188,6 +193,7 @@ def test_process_sqs_message_validation_failure(self): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, 
event_publisher=event_publisher ) @@ -212,6 +218,7 @@ def test_process_sqs_message_missing_mesh_message_id(self): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -239,6 +246,7 @@ def test_download_and_store_message_not_found(self): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -246,10 +254,10 @@ def test_download_and_store_message_not_found(self): config.mesh_client.retrieve_message.return_value = None sqs_record = create_sqs_record() - with pytest.raises(MeshMessageNotFound, match="MESH message with ID test_message_123 not found"): + with pytest.raises(MeshMessageNotFound, match="MESH message with ID test-message-123 not found"): processor.process_sqs_message(sqs_record) - config.mesh_client.retrieve_message.assert_called_once_with('test_message_123') + config.mesh_client.retrieve_message.assert_called_once_with('test-message-123') document_store.store_document.assert_not_called() event_publisher.send_events.assert_not_called() config.download_metric.record.assert_not_called() @@ -269,6 +277,7 @@ def test_document_store_failure_prevents_ack_and_raises(self): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -304,6 +313,7 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -342,6 +352,7 @@ def test_bucket_selection_with_mesh_mock_disabled(self, 
mock_datetime): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) @@ -357,3 +368,43 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime): assert len(published_events) == 1 message_uri = published_events[0]['data']['messageUri'] assert message_uri.startswith('s3://test-pii-bucket/') + + def test_duplicate_delivery_skips_publish_and_acknowledge(self): + """When S3 signals the object already exists, processor logs a warning, skips publishing and metric, but still acknowledges""" + from mesh_download.processor import MeshDownloadProcessor + + config, log, event_publisher, document_store = setup_mocks() + bound_logger = Mock() + log.bind.return_value = bound_logger + + document_store.store_document.side_effect = DocumentAlreadyExistsError( + "Document already exists for key: document-reference/TEST-SENDER/ref-001_mesh-123" + ) + + processor = MeshDownloadProcessor( + config=config, + log=log, + mesh_client=config.mesh_client, + download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, + document_store=document_store, + event_publisher=event_publisher + ) + + mesh_message = create_mesh_message() + config.mesh_client.retrieve_message.return_value = mesh_message + sqs_record = create_sqs_record() + + # Should complete without raising + processor.process_sqs_message(sqs_record) + + bound_logger.warning.assert_called_once() + warning_msg = bound_logger.warning.call_args[0][0] + assert "already stored" in warning_msg + config.duplicate_download_metric.record.assert_called_once() + + event_publisher.send_events.assert_not_called() + config.download_metric.record.assert_not_called() + + # Acknowledge should still be called to remove message from MESH inbox + mesh_message.acknowledge.assert_called_once() diff --git 
a/lambdas/mesh-download/mesh_download/config.py b/lambdas/mesh-download/mesh_download/config.py index 48dd1a102..e8091e3ad 100644 --- a/lambdas/mesh-download/mesh_download/config.py +++ b/lambdas/mesh-download/mesh_download/config.py @@ -9,6 +9,7 @@ "ssm_mesh_prefix": "SSM_MESH_PREFIX", "environment": "ENVIRONMENT", "download_metric_name": "DOWNLOAD_METRIC_NAME", + "duplicate_download_metric_name": "DUPLICATE_DOWNLOAD_METRIC_NAME", "download_metric_namespace": "DOWNLOAD_METRIC_NAMESPACE", "event_publisher_event_bus_arn": "EVENT_PUBLISHER_EVENT_BUS_ARN", "event_publisher_dlq_url": "EVENT_PUBLISHER_DLQ_URL", @@ -28,12 +29,14 @@ def __init__(self, ssm=None, s3_client=None): super().__init__(ssm=ssm, s3_client=s3_client) self.download_metric = None + self.duplicate_download_metric = None def __enter__(self): super().__enter__() # Build download metric self.download_metric = self.build_download_metric() + self.duplicate_download_metric = self.build_duplicate_download_metric() return self @@ -47,6 +50,16 @@ def build_download_metric(self): dimensions={"Environment": self.environment} ) + def build_duplicate_download_metric(self): + """ + Returns a custom metric to record messages that were attempted to be downloaded more than once + """ + return Metric( + name=self.duplicate_download_metric_name, + namespace=self.download_metric_namespace, + dimensions={"Environment": self.environment} + ) + @property def transactional_data_bucket(self): """ diff --git a/lambdas/mesh-download/mesh_download/document_store.py b/lambdas/mesh-download/mesh_download/document_store.py index eb5612a1d..8228b003d 100644 --- a/lambdas/mesh-download/mesh_download/document_store.py +++ b/lambdas/mesh-download/mesh_download/document_store.py @@ -1,10 +1,16 @@ """Module for storing document references in S3""" +from botocore.exceptions import ClientError + class IntermediaryBodyStoreError(Exception): """Error to represent any failure to upload document to intermediate location""" +class 
DocumentAlreadyExistsError(Exception): + """Raised when a document already exists in S3""" + + class DocumentStoreConfig: """Configuration holder for DocumentStore""" def __init__(self, s3_client, transactional_data_bucket): @@ -18,16 +24,24 @@ class DocumentStore: # pylint: disable=too-few-public-methods def __init__(self, config): self.config = config - def store_document(self, sender_id, message_reference, content): + def store_document(self, sender_id, message_reference, mesh_message_id, content): """store document reference in S3""" - s3_key = f"document-reference/{sender_id}_{message_reference}" - - s3_response = self.config.s3_client.put_object( - Bucket=self.config.transactional_data_bucket, - Key=s3_key, - Body=content - ) + s3_key = f"document-reference/{sender_id}/{message_reference}_{mesh_message_id}" + + try: + s3_response = self.config.s3_client.put_object( + Bucket=self.config.transactional_data_bucket, + Key=s3_key, + Body=content, + IfNoneMatch='*' + ) + except ClientError as e: + if e.response['Error']['Code'] == 'PreconditionFailed': + raise DocumentAlreadyExistsError( + f"Document already exists for key: {s3_key}" + ) from e + raise IntermediaryBodyStoreError(e) from e if s3_response['ResponseMetadata']['HTTPStatusCode'] != 200: raise IntermediaryBodyStoreError(s3_response) diff --git a/lambdas/mesh-download/mesh_download/handler.py b/lambdas/mesh-download/mesh_download/handler.py index 56bbb7367..a42fa5b81 100644 --- a/lambdas/mesh-download/mesh_download/handler.py +++ b/lambdas/mesh-download/mesh_download/handler.py @@ -43,6 +43,7 @@ def handler(event, context): log=log, mesh_client=config.mesh_client, download_metric=config.download_metric, + duplicate_download_metric=config.duplicate_download_metric, document_store=document_store, event_publisher=event_publisher ) diff --git a/lambdas/mesh-download/mesh_download/processor.py b/lambdas/mesh-download/mesh_download/processor.py index e51edcedb..0df000580 100644 --- 
a/lambdas/mesh-download/mesh_download/processor.py +++ b/lambdas/mesh-download/mesh_download/processor.py @@ -5,6 +5,7 @@ from pydantic import ValidationError from digital_letters_events import MESHInboxMessageDownloaded, MESHInboxMessageReceived from mesh_download.errors import MeshMessageNotFound +from mesh_download.document_store import DocumentAlreadyExistsError class MeshDownloadProcessor: @@ -13,6 +14,7 @@ def __init__(self, **kwargs): self.__log = kwargs['log'] self.__mesh_client = kwargs['mesh_client'] self.__download_metric = kwargs['download_metric'] + self.__duplicate_download_metric = kwargs['duplicate_download_metric'] self.__document_store = kwargs['document_store'] self.__event_publisher = kwargs['event_publisher'] @@ -72,27 +74,39 @@ def _handle_download(self, event, logger): content = message.read() logger.info("Downloaded MESH message content") - uri = self._store_message_content( - sender_id=data.senderId, - message_reference=data.messageReference, - message_content=content, - logger=logger - ) + duplicate = False + try: + uri = self._store_message_content( + sender_id=data.senderId, + message_reference=data.messageReference, + mesh_message_id=data.meshMessageId, + message_content=content, + logger=logger + ) + except DocumentAlreadyExistsError: + logger.warning( + "Message already stored in S3, skipping publish (duplicate delivery)", + mesh_message_id=data.meshMessageId, + message_reference=data.messageReference + ) + duplicate = True + self.__duplicate_download_metric.record(1) - self._publish_downloaded_event( - incoming_event=event, - message_uri=uri - ) + if not duplicate: + self._publish_downloaded_event( + incoming_event=event, + message_uri=uri + ) + self.__download_metric.record(1) message.acknowledge() logger.info("Acknowledged message") - self.__download_metric.record(1) - - def _store_message_content(self, sender_id, message_reference, message_content, logger): + def _store_message_content(self, sender_id, message_reference, 
mesh_message_id, message_content, logger): s3_key = self.__document_store.store_document( sender_id=sender_id, message_reference=message_reference, + mesh_message_id=mesh_message_id, content=message_content, ) From 0d2623225e879d54429e965962dc90bce847b193 Mon Sep 17 00:00:00 2001 From: lapenna-bjss Date: Tue, 24 Mar 2026 12:07:36 +0000 Subject: [PATCH 3/7] CCM-15297: revert to using Exception --- utils/py-utils/dl_utils/__tests__/test_event_publisher.py | 2 +- utils/py-utils/dl_utils/event_publisher.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/utils/py-utils/dl_utils/__tests__/test_event_publisher.py b/utils/py-utils/dl_utils/__tests__/test_event_publisher.py index 9cdd3bfca..22725485d 100644 --- a/utils/py-utils/dl_utils/__tests__/test_event_publisher.py +++ b/utils/py-utils/dl_utils/__tests__/test_event_publisher.py @@ -167,7 +167,7 @@ def test_should_send_failed_eventbridge_events_to_dlq( mock_events_client.put_events.return_value = { 'FailedEntryCount': 1, 'Entries': [ - {'ErrorCode': 'InternalFailure', 'ErrorMessage': 'Internal error'}, + {'ErrorCode': 'AccessDenied', 'ErrorMessage': 'Access denied'}, {'EventId': 'event-2'} ] } diff --git a/utils/py-utils/dl_utils/event_publisher.py b/utils/py-utils/dl_utils/event_publisher.py index c6bdee43c..b1ba93386 100644 --- a/utils/py-utils/dl_utils/event_publisher.py +++ b/utils/py-utils/dl_utils/event_publisher.py @@ -12,7 +12,6 @@ import boto3 from botocore.config import Config from botocore.exceptions import ClientError -from pydantic import ValidationError DlqReason = Literal['INVALID_EVENT', 'EVENTBRIDGE_FAILURE'] @@ -65,7 +64,7 @@ def _validate_cloud_event(self, event: Dict[str, Any], validator: Callable[..., try: validator(**event) return (True, None) - except ValidationError as e: + except Exception as e: return (False, str(e)) def _classify_failed_entries( From 3410004325fd6a87b86aa6023490181941e4ffc6 Mon Sep 17 00:00:00 2001 From: lapenna-bjss Date: Tue, 24 Mar 2026 16:03:47 +0000 
Subject: [PATCH 4/7] CCM-15297: Add component and unit tests --- .../playwright/constants/backend-constants.ts | 1 + .../mesh-poll-download.component.spec.ts | 85 +++++++- .../__tests__/test_event_publisher.py | 190 +++++++++++++++++- utils/py-utils/dl_utils/event_publisher.py | 69 ++++++- 4 files changed, 332 insertions(+), 13 deletions(-) diff --git a/tests/playwright/constants/backend-constants.ts b/tests/playwright/constants/backend-constants.ts index b3f6b9f6f..12406b746 100644 --- a/tests/playwright/constants/backend-constants.ts +++ b/tests/playwright/constants/backend-constants.ts @@ -61,6 +61,7 @@ export const REPORTING_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REG export const PREFIX_DL_FILES = `${CSI}/`; // Cloudwatch +export const MESH_DOWNLOAD_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-mesh-download`; export const PDM_UPLOADER_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-pdm-uploader`; export const PDM_POLL_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-pdm-poll`; export const CORE_NOTIFIER_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-core-notifier`; diff --git a/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts b/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts index 430b8512b..757b0606b 100644 --- a/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts +++ b/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts @@ -2,6 +2,7 @@ import { expect, test } from '@playwright/test'; import { ENV, MESH_DOWNLOAD_DLQ_NAME, + MESH_DOWNLOAD_LAMBDA_LOG_GROUP_NAME, MESH_POLL_LAMBDA_NAME, NON_PII_S3_BUCKET_NAME, PII_S3_BUCKET_NAME, @@ -95,7 +96,7 @@ test.describe('Digital Letters - MESH Poll and Download', () => { await expectToPassEventually(async () => { const storedMessage = await downloadFromS3( PII_S3_BUCKET_NAME, - `document-reference/${senderId}_${messageReference}`, + 
`document-reference/${senderId}/${messageReference}_${meshMessageId}`, ); expect(storedMessage.body).toContain(messageContent); @@ -230,4 +231,86 @@ test.describe('Digital Letters - MESH Poll and Download', () => { expect(receivedEvents.length).toBe(0); }, 15_000); }); + + test('should skip publishing downloaded event and acknowledge message when document already exists in S3', async () => { + test.setTimeout(200_000); + + const meshMessageId = `${Date.now()}_DUPLICATE_${uuidv4().slice(0, 8)}`; + const messageReference = uuidv4(); + const messageContent = JSON.stringify({ + senderId, + messageReference, + testData: 'This is a duplicate test letter content', + timestamp: new Date().toISOString(), + }); + + await uploadMeshMessage(meshMessageId, messageReference, messageContent); + + // Pre-upload the document to the PII bucket to simulate a previously processed message + const documentKey = `document-reference/${senderId}/${messageReference}_${meshMessageId}`; + await uploadToS3('pre-existing content', PII_S3_BUCKET_NAME, documentKey); + + // Publish the MESHInboxMessageReceived event directly, skipping the poll lambda + await eventPublisher.sendEvents( + [ + { + id: uuidv4(), + specversion: '1.0', + source: + '/nhs/england/notify/development/primary/data-plane/digitalletters/mesh', + subject: + 'customer/00000000-0000-0000-0000-000000000000/recipient/00000000-0000-0000-0000-000000000000', + type: 'uk.nhs.notify.digital.letters.mesh.inbox.message.received.v1', + time: new Date().toISOString(), + recordedtime: new Date().toISOString(), + severitynumber: 2, + severitytext: 'INFO', + traceparent: + '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01', + dataschema: + 'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-received-data.schema.json', + data: { + meshMessageId, + senderId, + messageReference, + }, + }, + ], + messageMessageReceived, + ); + + await expectToPassEventually(async () => { + const 
warnLogEntry = await getLogsFromCloudwatch( + MESH_DOWNLOAD_LAMBDA_LOG_GROUP_NAME, + [ + '$.event = "Message already stored in S3, skipping publish (duplicate delivery)"', + `$.mesh_message_id = "${meshMessageId}"`, + ], + ); + expect(warnLogEntry.length).toBeGreaterThanOrEqual(1); + }, 120_000); + + // Assert that no MESHInboxMessageDownloaded event was published + await expectToPassEventually(async () => { + const downloadedEvents = await getLogsFromCloudwatch( + `/aws/vendedlogs/events/event-bus/nhs-${ENV}-dl`, + [ + '$.message_type = "EVENT_RECEIPT"', + '$.details.detail_type = "uk.nhs.notify.digital.letters.mesh.inbox.message.downloaded.v1"', + `$.details.event_detail = "*\\"messageReference\\":\\"${messageReference}\\"*"`, + ], + ); + expect(downloadedEvents.length).toBe(0); + }, 15_000); + + // Assert the MESH message was still acknowledged (deleted from mock inbox) + await expectToPassEventually(async () => { + await expect(async () => { + await downloadFromS3( + NON_PII_S3_BUCKET_NAME, + `mock-mesh/${meshMailboxId}/in/${meshMessageId}`, + ); + }).rejects.toThrow('No objects found'); + }, 60_000); + }); }); diff --git a/utils/py-utils/dl_utils/__tests__/test_event_publisher.py b/utils/py-utils/dl_utils/__tests__/test_event_publisher.py index 22725485d..d8c0a5f3e 100644 --- a/utils/py-utils/dl_utils/__tests__/test_event_publisher.py +++ b/utils/py-utils/dl_utils/__tests__/test_event_publisher.py @@ -1,10 +1,10 @@ import json import pytest -from unittest.mock import Mock, MagicMock, call +from unittest.mock import Mock, MagicMock, call, patch from uuid import uuid4 from botocore.exceptions import ClientError -from dl_utils.event_publisher import EventPublisher +from dl_utils.event_publisher import EventPublisher, MAX_PUBLISHER_RETRIES, TRANSIENT_ERROR_CODES @pytest.fixture @@ -194,11 +194,12 @@ def test_should_send_failed_eventbridge_events_to_dlq( assert dlq_call_args['Entries'][0]['MessageBody'] == json.dumps(valid_cloud_event) assert 
dlq_call_args['Entries'][0]['MessageAttributes']['DlqReason']['StringValue'] == 'EVENTBRIDGE_FAILURE' + @patch('dl_utils.event_publisher.time.sleep') def test_should_handle_eventbridge_send_error_and_send_all_events_to_dlq( - self, test_config, mock_events_client, mock_sqs_client, + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, valid_cloud_event, valid_cloud_event2, mock_validator): mock_events_client.put_events.side_effect = ClientError( - {'Error': {'Code': 'InternalError', 'Message': 'EventBridge error'}}, + {'Error': {'Code': 'AccessDenied', 'Message': 'EventBridge error'}}, 'PutEvents' ) mock_sqs_client.send_message_batch.return_value = { @@ -210,7 +211,8 @@ def test_should_handle_eventbridge_send_error_and_send_all_events_to_dlq( validator=mock_validator) assert result == [] - assert mock_events_client.put_events.call_count == 1 + assert mock_events_client.put_events.call_count == 1 # permanent error, no retries + mock_sleep.assert_not_called() # Should call DLQ once for all events after EventBridge failure assert mock_sqs_client.send_message_batch.call_count == 1 @@ -338,3 +340,181 @@ def test_should_be_reusable_for_multiple_calls( assert result2 == [] assert mock_events_client.put_events.call_count == 2 + + +class TestRetryBehaviour: + """Tests for transient error retry logic in _send_batch_with_retry.""" + + @patch('dl_utils.event_publisher.time.sleep') + def test_should_retry_on_transient_error_and_succeed( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, mock_validator): + """Transient error on first attempt, success on second — no DLQ, put_events called twice.""" + mock_events_client.put_events.side_effect = [ + { + 'FailedEntryCount': 1, + 'Entries': [{'ErrorCode': 'ThrottlingException', 'ErrorMessage': 'Rate exceeded'}], + }, + { + 'FailedEntryCount': 0, + 'Entries': [{'EventId': 'event-1'}], + }, + ] + + publisher = EventPublisher(**test_config) + result = 
publisher.send_events([valid_cloud_event], validator=mock_validator) + + assert result == [] + assert mock_events_client.put_events.call_count == 2 + mock_sqs_client.send_message_batch.assert_not_called() + assert mock_sleep.call_count == 1 + + mock_logger = test_config['logger'] + mock_logger.info.assert_any_call( + 'Retrying transient failures', + extra={'attempt': 1, 'max_retries': MAX_PUBLISHER_RETRIES, 'retry_count': 1, 'permanent_failure_count': 0} + ) + + @patch('dl_utils.event_publisher.time.sleep') + def test_should_send_to_dlq_after_all_retries_exhausted( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, mock_validator): + """Transient error on all MAX_PUBLISHER_RETRIES attempts — event sent to DLQ as EVENTBRIDGE_FAILURE.""" + mock_events_client.put_events.return_value = { + 'FailedEntryCount': 1, + 'Entries': [{'ErrorCode': 'ThrottlingException', 'ErrorMessage': 'Rate exceeded'}], + } + mock_sqs_client.send_message_batch.return_value = {'Successful': []} + + publisher = EventPublisher(**test_config) + result = publisher.send_events([valid_cloud_event], validator=mock_validator) + + assert result == [] + assert mock_events_client.put_events.call_count == MAX_PUBLISHER_RETRIES + assert mock_sqs_client.send_message_batch.call_count == 1 + + dlq_call_args = mock_sqs_client.send_message_batch.call_args[1] + assert len(dlq_call_args['Entries']) == 1 + assert dlq_call_args['Entries'][0]['MessageBody'] == json.dumps(valid_cloud_event) + assert dlq_call_args['Entries'][0]['MessageAttributes']['DlqReason']['StringValue'] == 'EVENTBRIDGE_FAILURE' + + assert mock_sleep.call_count == MAX_PUBLISHER_RETRIES - 1 + + mock_logger = test_config['logger'] + mock_logger.warning.assert_any_call( + 'Retries exhausted, treating remaining transient failures as permanent', + extra={'attempt': MAX_PUBLISHER_RETRIES, 'max_retries': MAX_PUBLISHER_RETRIES, 'transient_failure_count': 1, 'permanent_failure_count': 0} + ) + + 
@patch('dl_utils.event_publisher.time.sleep') + def test_should_retry_only_transient_events_and_dlq_permanent_failures( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, valid_cloud_event2, mock_validator): + """Batch with one transient and one permanent failure on the first attempt. + The transient event is retried and succeeds. The permanent failure goes to DLQ.""" + mock_events_client.put_events.side_effect = [ + { + 'FailedEntryCount': 2, + 'Entries': [ + {'ErrorCode': 'ThrottlingException', 'ErrorMessage': 'Rate exceeded'}, + {'ErrorCode': 'AccessDenied', 'ErrorMessage': 'Not allowed'}, + ], + }, + # Second attempt (retry of event1 only): success + { + 'FailedEntryCount': 0, + 'Entries': [{'EventId': 'event-1'}], + }, + ] + mock_sqs_client.send_message_batch.return_value = {'Successful': []} + + publisher = EventPublisher(**test_config) + result = publisher.send_events( + [valid_cloud_event, valid_cloud_event2], validator=mock_validator + ) + + assert result == [] + assert mock_events_client.put_events.call_count == 2 + + # First call includes both events; second call includes only the retried transient one + first_call_entries = mock_events_client.put_events.call_args_list[0][1]['Entries'] + second_call_entries = mock_events_client.put_events.call_args_list[1][1]['Entries'] + assert len(first_call_entries) == 2 + assert len(second_call_entries) == 1 + assert second_call_entries[0]['Detail'] == json.dumps(valid_cloud_event) + + # The permanently failed event (event2) is sent to DLQ + assert mock_sqs_client.send_message_batch.call_count == 1 + dlq_call_args = mock_sqs_client.send_message_batch.call_args[1] + assert len(dlq_call_args['Entries']) == 1 + assert dlq_call_args['Entries'][0]['MessageBody'] == json.dumps(valid_cloud_event2) + assert dlq_call_args['Entries'][0]['MessageAttributes']['DlqReason']['StringValue'] == 'EVENTBRIDGE_FAILURE' + + assert mock_sleep.call_count == 1 + + 
@patch('dl_utils.event_publisher.time.sleep') + def test_should_retry_on_transient_client_error_and_succeed( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, mock_validator): + """Transient ClientError on first attempt, success on second.""" + mock_events_client.put_events.side_effect = [ + ClientError( + {'Error': {'Code': 'ThrottlingException', 'Message': 'Rate exceeded'}}, + 'PutEvents' + ), + {'FailedEntryCount': 0, 'Entries': [{'EventId': 'event-1'}]}, + ] + + publisher = EventPublisher(**test_config) + result = publisher.send_events([valid_cloud_event], validator=mock_validator) + + assert result == [] + assert mock_events_client.put_events.call_count == 2 + mock_sqs_client.send_message_batch.assert_not_called() + assert mock_sleep.call_count == 1 + + @patch('dl_utils.event_publisher.time.sleep') + def test_should_send_to_dlq_after_transient_client_error_exhausts_retries( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, mock_validator): + """Transient ClientError on all attempts. 
The event is sent to DLQ after retries exhausted.""" + mock_events_client.put_events.side_effect = ClientError( + {'Error': {'Code': 'ThrottlingException', 'Message': 'Rate exceeded'}}, + 'PutEvents' + ) + mock_sqs_client.send_message_batch.return_value = {'Successful': []} + + publisher = EventPublisher(**test_config) + result = publisher.send_events([valid_cloud_event], validator=mock_validator) + + assert result == [] + assert mock_events_client.put_events.call_count == MAX_PUBLISHER_RETRIES + assert mock_sqs_client.send_message_batch.call_count == 1 + + dlq_call_args = mock_sqs_client.send_message_batch.call_args[1] + assert dlq_call_args['Entries'][0]['MessageBody'] == json.dumps(valid_cloud_event) + assert dlq_call_args['Entries'][0]['MessageAttributes']['DlqReason']['StringValue'] == 'EVENTBRIDGE_FAILURE' + + assert mock_sleep.call_count == MAX_PUBLISHER_RETRIES - 1 + + @patch('dl_utils.event_publisher.time.sleep') + def test_should_not_retry_on_permanent_client_error( + self, mock_sleep, test_config, mock_events_client, mock_sqs_client, + valid_cloud_event, mock_validator): + """Non-transient ClientError fails the batch immediately without retrying.""" + mock_events_client.put_events.side_effect = ClientError( + {'Error': {'Code': 'AccessDenied', 'Message': 'Not allowed'}}, + 'PutEvents' + ) + mock_sqs_client.send_message_batch.return_value = {'Successful': []} + + publisher = EventPublisher(**test_config) + result = publisher.send_events([valid_cloud_event], validator=mock_validator) + + assert result == [] + assert mock_events_client.put_events.call_count == 1 + mock_sleep.assert_not_called() + + dlq_call_args = mock_sqs_client.send_message_batch.call_args[1] + assert dlq_call_args['Entries'][0]['MessageBody'] == json.dumps(valid_cloud_event) + assert dlq_call_args['Entries'][0]['MessageAttributes']['DlqReason']['StringValue'] == 'EVENTBRIDGE_FAILURE' diff --git a/utils/py-utils/dl_utils/event_publisher.py b/utils/py-utils/dl_utils/event_publisher.py 
index b1ba93386..5a4e25fb5 100644 --- a/utils/py-utils/dl_utils/event_publisher.py +++ b/utils/py-utils/dl_utils/event_publisher.py @@ -6,6 +6,7 @@ import json import logging +import random import time from typing import List, Dict, Any, Optional, Literal, Callable from uuid import uuid4 @@ -19,8 +20,12 @@ MAX_PUBLISHER_RETRIES = 3 TRANSIENT_ERROR_CODES = { 'ThrottlingException', + 'TooManyRequestsException', + 'RequestLimitExceeded', + 'ProvisionedThroughputExceededException', 'InternalFailure', - 'ServiceUnavailable' + 'InternalError', + 'ServiceUnavailable', } @@ -95,6 +100,7 @@ def _send_batch_with_retry( Returns a list of events that permanently failed. """ events_to_retry = batch + permanent_failures = [] for attempt in range(MAX_PUBLISHER_RETRIES): entries = [ @@ -114,19 +120,68 @@ def _send_batch_with_retry( response, events_to_retry ) + permanent_failures.extend(permanent) + if not transient: - return permanent + if permanent_failures: + self.logger.warning( + 'Batch completed with failures', + extra={'permanent_failure_count': len(permanent_failures)} + ) + else: + self.logger.info('Batch completed successfully') + return permanent_failures if attempt == MAX_PUBLISHER_RETRIES - 1: - return transient + permanent + self.logger.warning( + 'Retries exhausted, treating remaining transient failures as permanent', + extra={ + 'attempt': attempt + 1, + 'max_retries': MAX_PUBLISHER_RETRIES, + 'transient_failure_count': len(transient), + 'permanent_failure_count': len(permanent_failures), + } + ) + return permanent_failures + transient + self.logger.info( + 'Retrying transient failures', + extra={ + 'attempt': attempt + 1, + 'max_retries': MAX_PUBLISHER_RETRIES, + 'retry_count': len(transient), + 'permanent_failure_count': len(permanent_failures), + } + ) events_to_retry = transient - time.sleep(2 ** attempt) + time.sleep((2 ** attempt) + random.uniform(0, 1)) + + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code") + + if error_code in 
TRANSIENT_ERROR_CODES and attempt < MAX_PUBLISHER_RETRIES - 1: + self.logger.info( + 'Retrying batch after transient ClientError', + extra={ + 'attempt': attempt + 1, + 'max_retries': MAX_PUBLISHER_RETRIES, + 'error_code': error_code, + 'retry_count': len(events_to_retry), + } + ) + time.sleep((2 ** attempt) + random.uniform(0, 1)) + continue - except ClientError: - return events_to_retry + self.logger.warning( + 'ClientError sending batch to EventBridge, failing entire batch', + extra={ + 'error_code': error_code, + 'batch_size': len(events_to_retry), + } + ) + return permanent_failures + events_to_retry - return events_to_retry + return permanent_failures + events_to_retry def _send_to_event_bridge(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ From 4748c77e240d199d51e48d086ae5b325d75ba251 Mon Sep 17 00:00:00 2001 From: lapenna-bjss Date: Wed, 25 Mar 2026 15:29:59 +0000 Subject: [PATCH 5/7] CCM-15297: Update logs --- .../mesh_download/__tests__/test_handler.py | 6 +++--- .../mesh_download/__tests__/test_processor.py | 12 ++++++++---- lambdas/mesh-download/mesh_download/handler.py | 6 ++++-- lambdas/mesh-download/mesh_download/processor.py | 4 +++- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/lambdas/mesh-download/mesh_download/__tests__/test_handler.py b/lambdas/mesh-download/mesh_download/__tests__/test_handler.py index cf6b2d210..e344c8bb7 100644 --- a/lambdas/mesh-download/mesh_download/__tests__/test_handler.py +++ b/lambdas/mesh-download/mesh_download/__tests__/test_handler.py @@ -15,7 +15,7 @@ def setup_mocks(): mock_config.mesh_client = Mock() mock_processor = Mock() - mock_processor.process_sqs_message = Mock() + mock_processor.process_sqs_message = Mock(return_value='downloaded') return ( mock_context, @@ -149,9 +149,9 @@ def test_handler_partial_batch_failure(self, mock_processor_class, mock_config_c # Make second message fail mock_processor.process_sqs_message.side_effect = [ - None, + 'downloaded', Exception("Test 
error"), - None + 'downloaded' ] event = create_sqs_event(num_records=3) diff --git a/lambdas/mesh-download/mesh_download/__tests__/test_processor.py b/lambdas/mesh-download/mesh_download/__tests__/test_processor.py index 4be751058..b09b5e232 100644 --- a/lambdas/mesh-download/mesh_download/__tests__/test_processor.py +++ b/lambdas/mesh-download/mesh_download/__tests__/test_processor.py @@ -137,8 +137,9 @@ def test_process_sqs_message_success(self, mock_datetime): sqs_record = create_sqs_record() - processor.process_sqs_message(sqs_record) + outcome = processor.process_sqs_message(sqs_record) + assert outcome == 'downloaded' config.mesh_client.retrieve_message.assert_called_once_with('test-message-123') mesh_message.read.assert_called_once() @@ -322,8 +323,9 @@ def test_bucket_selection_with_mesh_mock_enabled(self, mock_datetime): config.mesh_client.retrieve_message.return_value = mesh_message sqs_record = create_sqs_record() - processor.process_sqs_message(sqs_record) + outcome = processor.process_sqs_message(sqs_record) + assert outcome == 'downloaded' # Verify event was published with PII bucket in URI event_publisher.send_events.assert_called_once() published_events = event_publisher.send_events.call_args[0][0] @@ -361,8 +363,9 @@ def test_bucket_selection_with_mesh_mock_disabled(self, mock_datetime): config.mesh_client.retrieve_message.return_value = mesh_message sqs_record = create_sqs_record() - processor.process_sqs_message(sqs_record) + outcome = processor.process_sqs_message(sqs_record) + assert outcome == 'downloaded' event_publisher.send_events.assert_called_once() published_events = event_publisher.send_events.call_args[0][0] assert len(published_events) == 1 @@ -396,8 +399,9 @@ def test_duplicate_delivery_skips_publish_and_acknowledge(self): sqs_record = create_sqs_record() # Should complete without raising - processor.process_sqs_message(sqs_record) + outcome = processor.process_sqs_message(sqs_record) + assert outcome == 'skipped' 
bound_logger.warning.assert_called_once() warning_msg = bound_logger.warning.call_args[0][0] assert "already stored" in warning_msg diff --git a/lambdas/mesh-download/mesh_download/handler.py b/lambdas/mesh-download/mesh_download/handler.py index a42fa5b81..d61b0afd1 100644 --- a/lambdas/mesh-download/mesh_download/handler.py +++ b/lambdas/mesh-download/mesh_download/handler.py @@ -21,6 +21,7 @@ def handler(event, context): processed = { 'retrieved': 0, 'downloaded': 0, + 'skipped': 0, 'failed': 0 } @@ -58,8 +59,8 @@ def handler(event, context): continue try: - processor.process_sqs_message(record) - processed['downloaded'] += 1 + outcome = processor.process_sqs_message(record) + processed[outcome] += 1 except Exception as exc: processed['failed'] += 1 @@ -71,6 +72,7 @@ def handler(event, context): log.info("Processed SQS event", retrieved=processed['retrieved'], downloaded=processed['downloaded'], + skipped=processed['skipped'], failed=processed['failed']) return {"batchItemFailures": batch_item_failures} diff --git a/lambdas/mesh-download/mesh_download/processor.py b/lambdas/mesh-download/mesh_download/processor.py index 0df000580..a8d4cf728 100644 --- a/lambdas/mesh-download/mesh_download/processor.py +++ b/lambdas/mesh-download/mesh_download/processor.py @@ -28,7 +28,7 @@ def process_sqs_message(self, sqs_record): logger = self.__log.bind(mesh_message_id=validated_event.data.meshMessageId) logger.info("Processing MESH download request") - self._handle_download(validated_event, logger) + return self._handle_download(validated_event, logger) except Exception as exc: self.__log.error( @@ -102,6 +102,8 @@ def _handle_download(self, event, logger): message.acknowledge() logger.info("Acknowledged message") + return 'skipped' if duplicate else 'downloaded' + def _store_message_content(self, sender_id, message_reference, mesh_message_id, message_content, logger): s3_key = self.__document_store.store_document( sender_id=sender_id, From 
dc787cddba2d962ecf81d309251c20cd2abe0691 Mon Sep 17 00:00:00 2001 From: simonlabarere Date: Tue, 31 Mar 2026 09:23:25 +0100 Subject: [PATCH 6/7] CCM-15297: Don't use double quotes in TF when they're not necessary --- .../terraform/components/dl/module_lambda_mesh_acknowledge.tf | 4 ++-- .../terraform/components/dl/module_lambda_mesh_download.tf | 4 ++-- .../terraform/components/dl/module_lambda_mesh_poll.tf | 4 ++-- .../terraform/components/dl/module_lambda_report_sender.tf | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/infrastructure/terraform/components/dl/module_lambda_mesh_acknowledge.tf b/infrastructure/terraform/components/dl/module_lambda_mesh_acknowledge.tf index 5a4919534..347509c1a 100644 --- a/infrastructure/terraform/components/dl/module_lambda_mesh_acknowledge.tf +++ b/infrastructure/terraform/components/dl/module_lambda_mesh_acknowledge.tf @@ -40,8 +40,8 @@ module "mesh_acknowledge" { EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket - SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" - SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" + SSM_MESH_PREFIX = local.ssm_mesh_prefix + SSM_SENDERS_PREFIX = local.ssm_senders_prefix USE_MESH_MOCK = var.enable_mock_mesh ? 
"true" : "false" } diff --git a/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf b/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf index b1494c73e..b8dd8e90f 100644 --- a/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf +++ b/infrastructure/terraform/components/dl/module_lambda_mesh_download.tf @@ -44,8 +44,8 @@ module "mesh_download" { EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn PII_BUCKET = module.s3bucket_pii_data.bucket - SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" - SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" + SSM_MESH_PREFIX = local.ssm_mesh_prefix + SSM_SENDERS_PREFIX = local.ssm_senders_prefix USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false" } diff --git a/infrastructure/terraform/components/dl/module_lambda_mesh_poll.tf b/infrastructure/terraform/components/dl/module_lambda_mesh_poll.tf index 25c188fce..a529af955 100644 --- a/infrastructure/terraform/components/dl/module_lambda_mesh_poll.tf +++ b/infrastructure/terraform/components/dl/module_lambda_mesh_poll.tf @@ -45,8 +45,8 @@ module "mesh_poll" { MAXIMUM_RUNTIME_MILLISECONDS = "240000" # 4 minutes (Lambda has 5 min timeout) POLLING_METRIC_NAME = "mesh-poll-successful-polls" POLLING_METRIC_NAMESPACE = "dl-mesh-poll" - SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" - SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" + SSM_MESH_PREFIX = local.ssm_mesh_prefix + SSM_SENDERS_PREFIX = local.ssm_senders_prefix USE_MESH_MOCK = var.enable_mock_mesh ? 
"true" : "false" } diff --git a/infrastructure/terraform/components/dl/module_lambda_report_sender.tf b/infrastructure/terraform/components/dl/module_lambda_report_sender.tf index cd0c40b6d..0dd74cfe0 100644 --- a/infrastructure/terraform/components/dl/module_lambda_report_sender.tf +++ b/infrastructure/terraform/components/dl/module_lambda_report_sender.tf @@ -42,8 +42,8 @@ module "report_sender" { EVENT_PUBLISHER_DLQ_URL = module.sqs_event_publisher_errors.sqs_queue_url EVENT_PUBLISHER_EVENT_BUS_ARN = aws_cloudwatch_event_bus.main.arn MOCK_MESH_BUCKET = module.s3bucket_non_pii_data.bucket - SSM_MESH_PREFIX = "${local.ssm_mesh_prefix}" - SSM_SENDERS_PREFIX = "${local.ssm_senders_prefix}" + SSM_MESH_PREFIX = local.ssm_mesh_prefix + SSM_SENDERS_PREFIX = local.ssm_senders_prefix USE_MESH_MOCK = var.enable_mock_mesh ? "true" : "false" } From 758fc85ff96a0ca0a562b82734ce21a8d744afb9 Mon Sep 17 00:00:00 2001 From: simonlabarere Date: Tue, 31 Mar 2026 10:08:57 +0100 Subject: [PATCH 7/7] CCM-15297: Fix typecheck failing following merge --- .../mesh-poll-download.component.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts b/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts index cf258e0eb..ee9b0c602 100644 --- a/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts +++ b/tests/playwright/digital-letters-component-tests/mesh-poll-download.component.spec.ts @@ -276,7 +276,7 @@ test.describe('Digital Letters - MESH Poll and Download', () => { }, }, ], - messageMessageReceived, + validateMESHInboxMessageReceived, ); await expectToPassEventually(async () => {