Skip to content
Merged
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fastapi==0.88.0
pydantic==1.10.4
python-ms-core==0.0.23
python-ms-core==0.0.24
uvicorn==0.20.0
html_testRunner==1.2.1
geopandas==0.14.4
Expand Down
3 changes: 2 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class Settings(BaseSettings):
app_name: str = 'python-osw-validation'
event_bus = EventBusSettings()
auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None)
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)
max_receivable_messages: int = os.environ.get('MAX_RECEIVABLE_MESSAGES',-1) # -1 means no limit

@property
def auth_provider(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
try:
# OSWValidator()
app.validator = OSWValidator()

except:
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
Expand Down
49 changes: 46 additions & 3 deletions src/osw_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import gc
import logging
import os
import signal
import time
import urllib.parse
from typing import List
from python_ms_core import Core
Expand All @@ -21,6 +24,9 @@ class OSWValidator:

def __init__(self):
self.core = Core()

## Print the core version
print(f'Core version: {self.core.__version__}')
options = {
'provider': self._settings.auth_provider,
'api_url': self._settings.auth_permission_url
Expand All @@ -31,7 +37,8 @@ def __init__(self):
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.auth = self.core.get_authorizer(config=options)
self.listener_thread = threading.Thread(target=self.start_listening)
self._shutdown_triggered = threading.Event()
self.listener_thread = threading.Thread(target=self.start_listening, daemon=True)
self.listener_thread.start()

def start_listening(self):
Expand All @@ -41,10 +48,11 @@ def process(message) -> None:
upload_message = Upload.data_from(queue_message)
self.validate(received_message=upload_message)

self.listening_topic.subscribe(subscription=self.subscription_name, callback=process)
self.listening_topic.subscribe(subscription=self.subscription_name, callback=process, max_receivable_messages=self._settings.max_receivable_messages)

def validate(self, received_message: Upload):
tdei_record_id: str = ''
status_sent = False
try:
tdei_record_id = received_message.message_id
logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation !')
Expand All @@ -66,6 +74,7 @@ def validate(self, received_message: Upload):
validation_result = Validation(file_path=file_upload_path, storage_client=self.storage_client)
result = validation_result.validate()
self.send_status(result=result, upload_message=received_message)
status_sent = True
else:
raise Exception('File entity not found')
except Exception as e:
Expand All @@ -74,6 +83,13 @@ def validate(self, received_message: Upload):
result.is_valid = False
result.validation_message = f'Error occurred while validating OSW request {e}'
self.send_status(result=result, upload_message=received_message)
status_sent = True
finally:
if status_sent:
logger.info('Triggering server shutdown after status send.')
else:
logger.warning('Server shutdown skipped because status was not sent.')
self._stop_server_and_container(delay_seconds=2)

def send_status(self, result: ValidationResult, upload_message: Upload):
upload_message.data.success = result.is_valid
Expand All @@ -90,6 +106,7 @@ def send_status(self, result: ValidationResult, upload_message: Upload):
'data': resp_data
})
try:
logger.info('Sending validation result to response topic.')
self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data)
logger.info(f'Publishing message for : {upload_message.message_id}')
except Exception as e:
Expand All @@ -113,4 +130,30 @@ def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
return False

def stop_listening(self):
self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work.
self._stop_server_and_container()
if hasattr(self, 'listener_thread'):
self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work.

def _stop_server_and_container(self, delay_seconds: float = 0.0):
"""
Attempt to gracefully stop the current process (stopping FastAPI/uvicorn and the Docker container).
"""
logger.info('Gracefully stopping FastAPI/uvicorn and Docker container')
if self._shutdown_triggered.is_set():
logger.info('Server stop already in progress; skipping duplicate trigger.')
return
self._shutdown_triggered.set()
logger.info('Server stop triggered; scheduling shutdown.')
def _terminate():
if delay_seconds:
time.sleep(delay_seconds)
try:
logger.info('Sending SIGTERM to stop server/container.')
os.kill(os.getpid(), signal.SIGTERM)
except Exception as err:
logger.warning(f'Error occurred while sending SIGTERM: {err}')
finally:
logger.info('Forcing process exit to stop server/container.')
os._exit(0)

