From 0c59219288be16ed1efb9ab12fd62ebd74fb8678 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 7 Apr 2026 11:50:49 +0200 Subject: [PATCH 1/3] Adding auto-approve/merge dependabot tool to project. --- .github/dependabot.yml | 33 +++++++++++++++++++++++++ .github/workflows/dependabot_auto.yml | 35 +++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/dependabot_auto.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..68e0c70 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,33 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + target-branch: "master" + schedule: + interval: "weekly" + day: "sunday" + labels: + - "auto update" + - "infrastructure" + - "no RN" + open-pull-requests-limit: 3 + commit-message: + prefix: "chore" + include: "scope" + + - package-ecosystem: "pip" + directory: "/" + target-branch: "master" + schedule: + interval: "weekly" + day: "sunday" + labels: + - "auto update" + - "infrastructure" + - "no RN" + open-pull-requests-limit: 3 + commit-message: + prefix: "chore" + include: "scope" + allow: + - dependency-type: "direct" diff --git a/.github/workflows/dependabot_auto.yml b/.github/workflows/dependabot_auto.yml new file mode 100644 index 0000000..7243c96 --- /dev/null +++ b/.github/workflows/dependabot_auto.yml @@ -0,0 +1,35 @@ +# Auto-merge will only occur if all required status checks on the branch are successful. +name: Dependabot Auto-Approve and Auto-Merge +on: + pull_request: + types: [opened, synchronize] + +permissions: + contents: write + pull-requests: write + +jobs: + dependabot: + name: Auto-approve and auto-merge Dependabot PRs + runs-on: ubuntu-latest + if: github.event.pull_request.user.login == 'dependabot[bot]' && github.repository == 'AbsaOSS/EventGate' + steps: + - name: Dependabot metadata + id: metadata + uses: dependabot/fetch-metadata@21025c705c08248db411dc16f3619e6b5f9ea21a + with: + github-token: "${{ secrets.GITHUB_TOKEN }}" + + - name: Approve a PR + run: gh pr review --approve "$PR_URL" + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Enable auto-merge for Dependabot PRs + if: startsWith(steps.metadata.outputs.update-type, 'version-update') || startsWith(steps.metadata.outputs.update-type, 'security') + run: gh pr merge --auto --squash "$PR_URL" + continue-on-error: true + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 7a5f4ffb930273a01dcb43b1e5f0ac15b88d4bbd Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 7 Apr 2026 12:24:27 +0200 Subject: [PATCH 2/3] Improving project code cov > 95 % and setting the threshold to 90 %. --- DEVELOPER.md | 4 +- Makefile | 2 +- tests/unit/handlers/test_handler_api.py | 8 ++ tests/unit/handlers/test_handler_token.py | 94 ++++++++++++++++------ tests/unit/utils/test_trace_logging.py | 12 +++ tests/unit/writers/test_writer_postgres.py | 26 ++++++ 6 files changed, 120 insertions(+), 26 deletions(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index d299673..d1399a3 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -139,10 +139,10 @@ pytest tests/unit/writers/test_writer_eventbridge.py::test_write_success Code coverage is collected using the pytest-cov coverage tool. To run the tests and collect coverage information, use the following command: ```shell -pytest --cov=. -v tests/unit/ --cov-fail-under=80 --cov-report=html +pytest --cov=. -v tests/unit/ --cov-fail-under=90 --cov-report=html ``` -This will execute all tests in the tests directory and generate a code coverage report with missing line details and enforce a minimum 80% threshold. +This will execute all tests in the tests directory and generate a code coverage report with missing line details and enforce a minimum 90% threshold. Open the HTML coverage report: ```shell diff --git a/Makefile b/Makefile index 9f80c1a..9f88233 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ PYTHON := .venv/bin/python PY_FILES := $(shell git ls-files '*.py') MIN_PYLINT_SCORE = 9.5 -MIN_COVERAGE = 80 +MIN_COVERAGE = 90 black: ## Run Black formatter $(PYTHON) -m black . diff --git a/tests/unit/handlers/test_handler_api.py b/tests/unit/handlers/test_handler_api.py index c73f29f..2467d34 100644 --- a/tests/unit/handlers/test_handler_api.py +++ b/tests/unit/handlers/test_handler_api.py @@ -36,6 +36,14 @@ def test_load_api_definition_file_not_found(): handler.with_api_definition_loaded() +def test_load_api_definition_empty_file_raises(): + """Test that RuntimeError is raised when api.yaml is empty.""" + with patch("builtins.open", mock_open(read_data="")): + handler = HandlerApi() + with pytest.raises(RuntimeError, match="API specification initialization failed"): + handler.with_api_definition_loaded() + + def test_get_api_returns_correct_response(): """Test get_api returns correct response structure.""" mock_content = "openapi: 3.0.0" diff --git a/tests/unit/handlers/test_handler_token.py b/tests/unit/handlers/test_handler_token.py index 8307b6a..5ac89d1 100644 --- a/tests/unit/handlers/test_handler_token.py +++ b/tests/unit/handlers/test_handler_token.py @@ -91,34 +91,22 @@ def always_fail(_token, _key, **_kwargs): assert "Verification failed for all public keys" in str(exc.value) -def test_extract_token_empty(): - assert "" == HandlerToken.extract_token({}) - - -def test_extract_token_direct_bearer_header(): - token = HandlerToken.extract_token({"Bearer": " tok123 "}) - assert "tok123" == token - - ## Checking the freshness of public keys -def test_refresh_keys_not_needed_when_keys_fresh(token_handler): - """Keys loaded less than 28 minutes ago should not trigger refresh.""" - token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=10) +@pytest.mark.parametrize( + "age_minutes, expected_called", + [ + (10, False), + (29, True), + ], +) +def test_refresh_keys_age_check(token_handler, age_minutes, expected_called): + """Fresh keys should not trigger refresh; stale keys should.""" + token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=age_minutes) token_handler.public_keys = [Mock(spec=RSAPublicKey)] with patch.object(token_handler, "with_public_keys_queried") as mock_load: token_handler._refresh_keys_if_needed() - mock_load.assert_not_called() - - -def test_refresh_keys_triggered_when_keys_stale(token_handler): - """Keys loaded more than 28 minutes ago should trigger refresh.""" - token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=29) - token_handler.public_keys = [Mock(spec=RSAPublicKey)] - - with patch.object(token_handler, "with_public_keys_queried") as mock_load: - token_handler._refresh_keys_if_needed() - mock_load.assert_called_once() + assert expected_called == mock_load.called def test_refresh_keys_handles_load_failure_gracefully(token_handler): @@ -156,3 +144,63 @@ def test_handler_token_custom_ssl_ca_bundle_path(): config = {"token_public_keys_url": "https://example.com/keys", "ssl_ca_bundle": "/path/to/custom/ca-bundle.pem"} handler = HandlerToken(config) assert "/path/to/custom/ca-bundle.pem" == handler.ssl_ca_bundle + + +def test_refresh_keys_skipped_when_never_loaded(token_handler): + """When _last_loaded_at is None, refresh should return immediately without loading.""" + assert token_handler._last_loaded_at is None + + with patch.object(token_handler, "with_public_keys_queried") as mock_load: + token_handler._refresh_keys_if_needed() + mock_load.assert_not_called() + + +def test_with_public_keys_queried_multi_key_response(token_handler): + """Response with `keys` list should load all keys.""" + key_b64 = "dGVzdA==" + mock_response = Mock() + mock_response.json.return_value = {"keys": [{"key": key_b64}, {"key": key_b64}]} + + with patch("requests.get", return_value=mock_response): + with patch( + "cryptography.hazmat.primitives.serialization.load_der_public_key", + return_value=Mock(spec=RSAPublicKey), + ): + result = token_handler.with_public_keys_queried() + assert result is token_handler + assert 2 == len(token_handler.public_keys) + + +def test_with_public_keys_queried_no_keys_raises(token_handler): + """Response without any keys should raise RuntimeError.""" + mock_response = Mock() + mock_response.json.return_value = {"other": "data"} + + with patch("requests.get", return_value=mock_response): + with pytest.raises(RuntimeError, match="Token public key initialization failed"): + token_handler.with_public_keys_queried() + + +def test_with_public_keys_queried_request_exception_raises(token_handler): + """Network error during key fetch should raise RuntimeError.""" + import requests as req + + with patch("requests.get", side_effect=req.ConnectionError("timeout")): + with pytest.raises(RuntimeError, match="Token public key initialization failed"): + token_handler.with_public_keys_queried() + + +@pytest.mark.parametrize( + "headers, expected", + [ + ({}, ""), + ({"Authorization": 12345}, ""), + ({"Authorization": " "}, ""), + ({"Authorization": "Basic abc123"}, ""), + ({"Bearer": " tok123 "}, "tok123"), + ({"Authorization": "Bearer mytoken"}, "mytoken"), + ], +) +def test_extract_token(headers, expected): + """extract_token should return the bearer token or empty string for all header variants.""" + assert expected == HandlerToken.extract_token(headers) diff --git a/tests/unit/utils/test_trace_logging.py b/tests/unit/utils/test_trace_logging.py index df1cd29..ecaefbf 100644 --- a/tests/unit/utils/test_trace_logging.py +++ b/tests/unit/utils/test_trace_logging.py @@ -17,11 +17,23 @@ from unittest.mock import MagicMock from src.utils.logging_levels import TRACE_LEVEL +from src.utils.trace_logging import log_payload_at_trace import src.writers.writer_eventbridge as writer_eventbridge import src.writers.writer_kafka as writer_kafka import src.writers.writer_postgres as writer_postgres +def test_log_payload_skipped_when_trace_not_enabled(): + """log_payload_at_trace should return without logging when TRACE level is not enabled.""" + logger = MagicMock() + logger.isEnabledFor.return_value = False + + log_payload_at_trace(logger, "TestWriter", "test.topic", {"key": "value"}) + + logger.isEnabledFor.assert_called_once_with(TRACE_LEVEL) + logger.trace.assert_not_called() + + def test_trace_eventbridge(caplog): # Set trace level on the module's logger writer_eventbridge.logger.setLevel(TRACE_LEVEL) diff --git a/tests/unit/writers/test_writer_postgres.py b/tests/unit/writers/test_writer_postgres.py index c3fc0dc..b06e46a 100644 --- a/tests/unit/writers/test_writer_postgres.py +++ b/tests/unit/writers/test_writer_postgres.py @@ -372,3 +372,29 @@ def client(self, service_name, region_name): writer = WriterPostgres({}) healthy, msg = writer.check_health() assert not healthy and "host not configured" in msg + + +def test_check_health_database_not_configured(): + """check_health returns (True, 'database not configured') when database field is empty.""" + writer = WriterPostgres({}) + writer._secret_name = "mysecret" + writer._secret_region = "eu-west-1" + writer._db_config = {"database": ""} + healthy, msg = writer.check_health() + assert healthy + assert "database not configured" == msg + + +def test_check_health_load_config_exception(monkeypatch): + """check_health returns (False, error) when _load_db_config raises.""" + writer = WriterPostgres({}) + writer._secret_name = "mysecret" + writer._secret_region = "eu-west-1" + + def raise_err(): + raise ValueError("secret fetch failed") + + monkeypatch.setattr(writer, "_load_db_config", raise_err) + healthy, msg = writer.check_health() + assert not healthy + assert "secret fetch failed" == msg From b6965b4f7a453f179902445ded32a38160057aa4 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 7 Apr 2026 15:32:40 +0200 Subject: [PATCH 3/3] Implementing code rabbit test project standards. --- tests/unit/conftest.py | 6 +- tests/unit/handlers/test_handler_api.py | 44 ++++---- tests/unit/handlers/test_handler_token.py | 121 ++++++++++----------- tests/unit/writers/test_writer_postgres.py | 7 +- 4 files changed, 88 insertions(+), 90 deletions(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 554bcf4..72f85c6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -109,8 +109,10 @@ def Bucket(self, _name): mock_events_client.put_events.return_value = {"FailedEntryCount": 0} mock_boto_client.return_value = mock_events_client - mock_kafka_producer = start_patch("confluent_kafka.Producer") - mock_kafka_producer.return_value = MagicMock() + mock_kafka_producer = start_patch("src.writers.writer_kafka.Producer") + mock_producer_instance = MagicMock() + mock_producer_instance.flush.return_value = 0 # 0 pending → flush loop breaks immediately + mock_kafka_producer.return_value = mock_producer_instance module = importlib.import_module("src.event_gate_lambda") diff --git a/tests/unit/handlers/test_handler_api.py b/tests/unit/handlers/test_handler_api.py index 2467d34..0139ba6 100644 --- a/tests/unit/handlers/test_handler_api.py +++ b/tests/unit/handlers/test_handler_api.py @@ -15,42 +15,42 @@ # import pytest -from unittest.mock import patch, mock_open +from unittest.mock import mock_open from src.handlers.handler_api import HandlerApi -def test_load_api_definition_success(): +def test_load_api_definition_success(mocker): """Test successful loading of API definition.""" mock_content = "openapi: 3.0.0\ninfo:\n title: Test API" - with patch("builtins.open", mock_open(read_data=mock_content)): - handler = HandlerApi().with_api_definition_loaded() - assert handler.api_spec == mock_content + mocker.patch("builtins.open", mock_open(read_data=mock_content)) + handler = HandlerApi().with_api_definition_loaded() + assert handler.api_spec == mock_content -def test_load_api_definition_file_not_found(): +def test_load_api_definition_file_not_found(mocker): """Test that RuntimeError is raised when api.yaml doesn't exist.""" - with patch("builtins.open", side_effect=FileNotFoundError("api.yaml not found")): - handler = HandlerApi() - with pytest.raises(RuntimeError, match="API specification initialization failed"): - handler.with_api_definition_loaded() + mocker.patch("builtins.open", side_effect=FileNotFoundError("api.yaml not found")) + handler = HandlerApi() + with pytest.raises(RuntimeError, match="API specification initialization failed"): + handler.with_api_definition_loaded() -def test_load_api_definition_empty_file_raises(): +def test_load_api_definition_empty_file_raises(mocker): """Test that RuntimeError is raised when api.yaml is empty.""" - with patch("builtins.open", mock_open(read_data="")): - handler = HandlerApi() - with pytest.raises(RuntimeError, match="API specification initialization failed"): - handler.with_api_definition_loaded() + mocker.patch("builtins.open", mock_open(read_data="")) + handler = HandlerApi() + with pytest.raises(RuntimeError, match="API specification initialization failed"): + handler.with_api_definition_loaded() -def test_get_api_returns_correct_response(): +def test_get_api_returns_correct_response(mocker): """Test get_api returns correct response structure.""" mock_content = "openapi: 3.0.0" - with patch("builtins.open", mock_open(read_data=mock_content)): - handler = HandlerApi().with_api_definition_loaded() - response = handler.get_api() + mocker.patch("builtins.open", mock_open(read_data=mock_content)) + handler = HandlerApi().with_api_definition_loaded() + response = handler.get_api() - assert 200 == response["statusCode"] - assert "application/yaml" == response["headers"]["Content-Type"] - assert mock_content == response["body"] + assert 200 == response["statusCode"] + assert "application/yaml" == response["headers"]["Content-Type"] + assert mock_content == response["body"] diff --git a/tests/unit/handlers/test_handler_token.py b/tests/unit/handlers/test_handler_token.py index 5ac89d1..70609b6 100644 --- a/tests/unit/handlers/test_handler_token.py +++ b/tests/unit/handlers/test_handler_token.py @@ -16,7 +16,7 @@ import json from datetime import datetime, timedelta, timezone -from unittest.mock import patch, Mock +from unittest.mock import Mock import jwt import pytest @@ -39,28 +39,27 @@ def test_get_token_endpoint(event_gate_module, make_event): assert "Location" in resp["headers"] -def test_post_expired_token(event_gate_module, make_event, valid_payload): +def test_post_expired_token(mocker, event_gate_module, make_event, valid_payload): """Expired JWT should yield 401 auth error.""" - - with patch.object( + mocker.patch.object( event_gate_module.handler_token, "decode_jwt", side_effect=jwt.ExpiredSignatureError("expired"), - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer expiredtoken"}, - ) - resp = event_gate_module.lambda_handler(event) - assert 401 == resp["statusCode"] - body = json.loads(resp["body"]) - assert any(e["type"] == "auth" for e in body["errors"]) - - -def test_decode_jwt_all_second_key_succeeds(event_gate_module): + ) + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer expiredtoken"}, + ) + resp = event_gate_module.lambda_handler(event) + assert 401 == resp["statusCode"] + body = json.loads(resp["body"]) + assert any(e["type"] == "auth" for e in body["errors"]) + + +def test_decode_jwt_all_second_key_succeeds(mocker, event_gate_module): """First key fails signature, second key succeeds; claims returned from second key.""" # Arrange: two dummy public keys first_key = object() @@ -72,12 +71,12 @@ def decode_side_effect(_token, key, **_kwargs): raise jwt.PyJWTError("signature mismatch") return {"sub": "TestUser"} - with patch("jwt.decode", side_effect=decode_side_effect): - claims = event_gate_module.handler_token.decode_jwt("dummy-token") - assert "TestUser" == claims["sub"] + mocker.patch("jwt.decode", side_effect=decode_side_effect) + claims = event_gate_module.handler_token.decode_jwt("dummy-token") + assert "TestUser" == claims["sub"] -def test_decode_jwt_all_all_keys_fail(event_gate_module): +def test_decode_jwt_all_all_keys_fail(mocker, event_gate_module): """All keys fail; final PyJWTError with aggregate message is raised.""" bad_keys = [object(), object()] event_gate_module.handler_token.public_keys = bad_keys @@ -85,10 +84,10 @@ def test_decode_jwt_all_all_keys_fail(event_gate_module): def always_fail(_token, _key, **_kwargs): raise jwt.PyJWTError("bad signature") - with patch("jwt.decode", side_effect=always_fail): - with pytest.raises(jwt.PyJWTError) as exc: - event_gate_module.handler_token.decode_jwt("dummy-token") - assert "Verification failed for all public keys" in str(exc.value) + mocker.patch("jwt.decode", side_effect=always_fail) + with pytest.raises(jwt.PyJWTError) as exc: + event_gate_module.handler_token.decode_jwt("dummy-token") + assert "Verification failed for all public keys" in str(exc.value) ## Checking the freshness of public keys @@ -99,37 +98,37 @@ def always_fail(_token, _key, **_kwargs): (29, True), ], ) -def test_refresh_keys_age_check(token_handler, age_minutes, expected_called): +def test_refresh_keys_age_check(mocker, token_handler, age_minutes, expected_called): """Fresh keys should not trigger refresh; stale keys should.""" token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=age_minutes) token_handler.public_keys = [Mock(spec=RSAPublicKey)] - with patch.object(token_handler, "with_public_keys_queried") as mock_load: - token_handler._refresh_keys_if_needed() - assert expected_called == mock_load.called + mock_load = mocker.patch.object(token_handler, "with_public_keys_queried") + token_handler._refresh_keys_if_needed() + assert expected_called == mock_load.called -def test_refresh_keys_handles_load_failure_gracefully(token_handler): +def test_refresh_keys_handles_load_failure_gracefully(mocker, token_handler): """If key refresh fails, should log warning and continue with existing keys.""" old_key = Mock(spec=RSAPublicKey) token_handler.public_keys = [old_key] token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=29) - with patch.object(token_handler, "with_public_keys_queried", side_effect=RuntimeError("Network error")): - token_handler._refresh_keys_if_needed() - assert token_handler.public_keys == [old_key] + mocker.patch.object(token_handler, "with_public_keys_queried", side_effect=RuntimeError("Network error")) + token_handler._refresh_keys_if_needed() + assert token_handler.public_keys == [old_key] -def test_decode_jwt_triggers_refresh_check(token_handler): +def test_decode_jwt_triggers_refresh_check(mocker, token_handler): """Decoding JWT should check if keys need refresh before decoding.""" dummy_key = Mock(spec=RSAPublicKey) token_handler.public_keys = [dummy_key] token_handler._last_loaded_at = datetime.now(timezone.utc) - timedelta(minutes=10) - with patch.object(token_handler, "_refresh_keys_if_needed") as mock_refresh: - with patch("jwt.decode", return_value={"sub": "TestUser"}): - token_handler.decode_jwt("dummy-token") - mock_refresh.assert_called_once() + mock_refresh = mocker.patch.object(token_handler, "_refresh_keys_if_needed") + mocker.patch("jwt.decode", return_value={"sub": "TestUser"}) + token_handler.decode_jwt("dummy-token") + mock_refresh.assert_called_once() def test_handler_token_default_ssl_ca_bundle(): @@ -146,48 +145,48 @@ def test_handler_token_custom_ssl_ca_bundle_path(): assert "/path/to/custom/ca-bundle.pem" == handler.ssl_ca_bundle -def test_refresh_keys_skipped_when_never_loaded(token_handler): +def test_refresh_keys_skipped_when_never_loaded(mocker, token_handler): """When _last_loaded_at is None, refresh should return immediately without loading.""" assert token_handler._last_loaded_at is None - with patch.object(token_handler, "with_public_keys_queried") as mock_load: - token_handler._refresh_keys_if_needed() - mock_load.assert_not_called() + mock_load = mocker.patch.object(token_handler, "with_public_keys_queried") + token_handler._refresh_keys_if_needed() + mock_load.assert_not_called() -def test_with_public_keys_queried_multi_key_response(token_handler): +def test_with_public_keys_queried_multi_key_response(mocker, token_handler): """Response with `keys` list should load all keys.""" key_b64 = "dGVzdA==" mock_response = Mock() mock_response.json.return_value = {"keys": [{"key": key_b64}, {"key": key_b64}]} - with patch("requests.get", return_value=mock_response): - with patch( - "cryptography.hazmat.primitives.serialization.load_der_public_key", - return_value=Mock(spec=RSAPublicKey), - ): - result = token_handler.with_public_keys_queried() - assert result is token_handler - assert 2 == len(token_handler.public_keys) + mocker.patch("requests.get", return_value=mock_response) + mocker.patch( + "cryptography.hazmat.primitives.serialization.load_der_public_key", + return_value=Mock(spec=RSAPublicKey), + ) + result = token_handler.with_public_keys_queried() + assert result is token_handler + assert 2 == len(token_handler.public_keys) -def test_with_public_keys_queried_no_keys_raises(token_handler): +def test_with_public_keys_queried_no_keys_raises(mocker, token_handler): """Response without any keys should raise RuntimeError.""" mock_response = Mock() mock_response.json.return_value = {"other": "data"} - with patch("requests.get", return_value=mock_response): - with pytest.raises(RuntimeError, match="Token public key initialization failed"): - token_handler.with_public_keys_queried() + mocker.patch("requests.get", return_value=mock_response) + with pytest.raises(RuntimeError, match="Token public key initialization failed"): + token_handler.with_public_keys_queried() -def test_with_public_keys_queried_request_exception_raises(token_handler): +def test_with_public_keys_queried_request_exception_raises(mocker, token_handler): """Network error during key fetch should raise RuntimeError.""" import requests as req - with patch("requests.get", side_effect=req.ConnectionError("timeout")): - with pytest.raises(RuntimeError, match="Token public key initialization failed"): - token_handler.with_public_keys_queried() + mocker.patch("requests.get", side_effect=req.ConnectionError("timeout")) + with pytest.raises(RuntimeError, match="Token public key initialization failed"): + token_handler.with_public_keys_queried() @pytest.mark.parametrize( diff --git a/tests/unit/writers/test_writer_postgres.py b/tests/unit/writers/test_writer_postgres.py index b06e46a..2aa8a95 100644 --- a/tests/unit/writers/test_writer_postgres.py +++ b/tests/unit/writers/test_writer_postgres.py @@ -385,16 +385,13 @@ def test_check_health_database_not_configured(): assert "database not configured" == msg -def test_check_health_load_config_exception(monkeypatch): +def test_check_health_load_config_exception(mocker): """check_health returns (False, error) when _load_db_config raises.""" writer = WriterPostgres({}) writer._secret_name = "mysecret" writer._secret_region = "eu-west-1" - def raise_err(): - raise ValueError("secret fetch failed") - - monkeypatch.setattr(writer, "_load_db_config", raise_err) + mocker.patch.object(writer, "_load_db_config", side_effect=ValueError("secret fetch failed")) healthy, msg = writer.check_health() assert not healthy assert "secret fetch failed" == msg