Skip to content

Commit a5457af

Browse files
committed
feat(batch): support FIFO entire batch failure override
1 parent 92c18b0 commit a5457af

2 files changed

Lines changed: 28 additions & 2 deletions

File tree

aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ def lambda_handler(event, context: LambdaContext):
6666
None,
6767
)
6868

69-
def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False):
69+
def __init__(
70+
self,
71+
model: BatchSqsTypeModel | None = None,
72+
skip_group_on_error: bool = False,
73+
raise_on_entire_batch_failure: bool = True,
74+
):
7075
"""
7176
Initialize the SqsFifoProcessor.
7277
@@ -77,12 +82,15 @@ def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error:
7782
skip_group_on_error: bool
7883
Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures
7984
Default is False.
85+
raise_on_entire_batch_failure: bool
86+
Raise an exception when the entire batch has failed processing.
87+
When set to False, partial failures are reported in the response.
8088
8189
"""
8290
self._skip_group_on_error: bool = skip_group_on_error
8391
self._current_group_id = None
8492
self._failed_group_ids: set[str] = set()
85-
super().__init__(EventType.SQS, model)
93+
super().__init__(EventType.SQS, model, raise_on_entire_batch_failure)
8694

8795
def _process_record(self, record):
8896
self._current_group_id = record.get("attributes", {}).get("MessageGroupId")

tests/functional/batch/required_dependencies/test_utilities_batch.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,24 @@ def lambda_handler(event, context):
499499
assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id
500500

501501

502+
def test_sqs_fifo_batch_processor_not_raise_when_entire_batch_fails(sqs_event_fifo_factory, record_handler):
503+
first_record = SQSRecord(sqs_event_fifo_factory("fail"))
504+
second_record = SQSRecord(sqs_event_fifo_factory("success"))
505+
event = {"Records": [first_record.raw_event, second_record.raw_event]}
506+
507+
processor = SqsFifoPartialProcessor(raise_on_entire_batch_failure=False)
508+
509+
@batch_processor(record_handler=record_handler, processor=processor)
510+
def lambda_handler(event, context):
511+
return processor.response()
512+
513+
response = lambda_handler(event, {})
514+
515+
assert len(response["batchItemFailures"]) == 2
516+
assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id
517+
assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id
518+
519+
502520
def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error(sqs_event_fifo_factory, record_handler):
503521
# GIVEN a batch of 5 records with 3 different MessageGroupID
504522
first_record = SQSRecord(sqs_event_fifo_factory("success", "1"))

0 commit comments

Comments
 (0)