Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/on_merged_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ on:
- completed

permissions:
actions: read
contents: read
issues: write

jobs:
Expand Down
3 changes: 2 additions & 1 deletion unicorn_approvals/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dev = [
"pyyaml>=6.0.3",
"arnparse>=0.0.2",
"pytest>=9.0.2",
"pytest-cov>=6.1.1",
"ruff>=0.15.6",
]

Expand All @@ -32,6 +33,6 @@ packages = ["approvals_service"]

[tool.pytest.ini_options]
minversion = "7.0"
addopts = "-ra -vv -W ignore::UserWarning"
addopts = "-ra -vv -W ignore::UserWarning --cov=src --cov-report=term-missing --cov-fail-under=80"
testpaths = ["tests/unit", "tests/integration"]
pythonpath = ["."]
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@
# SPDX-License-Identifier: MIT-0

import os
from datetime import datetime

import boto3
from aws_lambda_powertools.logging import Logger
from aws_lambda_powertools.metrics import Metrics
from aws_lambda_powertools.tracing import Tracer
from aws_lambda_powertools.event_handler.exceptions import InternalServerError
from schema.unicorn_contracts.contractstatuschanged import AWSEvent, ContractStatusChanged, Marshaller

# Initialise Environment variables
if (SERVICE_NAMESPACE := os.environ.get("SERVICE_NAMESPACE")) is None:
raise InternalServerError("SERVICE_NAMESPACE environment variable is undefined")
raise EnvironmentError("SERVICE_NAMESPACE environment variable is undefined")
if (CONTRACT_STATUS_TABLE := os.environ.get("CONTRACT_STATUS_TABLE")) is None:
raise InternalServerError("CONTRACT_STATUS_TABLE environment variable is undefined")
raise EnvironmentError("CONTRACT_STATUS_TABLE environment variable is undefined")

# Initialise PowerTools
logger: Logger = Logger()
Expand All @@ -26,10 +24,6 @@
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(CONTRACT_STATUS_TABLE) # type: ignore

# Get current date
now = datetime.now()
current_date = now.strftime("%d/%m/%Y %H:%M:%S")


@logger.inject_lambda_context(log_event=True) # type: ignore
@metrics.log_metrics(capture_cold_start_metric=True) # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,19 @@ def test_missing_property_id(dynamodb, lambda_context):
contract_status_changed_event_handler.lambda_handler(eventbridge_event, lambda_context)

assert "ValidationException" in str(e.value)


@mock.patch.dict(os.environ, return_env_vars_dict({"CONTRACT_STATUS_TABLE": "nonexistent_table"}), clear=True)
def test_dynamodb_failure(dynamodb, lambda_context):
eventbridge_event = load_event("eventbridge/contract_status_changed")

from approvals_service import contract_status_changed_event_handler

# Reload is required to prevent function setup reuse from another test
reload(contract_status_changed_event_handler)

# Do NOT create the table so DynamoDB raises a ResourceNotFoundException
with pytest.raises(ClientError) as e:
contract_status_changed_event_handler.lambda_handler(eventbridge_event, lambda_context)

assert "ResourceNotFoundException" in str(e.value)
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,71 @@ def test_handle_status_changed_draft(stepfunction, lambda_context):
assert ret is None


# NOTE: This test cannot be implemented at this time because `moto`` does not yet support mocking `stepfunctions.send_task_success`
@mock.patch.dict(os.environ, return_env_vars_dict(), clear=True)
def test_handle_status_changed_approved(caplog, stepfunction, lambda_context):
pass
# ddbstream_event = load_event('ddb_stream_events/status_approved_waiting_for_approval')
def test_handle_status_changed_approved(stepfunction, lambda_context):
ddbstream_event = load_event("ddb_stream_events/status_approved_waiting_for_approval")

# from publication_manager_service import properties_approval_sync_function
# reload(properties_approval_sync_function)
from approvals_service import properties_approval_sync_function

reload(properties_approval_sync_function)

# Mock the module-level sfn client to bypass moto's lack of send_task_success support
mock_sfn = mock.MagicMock()
mock_sfn.send_task_success.return_value = {"ResponseMetadata": {"HTTPStatusCode": 200}}

with mock.patch.object(properties_approval_sync_function, "sfn", mock_sfn):
ret = properties_approval_sync_function.lambda_handler(ddbstream_event, lambda_context)

# Verify send_task_success was called with the task token from OldImage
mock_sfn.send_task_success.assert_called_once()
call_kwargs = mock_sfn.send_task_success.call_args
assert "taskToken" in call_kwargs.kwargs or len(call_kwargs.args) > 0

# ret = properties_approval_sync_function.lambda_handler(ddbstream_event, lambda_context)

# assert ret is None
# assert 'Contract status for property is APPROVED' in caplog.text
@mock.patch.dict(os.environ, return_env_vars_dict(), clear=True)
def test_no_task_token_skips(stepfunction, lambda_context):
"""APPROVED status but no task token in old or new image => returns None (skip)."""
ddbstream_event = load_event("ddb_stream_events/status_approved_with_no_workflow")

from approvals_service import properties_approval_sync_function

reload(properties_approval_sync_function)

ret = properties_approval_sync_function.lambda_handler(ddbstream_event, lambda_context)

assert ret is None


@mock.patch.dict(os.environ, return_env_vars_dict(), clear=True)
def test_missing_new_image_skips(stepfunction, lambda_context):
"""Record with missing NewImage key causes a KeyError => handler should raise."""
ddbstream_event = {
"Records": [
{
"eventID": "1",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-southeast-2",
"dynamodb": {
"Keys": {"property_id": {"S": "usa/anytown/main-street/999"}},
"SequenceNumber": "100000000005391461882",
"SizeBytes": 50,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
"eventSourceARN": "arn:aws:dynamodb:ap-southeast-2:123456789012:table/test/stream/2022-08-23T15:46:44.107",
}
]
}

from approvals_service import properties_approval_sync_function

reload(properties_approval_sync_function)

try:
ret = properties_approval_sync_function.lambda_handler(ddbstream_event, lambda_context)
# If handler returns without error when NewImage is missing, that is acceptable (skip behaviour)
assert ret is None
except KeyError:
# KeyError on missing NewImage is also acceptable
pass
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import os
from importlib import reload

import pytest
from unittest import mock
from botocore.exceptions import ClientError

from .helper import load_event, return_env_vars_dict, create_ddb_table_contracts_with_entry
from .helper import load_event, return_env_vars_dict, create_ddb_table_contracts_with_entry, create_ddb_table_properties


@mock.patch.dict(os.environ, return_env_vars_dict(), clear=True)
Expand All @@ -26,3 +28,39 @@ def test_handle_wait_for_contract_approval_function(dynamodb, lambda_context):

assert ret["property_id"] == stepfunctions_event["Input"]["property_id"]
assert ddbitem_after["Item"]["sfn_wait_approved_task_token"] == stepfunctions_event["TaskToken"]


@mock.patch.dict(os.environ, return_env_vars_dict(), clear=True)
def test_contract_not_found(dynamodb, lambda_context):
stepfunctions_event = {
"TaskToken": "xxx",
"Input": {
"property_id": "usa/anytown/main-street/999",
},
}

from approvals_service import wait_for_contract_approval_function
from approvals_service.exceptions import ContractStatusNotFoundException

reload(wait_for_contract_approval_function)

# Create empty table (no entry for the requested property_id)
create_ddb_table_properties(dynamodb)

with pytest.raises(ContractStatusNotFoundException):
wait_for_contract_approval_function.lambda_handler(stepfunctions_event, lambda_context)


@mock.patch.dict(os.environ, return_env_vars_dict({"CONTRACT_STATUS_TABLE": "nonexistent_table"}), clear=True)
def test_dynamodb_failure(dynamodb, lambda_context):
stepfunctions_event = load_event("lambda/wait_for_contract_approval_function")

from approvals_service import wait_for_contract_approval_function
from approvals_service.exceptions import ContractStatusNotFoundException

reload(wait_for_contract_approval_function)

# Do NOT create the table so DynamoDB raises a ResourceNotFoundException
# The handler catches ClientError and raises ContractStatusNotFoundException
with pytest.raises((ClientError, ContractStatusNotFoundException)):
wait_for_contract_approval_function.lambda_handler(stepfunctions_event, lambda_context)
Loading
Loading