diff --git a/charon/cmd/cmd_sign.py b/charon/cmd/cmd_sign.py index 629281cb..a83a3e3f 100644 --- a/charon/cmd/cmd_sign.py +++ b/charon/cmd/cmd_sign.py @@ -16,10 +16,9 @@ from typing import List from charon.config import get_config -from charon.pkgs.radas_signature_handler import sign_in_radas -from charon.cmd.internal import ( - _decide_mode, _safe_delete -) +from charon.pkgs.radas_sign import sign_in_radas +from charon.cmd.internal import _decide_mode + from click import command, option, argument import traceback @@ -39,14 +38,16 @@ "-r", help=""" The requester who sends the signing request. - """ + """, + required=True ) @option( "--result_path", "-p", help=""" The path which will save the sign result file. - """ + """, + required=True ) @option( "--ignore_patterns", @@ -57,14 +58,6 @@ not be allowed to upload to S3. Can accept more than one pattern. """ ) -@option( - "--work_dir", - "-w", - help=""" - The temporary working directory into which archives should - be extracted, when needed. - """ -) @option( "--config", "-c", @@ -79,7 +72,8 @@ help=""" rpm-sign key to be used, will replace {{ key }} in default configuration for signature. Does noting if detach_signature_command does not contain {{ key }} field. - """ + """, + required=True ) @option( "--debug", @@ -100,10 +94,9 @@ def sign( repo_url: str, requester: str, result_path: str, + sign_key: str, ignore_patterns: List[str] = None, - work_dir: str = None, config: str = None, - sign_key: str = "redhatdevel", debug=False, quiet=False, dryrun=False @@ -112,7 +105,6 @@ def sign( radas service. The repo_url points to the maven zip repository in quay.io, which will be sent as the source of the signing. """ - tmp_dir = work_dir logger.debug("%s", ignore_patterns) try: current = datetime.datetime.now().strftime("%Y%m%d%I%M") @@ -139,10 +131,8 @@ def sign( "ignore_patterns": ig_patterns, "radas_config": radas_conf } + logger.debug("params: %s", args) sign_in_radas(**args) # type: ignore except Exception: print(traceback.format_exc()) sys.exit(2) - finally: - if not debug and tmp_dir: - _safe_delete(tmp_dir) diff --git a/charon/config.py b/charon/config.py index 5e7734d0..396e4be3 100644 --- a/charon/config.py +++ b/charon/config.py @@ -75,34 +75,36 @@ def validate(self) -> bool: return True def umb_target(self) -> str: - return f"amqps://{self.__umb_host}:{self.__umb_host_port}" + return f"amqps://{self.__umb_host.strip()}:{self.__umb_host_port}" def result_queue(self) -> str: - return self.__result_queue + return self.__result_queue.strip() def request_queue(self) -> str: - return self.__request_queue + return self.__request_queue.strip() def client_ca(self) -> str: - return self.__client_ca + return self.__client_ca.strip() def client_key(self) -> str: - return self.__client_key + return self.__client_key.strip() def client_key_password(self) -> str: pass_file = self.__client_key_pass_file if os.access(pass_file, os.R_OK): with open(pass_file, "r") as f: - return f.read() + return f.read().strip() elif pass_file: logger.warning("The key password file is not accessible. Will ignore the password.") return "" def root_ca(self) -> str: - return self.__root_ca + return self.__root_ca.strip() def quay_radas_registry_config(self) -> Optional[str]: - return self.__quay_radas_registry_config + if self.__quay_radas_registry_config: + return self.__quay_radas_registry_config.strip() + return None def radas_sign_timeout_retry_count(self) -> int: return self.__radas_sign_timeout_retry_count diff --git a/charon/pkgs/maven.py b/charon/pkgs/maven.py index 9d895365..2f525ce3 100644 --- a/charon/pkgs/maven.py +++ b/charon/pkgs/maven.py @@ -16,7 +16,7 @@ from charon.utils.files import HashType import charon.pkgs.indexing as indexing import charon.pkgs.signature as signature -import charon.pkgs.radas_signature_handler as radas_signature +import charon.pkgs.radas_sign as radas_signature from charon.utils.files import overwrite_file, digest, write_manifest from charon.utils.archive import extract_zip_all from charon.utils.strings import remove_prefix diff --git a/charon/pkgs/radas_signature_handler.py b/charon/pkgs/radas_sign.py similarity index 61% rename from charon/pkgs/radas_signature_handler.py rename to charon/pkgs/radas_sign.py index 6f862d07..355daa77 100644 --- a/charon/pkgs/radas_signature_handler.py +++ b/charon/pkgs/radas_sign.py @@ -21,9 +21,9 @@ import sys import uuid from typing import List, Any, Tuple, Callable, Dict, Optional -from charon.config import get_config, RadasConfig +from charon.config import RadasConfig from charon.pkgs.oras_client import OrasClient -from proton import SSLDomain, Message, Event +from proton import SSLDomain, Message, Event, Sender from proton.handlers import MessagingHandler from proton.reactor import Container @@ -46,61 +46,42 @@ class RadasReceiver(MessagingHandler): Any errors encountered if signing fails, this will be empty list if successful """ - def __init__(self, sign_result_loc: str, request_id: str) -> None: + def __init__(self, sign_result_loc: str, request_id: str, rconf: RadasConfig) -> None: super().__init__() self.sign_result_loc = sign_result_loc self.request_id = request_id self.conn = None self.sign_result_status: Optional[str] = None self.sign_result_errors: List[str] = [] - - def on_start(self, event: Event) -> None: - """ - On start callback - """ - conf = get_config() - if not (conf and conf.is_radas_enabled()): - sys.exit(1) - - rconf = conf.get_radas_config() - # explicit check to pass the type checker - if rconf is None: - sys.exit(1) - - ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) - ssl_domain.set_credentials( - rconf.client_ca(), - rconf.client_key(), - rconf.client_key_password() + self.rconf = rconf + self.ssl = SSLDomain(SSLDomain.MODE_CLIENT) + self.ssl.set_trusted_ca_db(self.rconf.root_ca()) + self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER) + self.ssl.set_credentials( + self.rconf.client_ca(), + self.rconf.client_key(), + self.rconf.client_key_password() ) - ssl_domain.set_trusted_ca_db(rconf.root_ca()) - ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER) + def on_start(self, event: Event) -> None: self.conn = event.container.connect( - url=rconf.umb_target(), - ssl_domain=ssl_domain + url=self.rconf.umb_target(), + ssl_domain=self.ssl ) event.container.create_receiver( - self.conn, rconf.result_queue(), dynamic=True + self.conn, self.rconf.result_queue(), dynamic=True ) - logger.info("Listening on %s, queue: %s", rconf.umb_target(), rconf.result_queue()) + logger.info("Listening on %s, queue: %s", + self.rconf.umb_target(), + self.rconf.result_queue()) def on_message(self, event: Event) -> None: - """ - On message callback - """ self._process_message(event.message.body) def on_connection_error(self, event: Event) -> None: - """ - On connection error callback - """ logger.error("Received an error event:\n%s", event) def on_disconnected(self, event: Event) -> None: - """ - On disconnected callback - """ logger.error("Disconnected from AMQP broker.") def _process_message(self, msg: Any) -> None: @@ -146,53 +127,106 @@ class RadasSender(MessagingHandler): Attributes: payload (str): payload json string for radas to read, this value construct from the cmd flag + rconf (RadasConfig): the configurations for the radas messaging + system. """ - def __init__(self, payload: str): - super().__init__() + def __init__(self, payload: Any, rconf: RadasConfig): + super(RadasSender, self).__init__() self.payload = payload - self.container = None - self.conn = None - self.sender = None - - def on_start(self, event): - """ - On start callback - """ - conf = get_config() - if not (conf and conf.is_radas_enabled()): - sys.exit(1) - - rconf = conf.get_radas_config() - if rconf is None: - sys.exit(1) - - ssl_domain = SSLDomain(SSLDomain.MODE_CLIENT) - ssl_domain.set_credentials( - rconf.client_ca(), - rconf.client_key(), - rconf.client_key_password() + self.rconf = rconf + self.message_sent = False # Flag to track if message was sent + self.status: Optional[str] = None + self.retried = 0 + self.pending: Optional[Message] = None + self.message: Optional[Message] = None + self.container: Optional[Container] = None + self.sender: Optional[Sender] = None + self.log = logging.getLogger("charon.pkgs.radas_sign.RadasSender") + self.ssl = SSLDomain(SSLDomain.MODE_CLIENT) + self.ssl.set_trusted_ca_db(self.rconf.root_ca()) + self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER) + self.ssl.set_credentials( + self.rconf.client_ca(), + self.rconf.client_key(), + self.rconf.client_key_password() ) - ssl_domain.set_trusted_ca_db(rconf.root_ca()) - ssl_domain.set_peer_authentication(SSLDomain.VERIFY_PEER) + def on_start(self, event): self.container = event.container - self.conn = event.container.connect( - url=rconf.umb_target(), - ssl_domain=ssl_domain + conn = self.container.connect( + url=self.rconf.umb_target(), + ssl_domain=self.ssl ) - self.sender = event.container.create_sender(self.conn, rconf.request_queue()) - - def on_sendable(self): - """ - On message able to send callback - """ - request = self.payload - msg = Message(body=request) + if conn: + self.sender = self.container.create_sender(conn, self.rconf.request_queue()) + + def on_sendable(self, event): + if not self.message_sent: + msg = Message(body=self.payload, durable=True) + self.log.debug("Sending message: %s to %s", msg.id, event.sender.target.address) + self._send_msg(msg) + self.message = msg + self.message_sent = True + + def on_error(self, event): + self.log.error("Error happened during message sending, reason %s", + event.description) + self.status = "failed" + + def on_rejected(self, event): + self.pending = self.message + self._handle_failed_delivery("Rejected") + + def on_released(self, event): + self.pending = self.message + self._handle_failed_delivery("Released") + + def on_accepted(self, event): + self.log.info("Message accepted by receiver: %s", event.delivery) + self.status = "success" + self.close() # Close connection after confirmation + + def on_timer_task(self, event): + message_to_retry = self.message + self._send_msg(message_to_retry) + self.pending = None + + def close(self): + self.log.info("Message has been sent successfully, close connection") if self.sender: - self.sender.send(msg) + self.sender.close() if self.container: self.container.stop() + def _send_msg(self, msg: Message): + if self.sender and self.sender.credit > 0: + self.sender.send(msg) + self.log.debug("Message %s sent", msg.id) + else: + self.log.warning("Sender not ready or no credit available") + + def _handle_failed_delivery(self, reason: str): + if self.pending: + msg = self.pending + self.log.warning("Message %s failed for reason: %s", msg.id, reason) + max_retries = self.rconf.radas_sign_timeout_retry_count() + if self.retried < max_retries: + # Schedule retry + self.retried = self.retried + 1 + self.log.info("Scheduling retry %s/%s for message %s", + self.retried, max_retries, msg.id) + # Schedule retry after delay + if self.container: + self.container.schedule(self.rconf.radas_sign_timeout_retry_interval(), self) + else: + # Max retries exceeded + self.log.error("Message %s failed after %s retries", msg.id, max_retries) + self.status = "failed" + self.pending = None + else: + self.log.info("Message has been sent successfully, close connection") + self.close() + def generate_radas_sign(top_level: str, sign_result_loc: str) -> Tuple[List[str], List[str]]: """ @@ -291,8 +325,8 @@ def sign_in_radas(repo_url: str, This function will be responsible to do the overall controlling of the whole process, like trigger the send and register the receiver, and control the wait and timeout there. """ - logger.debug("params. repo_url: %s, requester: %s, sign_key: %s, result_path: %s," - "radas_config: %s", repo_url, requester, sign_key, result_path, radas_config) + logger.debug("params. repo_url: %s, requester: %s, sign_key: %s, result_path: %s", + repo_url, requester, sign_key, result_path) request_id = str(uuid.uuid4()) exclude = ignore_patterns if ignore_patterns else [] @@ -305,15 +339,16 @@ def sign_in_radas(repo_url: str, "exclude": exclude } - listener = RadasReceiver(result_path, request_id) - sender = RadasSender(json.dumps(payload)) + sender = RadasSender(json.dumps(payload), radas_config) + container = Container(sender) + container.run() - try: - Container(sender).run() - logger.info("Successfully sent signing request ID: %s", request_id) - Container(listener).run() - finally: - if listener.conn is not None: - listener.conn.close() - if sender.conn is not None: - sender.conn.close() + if not sender.status == "success": + logger.error("Something wrong happened in message sending, see logs") + sys.exit(1) + + listener = RadasReceiver(result_path, request_id, radas_config) + Container(listener).run() + + if listener.conn: + listener.conn.close() diff --git a/pyproject.toml b/pyproject.toml index 43ab9cb4..536df23e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ dependencies = [ "subresource-integrity>=0.2", "jsonschema>=4.9.1", "urllib3>=1.25.10", - "semantic-version>=2.10.0" + "semantic-version>=2.10.0", + "python-qpid-proton>=0.39.0" ] [project.optional-dependencies] diff --git a/requirements-dev.txt b/requirements-dev.txt index f0ed1644..bc38b20f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,3 +2,4 @@ -r tests/requirements.txt pyflakes pep8 +tox diff --git a/setup.py b/setup.py index 692b53eb..e2232b75 100755 --- a/setup.py +++ b/setup.py @@ -57,6 +57,7 @@ "subresource-integrity>=0.2", "jsonschema>=4.9.1", "urllib3>=1.25.10", - "semantic-version>=2.10.0" + "semantic-version>=2.10.0", + "python-qpid-proton>=0.39.0" ], ) diff --git a/tests/test_radas_send_handler.py b/tests/test_radas_send_handler.py deleted file mode 100644 index b37fa05a..00000000 --- a/tests/test_radas_send_handler.py +++ /dev/null @@ -1,98 +0,0 @@ -import tempfile -import os -from unittest import mock -import unittest -from charon.pkgs.radas_signature_handler import sign_in_radas - - -class RadasSignHandlerTest(unittest.TestCase): - def setUp(self) -> None: - super().setUp() - - def tearDown(self) -> None: - super().tearDown() - - def test_sign_in_radas_normal_flow(self): - with tempfile.TemporaryDirectory() as tmpdir: - # Mock configuration - mock_config = mock.MagicMock() - mock_config.is_radas_enabled.return_value = True - mock_radas_config = mock.MagicMock() - mock_config.get_radas_config.return_value = mock_radas_config - - # Mock Container run to avoid real AMQP connection - with mock.patch( - "charon.pkgs.radas_signature_handler.Container") as mock_container, \ - mock.patch( - "charon.pkgs.radas_signature_handler.get_config", return_value=mock_config), \ - mock.patch( - "charon.pkgs.radas_signature_handler.uuid.uuid4", return_value="mocked-uuid"): - - test_result_path = os.path.join(tmpdir, "results") - os.makedirs(test_result_path) - - sign_in_radas( - repo_url="quay.io/test/repo", - requester="test-user", - sign_key="test-key", - result_path=test_result_path, - ignore_patterns=[], - radas_config=mock_radas_config - ) - - # Verify Container.run() was called twice (sender and receiver) - self.assertEqual(mock_container.call_count, 2) - - # Verify request ID propagation - receiver_call = mock_container.call_args_list[1] - self.assertEqual(receiver_call.args[0].request_id, "mocked-uuid") - - def test_sign_in_radas_with_disabled_config(self): - mock_config = mock.MagicMock() - mock_config.is_radas_enabled.return_value = False - - with mock.patch( - "charon.pkgs.radas_signature_handler.get_config", return_value=mock_config), \ - self.assertRaises(SystemExit): - - sign_in_radas( - repo_url="quay.io/test/repo", - requester="test-user", - sign_key="test-key", - result_path="/tmp/results", - ignore_patterns=[], - radas_config=mock.MagicMock() - ) - - def test_sign_in_radas_connection_cleanup(self): - mock_config = mock.MagicMock() - mock_config.is_radas_enabled.return_value = True - mock_radas_config = mock.MagicMock() - - with mock.patch("charon.pkgs.radas_signature_handler.Container") as mock_container, \ - mock.patch("charon.pkgs.radas_signature_handler.get_config", return_value=mock_config): - - mock_sender_conn = mock.MagicMock() - mock_listener_conn = mock.MagicMock() - - def container_side_effect(*args, **kwargs): - if args[0].__class__.__name__ == "RadasReceiver": - args[0].conn = mock_listener_conn - elif args[0].__class__.__name__ == "RadasSender": - args[0].conn = mock_sender_conn - return mock.MagicMock() - - mock_container.side_effect = container_side_effect - - sign_in_radas( - repo_url="quay.io/test/repo", - requester="test-user", - sign_key="test-key", - result_path="/tmp/results", - ignore_patterns=[], - radas_config=mock_radas_config - ) - - # Verify connections are closed - mock_sender_conn.close.assert_called_once() - mock_listener_conn.close.assert_called_once() diff --git a/tests/test_radas_sign_handler.py b/tests/test_radas_sign_generation.py similarity index 99% rename from tests/test_radas_sign_handler.py rename to tests/test_radas_sign_generation.py index b667eaea..33c3d695 100644 --- a/tests/test_radas_sign_handler.py +++ b/tests/test_radas_sign_generation.py @@ -23,7 +23,7 @@ import builtins from unittest import mock from charon.utils.files import overwrite_file -from charon.pkgs.radas_signature_handler import generate_radas_sign +from charon.pkgs.radas_sign import generate_radas_sign logger = logging.getLogger(__name__) diff --git a/tests/test_radas_sign_sender.py b/tests/test_radas_sign_sender.py new file mode 100644 index 00000000..1e75b8fe --- /dev/null +++ b/tests/test_radas_sign_sender.py @@ -0,0 +1,86 @@ +import json +from unittest import mock +import unittest +from charon.pkgs.radas_sign import RadasSender + + +class RadasSignHandlerTest(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + + def tearDown(self) -> None: + super().tearDown() + + def test_radas_sender(self): + # Mock configuration + mock_radas_config = mock.MagicMock() + mock_radas_config.validate.return_value = True + mock_radas_config.client_ca.return_value = "test-client-ca" + mock_radas_config.client_key.return_value = "test-client-key" + mock_radas_config.client_key_password.return_value = "test-client-key-pass" + mock_radas_config.root_ca.return_value = "test-root-ca" + mock_radas_config.radas_sign_timeout_retry_count.return_value = 5 + + test_payload = { + "request_id": "mock-id", + "requested_by": "test-user", + "type": "mrrc", + "file_reference": "quay.io/test/repo", + "sig_keyname": "test-key", + "exclude": [] + } + + # Mock Container run to avoid real AMQP connection + with mock.patch( + "charon.pkgs.radas_sign.Container") as mock_container, \ + mock.patch("charon.pkgs.radas_sign.SSLDomain") as ssl_domain, \ + mock.patch("charon.pkgs.radas_sign.Event") as event: + + json_payload = json.dumps(test_payload) + r_sender = RadasSender(json_payload, mock_radas_config) + self.assertEqual(ssl_domain.call_count, 1) + self.assertEqual(r_sender.payload, json_payload) + self.assertIs(r_sender.rconf, mock_radas_config) + self.assertIsNone(r_sender.message) + self.assertIsNone(r_sender.pending) + + # test on_start + mock_sender = mock.MagicMock() + mock_conn = mock.MagicMock() + mock_container.connect.return_value = mock_conn + mock_container.create_sender.return_value = mock_sender + event.container = mock_container + r_sender.on_start(event) + self.assertEqual(mock_container.connect.call_count, 1) + self.assertEqual(mock_container.create_sender.call_count, 1) + + # test on_sendable + mock_sender.credit = 1 + r_sender.on_sendable(event) + self.assertIsNotNone(r_sender.message) + self.assertEqual(mock_sender.send.call_count, 1) + + # test on_accepted + r_sender.on_accepted(event) + self.assertEqual(r_sender.status, "success") + self.assertEqual(r_sender.retried, 0) + self.assertEqual(r_sender.sender.close.call_count, 1) + self.assertEqual(r_sender.container.stop.call_count, 1) + + # test on_rejected + r_sender.on_rejected(event) + self.assertIsNone(r_sender.pending) + self.assertEqual(r_sender.retried, 1) + self.assertEqual(r_sender.container.schedule.call_count, 1) + + # test on_released + r_sender.on_released(event) + self.assertIsNone(r_sender.pending) + self.assertEqual(r_sender.retried, 2) + self.assertEqual(r_sender.container.schedule.call_count, 2) + + # test on_released + r_sender.on_timer_task(event) + self.assertIsNone(r_sender.pending) + self.assertEqual(r_sender.retried, 2) + self.assertEqual(mock_sender.send.call_count, 2)