From 76b0a99ab03c8f30ba2adc754b393be99a6fae02 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 28 Jan 2026 15:00:40 +0100 Subject: [PATCH] fix get host user --- framework/python/src/common/util.py | 67 ++++++++--------- .../python/src/net_orc/network_validator.py | 71 ++++++++++--------- modules/test/ntp/python/src/ntp_module.py | 6 ++ 3 files changed, 80 insertions(+), 64 deletions(-) diff --git a/framework/python/src/common/util.py b/framework/python/src/common/util.py index 7bb7ea73f..3ef8827dc 100644 --- a/framework/python/src/common/util.py +++ b/framework/python/src/common/util.py @@ -15,6 +15,7 @@ """Provides basic utilities for Testrun.""" import getpass import os +import pwd import subprocess import shlex import typing as t @@ -55,52 +56,54 @@ def interface_exists(interface): """Checks whether an interface is available""" return interface in netifaces.interfaces() + def prettify(mac_string): """Formats a MAC address with colons""" return ':'.join([f'{ord(b):02x}' for b in mac_string]) -def get_host_user(): - """Returns the username of the host user""" - user = get_os_user() - # If primary method failed, try secondary - if user is None: - user = get_user() +def get_sudo_user() -> t.Optional[str]: + """Returns the username of the sudo user if running under sudo, else None.""" + return os.environ.get('SUDO_USER') - return user -def get_os_user(): - """Attempts to get the username using os library""" +def get_pwd_user() -> t.Optional[str]: + """Returns the username of the current user using pwd module""" user = None - try: - user = os.getlogin() - except OSError: - # Handle the OSError exception - LOGGER.error('An OS error occurred whilst calling os.getlogin()') - except Exception: # pylint: disable=W0703 - # Catch any other unexpected exceptions - LOGGER.error('An unknown exception occurred whilst calling os.getlogin()') + user = pwd.getpwuid(os.getuid()).pw_name return user -def get_user(): - """Attempts to get the host user using the getpass library""" + +def get_host_user()-> t.Optional[str]: + """Returns the username of the host user""" + user_functions = [get_sudo_user, os.getlogin, getpass.getuser, get_pwd_user] user = None - try: - user = getpass.getuser() - except (KeyError, ImportError, ModuleNotFoundError, OSError) as e: + error_messages = [] + for func in user_functions: + try: + user = func() + if user is not None: + break + except (KeyError, ImportError, ModuleNotFoundError, OSError) as e: # Handle specific exceptions individually - if isinstance(e, KeyError): - LOGGER.error('USER environment variable not set or unavailable.') - elif isinstance(e, ImportError): - LOGGER.error('Unable to import the getpass module.') - elif isinstance(e, ModuleNotFoundError): - LOGGER.error('The getpass module was not found.') - elif isinstance(e, OSError): - LOGGER.error('An OS error occurred while retrieving the username.') - else: - LOGGER.error('An exception occurred:', e) + if isinstance(e, KeyError): + error_message = 'USER environment variable not set or unavailable.' + error_messages.append(error_message) + elif isinstance(e, ImportError): + error_messages.append('Unable to import the getpass module.') + elif isinstance(e, ModuleNotFoundError): + error_messages.append('The getpass module was not found.') + elif isinstance(e, OSError): + error_message = 'OS error occurred while retrieving the username.' + error_messages.append(error_message) + else: + error_messages.append('An exception occurred: ' + str(e)) + if user is None: + LOGGER.error(next(error_messages)) + return user + def set_file_owner(path, owner): """Change the owner of a file""" run_command(f'chown -R {owner} {path}') diff --git a/framework/python/src/net_orc/network_validator.py b/framework/python/src/net_orc/network_validator.py index c50d2463d..8ef700856 100644 --- a/framework/python/src/net_orc/network_validator.py +++ b/framework/python/src/net_orc/network_validator.py @@ -15,6 +15,7 @@ """Holds logic for validation of network services prior to runtime.""" import json import os +import pwd import shutil import time import docker @@ -179,44 +180,50 @@ def _start_network_device(self, device): LOGGER.info('Validation device ' + device.name + ' has finished') - def _get_host_user(self): - user = self._get_os_user() - - # If primary method failed, try secondary - if user is None: - user = self._get_user() + def _get_sudo_user(self): + """Returns the username of the sudo user.""" + return os.environ.get('SUDO_USER') - LOGGER.debug(f'Network validator host user: {user}') - return user - - def _get_os_user(self): + def _get_pwd_user(self): + """Returns the username of the current user using pwd module""" user = None - try: - user = os.getlogin() - except OSError: - # Handle the OSError exception - LOGGER.error('An OS error occurred while retrieving the login name.') - except Exception as error: # pylint: disable=W0703 - # Catch any other unexpected exceptions - LOGGER.error('An exception occurred:', error) + user = pwd.getpwuid(os.getuid()).pw_name return user - def _get_user(self): + def _get_host_user(self): + """Returns the username of the host user""" + user_functions = [ + self._get_sudo_user, + os.getlogin, + getpass.getuser, + self._get_pwd_user + ] user = None - try: - user = getpass.getuser() - except (KeyError, ImportError, ModuleNotFoundError, OSError) as e: + error_messages = [] + for func in user_functions: + try: + user = func() + if user is not None: + break + except (KeyError, ImportError, ModuleNotFoundError, OSError) as e: # Handle specific exceptions individually - if isinstance(e, KeyError): - LOGGER.error('USER environment variable not set or unavailable.') - elif isinstance(e, ImportError): - LOGGER.error('Unable to import the getpass module.') - elif isinstance(e, ModuleNotFoundError): - LOGGER.error('The getpass module was not found.') - elif isinstance(e, OSError): - LOGGER.error('An OS error occurred while retrieving the username.') - else: - LOGGER.error('An exception occurred:', e) + if isinstance(e, KeyError): + error_message = 'USER environment variable not set or unavailable.' + error_messages.append(error_message) + elif isinstance(e, ImportError): + error_messages.append('Unable to import the getpass module.') + elif isinstance(e, ModuleNotFoundError): + error_messages.append('The getpass module was not found.') + elif isinstance(e, OSError): + error_message = 'OS error occurred while retrieving the username.' + error_messages.append(error_message) + else: + error_messages.append('An exception occurred: ' + str(e)) + if user is None: + LOGGER.error(next(error_messages)) + + LOGGER.debug(f'Network validator host user: {user}') + return user def _get_device_status(self, module): diff --git a/modules/test/ntp/python/src/ntp_module.py b/modules/test/ntp/python/src/ntp_module.py index 5d007f4e6..76c3aeb6a 100644 --- a/modules/test/ntp/python/src/ntp_module.py +++ b/modules/test/ntp/python/src/ntp_module.py @@ -345,6 +345,12 @@ def _ntp_network_ntp_dhcp(self, config): ntp_whitelist_resolver.is_ip_whitelisted(ip) for ip in ntp_to_remote_ips ) + for ip in ntp_to_remote_ips: + if ntp_whitelist_resolver.is_ip_whitelisted(ip): + LOGGER.info(f'NTP server {ip} is in the trusted whitelist') + else: + LOGGER.info(f'NTP server {ip} is NOT in the trusted whitelist') + result = 'Feature Not Detected', 'Device has not sent any NTP requests' if device_sends_ntp: