Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions framework/python/src/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Provides basic utilities for Testrun."""
import getpass
import os
import pwd
import subprocess
import shlex
import typing as t
Expand Down Expand Up @@ -55,52 +56,54 @@ def interface_exists(interface):
"""Checks whether an interface is available"""
return interface in netifaces.interfaces()


def prettify(mac_string):
"""Formats a MAC address with colons"""
return ':'.join([f'{ord(b):02x}' for b in mac_string])

def get_host_user():
"""Returns the username of the host user"""
user = get_os_user()

# If primary method failed, try secondary
if user is None:
user = get_user()
def get_sudo_user() -> t.Optional[str]:
"""Returns the username of the sudo user if running under sudo, else None."""
return os.environ.get('SUDO_USER')

return user

def get_os_user():
"""Attempts to get the username using os library"""
def get_pwd_user() -> t.Optional[str]:
"""Returns the username of the current user using pwd module"""
user = None
try:
user = os.getlogin()
except OSError:
# Handle the OSError exception
LOGGER.error('An OS error occurred whilst calling os.getlogin()')
except Exception: # pylint: disable=W0703
# Catch any other unexpected exceptions
LOGGER.error('An unknown exception occurred whilst calling os.getlogin()')
user = pwd.getpwuid(os.getuid()).pw_name
return user

def get_user():
"""Attempts to get the host user using the getpass library"""

def get_host_user()-> t.Optional[str]:
"""Returns the username of the host user"""
user_functions = [get_sudo_user, os.getlogin, getpass.getuser, get_pwd_user]
user = None
try:
user = getpass.getuser()
except (KeyError, ImportError, ModuleNotFoundError, OSError) as e:
error_messages = []
for func in user_functions:
try:
user = func()
if user is not None:
break
except (KeyError, ImportError, ModuleNotFoundError, OSError) as e:
# Handle specific exceptions individually
if isinstance(e, KeyError):
LOGGER.error('USER environment variable not set or unavailable.')
elif isinstance(e, ImportError):
LOGGER.error('Unable to import the getpass module.')
elif isinstance(e, ModuleNotFoundError):
LOGGER.error('The getpass module was not found.')
elif isinstance(e, OSError):
LOGGER.error('An OS error occurred while retrieving the username.')
else:
LOGGER.error('An exception occurred:', e)
if isinstance(e, KeyError):
error_message = 'USER environment variable not set or unavailable.'
error_messages.append(error_message)
elif isinstance(e, ImportError):
error_messages.append('Unable to import the getpass module.')
elif isinstance(e, ModuleNotFoundError):
error_messages.append('The getpass module was not found.')
elif isinstance(e, OSError):
error_message = 'OS error occurred while retrieving the username.'
error_messages.append(error_message)
else:
error_messages.append('An exception occurred: ' + str(e))
if user is None:
LOGGER.error(next(error_messages))

return user


def set_file_owner(path, owner):
"""Change the owner of a file"""
run_command(f'chown -R {owner} {path}')
Expand Down
71 changes: 39 additions & 32 deletions framework/python/src/net_orc/network_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Holds logic for validation of network services prior to runtime."""
import json
import os
import pwd
import shutil
import time
import docker
Expand Down Expand Up @@ -179,44 +180,50 @@ def _start_network_device(self, device):

LOGGER.info('Validation device ' + device.name + ' has finished')

def _get_host_user(self):
user = self._get_os_user()

# If primary method failed, try secondary
if user is None:
user = self._get_user()
def _get_sudo_user(self):
"""Returns the username of the sudo user."""
return os.environ.get('SUDO_USER')

LOGGER.debug(f'Network validator host user: {user}')
return user

def _get_os_user(self):
def _get_pwd_user(self):
"""Returns the username of the current user using pwd module"""
user = None
try:
user = os.getlogin()
except OSError:
# Handle the OSError exception
LOGGER.error('An OS error occurred while retrieving the login name.')
except Exception as error: # pylint: disable=W0703
# Catch any other unexpected exceptions
LOGGER.error('An exception occurred:', error)
user = pwd.getpwuid(os.getuid()).pw_name
return user

def _get_user(self):
def _get_host_user(self):
"""Returns the username of the host user"""
user_functions = [
self._get_sudo_user,
os.getlogin,
getpass.getuser,
self._get_pwd_user
]
user = None
try:
user = getpass.getuser()
except (KeyError, ImportError, ModuleNotFoundError, OSError) as e:
error_messages = []
for func in user_functions:
try:
user = func()
if user is not None:
break
except (KeyError, ImportError, ModuleNotFoundError, OSError) as e:
# Handle specific exceptions individually
if isinstance(e, KeyError):
LOGGER.error('USER environment variable not set or unavailable.')
elif isinstance(e, ImportError):
LOGGER.error('Unable to import the getpass module.')
elif isinstance(e, ModuleNotFoundError):
LOGGER.error('The getpass module was not found.')
elif isinstance(e, OSError):
LOGGER.error('An OS error occurred while retrieving the username.')
else:
LOGGER.error('An exception occurred:', e)
if isinstance(e, KeyError):
error_message = 'USER environment variable not set or unavailable.'
error_messages.append(error_message)
elif isinstance(e, ImportError):
error_messages.append('Unable to import the getpass module.')
elif isinstance(e, ModuleNotFoundError):
error_messages.append('The getpass module was not found.')
elif isinstance(e, OSError):
error_message = 'OS error occurred while retrieving the username.'
error_messages.append(error_message)
else:
error_messages.append('An exception occurred: ' + str(e))
if user is None:
LOGGER.error(next(error_messages))

LOGGER.debug(f'Network validator host user: {user}')

return user

def _get_device_status(self, module):
Expand Down
6 changes: 6 additions & 0 deletions modules/test/ntp/python/src/ntp_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ def _ntp_network_ntp_dhcp(self, config):
ntp_whitelist_resolver.is_ip_whitelisted(ip) for ip in ntp_to_remote_ips
)

for ip in ntp_to_remote_ips:
if ntp_whitelist_resolver.is_ip_whitelisted(ip):
LOGGER.info(f'NTP server {ip} is in the trusted whitelist')
else:
LOGGER.info(f'NTP server {ip} is NOT in the trusted whitelist')

result = 'Feature Not Detected', 'Device has not sent any NTP requests'

if device_sends_ntp:
Expand Down
Loading