threading.Thread(target=_terminate, daemon=True).start()
13 changes: 13 additions & 0 deletions tests/unit_tests/interface/test_validator_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ def validate(self, message: QueueMessage) -> None:
pass


class SuperCallingValidator(ValidatorAbstract):
def validate(self, message: QueueMessage) -> None:
return super().validate(message)


class TestValidatorAbstract(unittest.TestCase):

def test_abstract_method_enforcement(self):
Expand All @@ -37,6 +42,14 @@ def test_validate_method_called(self):
# Assert that the mocked message object is a valid argument
self.assertTrue(hasattr(message, '__class__'))

def test_abstract_base_method_body_returns_none(self):
message = MagicMock(spec=QueueMessage)
validator = SuperCallingValidator()

result = validator.validate(message)

self.assertIsNone(result)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/models/test_queue_message_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json')

TEST_FILE = open(TEST_JSON_FILE)
TEST_DATA = json.loads(TEST_FILE.read())
with open(TEST_JSON_FILE) as test_file:
TEST_DATA = json.load(test_file)


class TestUpload(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_default_settings(self):
self.assertEqual(settings.app_name, 'python-osw-validation')
self.assertEqual(settings.event_bus.container_name, 'osw')
self.assertIsNone(settings.auth_permission_url)
self.assertEqual(settings.max_concurrent_messages, 2)
self.assertEqual(settings.max_concurrent_messages, 1)


if __name__ == '__main__':
Expand Down
40 changes: 40 additions & 0 deletions tests/unit_tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import unittest
import asyncio
from unittest.mock import MagicMock, patch
from fastapi import status
from fastapi.testclient import TestClient
import src.main as main
from src.main import app, get_settings


Expand All @@ -22,6 +25,43 @@ def test_get_settings(self):
settings = get_settings()
self.assertIsNotNone(settings)

@patch('src.main.OSWValidator')
def test_startup_event_sets_validator(self, mock_validator):
validator = MagicMock()
mock_validator.return_value = validator
main.app.validator = None

asyncio.run(main.startup_event())

self.assertIs(main.app.validator, validator)

@patch('builtins.print')
@patch('src.main.psutil.Process')
@patch('src.main.os.getpid', return_value=123)
@patch('src.main.OSWValidator', side_effect=Exception('boom'))
def test_startup_event_handles_validator_init_failure(self, mock_validator, mock_getpid, mock_process, mock_print):
child_one = MagicMock()
child_two = MagicMock()
parent = MagicMock()
parent.children.return_value = [child_one, child_two]
mock_process.return_value = parent

asyncio.run(main.startup_event())

parent.children.assert_called_once_with(recursive=True)
child_one.kill.assert_called_once()
child_two.kill.assert_called_once()
parent.kill.assert_called_once()
self.assertGreaterEqual(mock_print.call_count, 6)

def test_shutdown_event_stops_validator(self):
validator = MagicMock()
main.app.validator = validator

asyncio.run(main.shutdown_event())

validator.stop_listening.assert_called_once()


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/test_osw_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json')

TEST_FILE = open(TEST_JSON_FILE)
TEST_DATA = json.loads(TEST_FILE.read())
with open(TEST_JSON_FILE) as test_file:
TEST_DATA = json.load(test_file)


class PermissionResponse:
Expand Down
26 changes: 15 additions & 11 deletions tests/unit_tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

class TestOSWValidatorService(unittest.TestCase):

@patch('src.osw_validator.threading.Thread')
@patch('src.osw_validator.Settings')
@patch('src.osw_validator.Core')
def setUp(self, mock_core, mock_settings):
def setUp(self, mock_core, mock_settings, mock_thread):
# Mock Settings
mock_settings.return_value.event_bus.upload_subscription = 'test_subscription'
mock_settings.return_value.event_bus.upload_topic = 'test_request_topic'
Expand All @@ -19,14 +20,19 @@ def setUp(self, mock_core, mock_settings):
mock_settings.return_value.event_bus.container_name = 'test_container'

# Mock Core
mock_core.__version__ = 'test-core-version'
mock_core.return_value.__version__ = 'test-core-version'
mock_core.return_value.get_topic.return_value = MagicMock()
mock_core.return_value.get_storage_client.return_value = MagicMock()
self.mock_listener_thread = MagicMock()
mock_thread.return_value = self.mock_listener_thread

# Initialize OSWValidator with mocked dependencies
self.service = OSWValidator()
self.service.storage_client = MagicMock()
self.service.container_name = 'test_container'
self.service.auth = MagicMock()
self.service._stop_server_and_container = MagicMock()

# Define a sample message with proper strings
self.sample_message = {
Expand All @@ -41,11 +47,12 @@ def setUp(self, mock_core, mock_settings):

@patch('src.osw_validator.QueueMessage')
@patch('src.osw_validator.Upload')
def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_message):
def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message):
# Arrange
mock_message = MagicMock()
mock_queue_message.to_dict.return_value = self.sample_message
mock_request_message.from_dict.return_value = mock_request_message
mock_upload_message = MagicMock()
mock_upload.data_from.return_value = mock_upload_message
self.service.validate = MagicMock()

# Act
Expand All @@ -54,7 +61,7 @@ def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_mes
callback(mock_message)

# Assert
self.service.validate.assert_called_once_with(received_message=mock_request_message.data_from())
self.service.validate.assert_called_once_with(received_message=mock_upload_message)

@patch('src.osw_validator.Validation')
def test_validate_with_valid_file_path(self, mock_validation):
Expand Down Expand Up @@ -164,19 +171,16 @@ def test_validate_with_validation_only_in_message_type(self, mock_has_permission
self.assertTrue(actual_result.is_valid)
self.assertEqual(actual_upload_message, mock_request_message)

@patch('src.osw_validator.threading.Thread')
def test_stop_listening(self, mock_thread):
def test_stop_listening(self):
# Arrange
mock_thread_instance = MagicMock()
mock_thread.return_value = mock_thread_instance

self.service.listener_thread = mock_thread_instance
self.service.listener_thread = self.mock_listener_thread

# Act
result = self.service.stop_listening()

# Assert
mock_thread_instance.join.assert_called_once_with(timeout=0)
self.mock_listener_thread.join.assert_called_once_with(timeout=0)
self.service._stop_server_and_container.assert_called_once()
self.assertIsNone(result)

def test_has_permission_success(self):
Expand Down
9 changes: 7 additions & 2 deletions tests/unit_tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,18 @@ def test_validate_invalid_file_with_errors(self, mock_download_file, mock_clean_
for expected, error in zip(expected_errors, errors):
self.assertEqual(error['filename'], error_in_file)
self.assertEqual(error['feature_index'], expected['feature_index'])
self.assertEqual(error['error_message'][0], expected['error_message'])
self.assertTrue(
error['error_message'][0].startswith(
"Additional properties are not allowed ('crossing' was unexpected)"
)
)
# Ensure clean_up is called twice (once for the file, once for the folder)
self.assertEqual(mock_clean_up.call_count, 2)

@patch('src.validation.Validation.download_single_file', return_value=None)
@patch('src.validation.OSWValidation')
@patch('src.validation.Validation.clean_up')
def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation):
def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation, mock_download_file):
"""Test validate method for invalid zip file with errors."""
# Mock the OSWValidation validate method to return errors
mock_validation_result = MagicMock()
Expand Down
Loading