From 4d49036152bc160709b52dbedd5f51a33f58f747 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20V=C3=A1vra?= Date: Mon, 4 May 2026 16:31:51 +0200 Subject: [PATCH] Refactor local entities to be subclasses of generic ones We want LocalUser, LocalGroup, ... to have the same interface as generic ones so local provider can be handled like any remote one. --- sssd_test_framework/roles/client.py | 2 +- sssd_test_framework/roles/generic.py | 68 ++-- sssd_test_framework/utils/local_users.py | 483 ++++++++++++++++------- 3 files changed, 388 insertions(+), 165 deletions(-) diff --git a/sssd_test_framework/roles/client.py b/sssd_test_framework/roles/client.py index d79bc581..5955e7f2 100644 --- a/sssd_test_framework/roles/client.py +++ b/sssd_test_framework/roles/client.py @@ -86,7 +86,7 @@ def __init__(self, *args, **kwargs) -> None: Methods for testing automount. """ - self.local: LocalUsersUtils = LocalUsersUtils(self.host, self.fs) + self.local: LocalUsersUtils = LocalUsersUtils(self.host, self.fs, client=self) """ Managing local users and groups. """ diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index c06f68ea..aed39354 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -25,6 +25,11 @@ "GenericNetgroup", "GenericNetgroupMember", "GenericSudoRule", + "SudoRuleUserField", + "SudoRuleHostField", + "SudoRuleCommandField", + "SudoRuleRunAsUserField", + "SudoRuleRunAsGroupField", "GenericAutomount", "GenericAutomountMap", "GenericAutomountKey", @@ -1009,15 +1014,19 @@ class GenericNetgroupMember(object): """ def __init__( - self, *, host: str | None = None, user: ProtocolName | str | None = None, ng: ProtocolName | str | None = None + self, + *, + host: str | None = None, + user: GenericUser | ProtocolName | str | None = None, + ng: GenericNetgroup | ProtocolName | str | None = None, ) -> None: """ :param host: Host, defaults to None :type host: str | None, optional :param user: User, defaults to None - :type user: ProtocolName | str | None, optional + :type user: GenericUser | ProtocolName | str | None, optional :param ng: Netgroup, defaults to None - :type ng: ProtocolName | str | None, optional + :type ng: GenericNetgroup | ProtocolName | str | None, optional """ self.host: str | None = host """Member host.""" @@ -1028,7 +1037,7 @@ def __init__( self.netgroup: str | None = self._get_name(ng) """Member netgroup.""" - def _get_name(self, item: ProtocolName | str | None = None) -> str | None: + def _get_name(self, item: GenericUser | GenericNetgroup | ProtocolName | str | None = None) -> str | None: if item is None: return None @@ -1038,6 +1047,17 @@ def _get_name(self, item: ProtocolName | str | None = None) -> str | None: return item +SudoRuleUserField = ( + str | GenericUser | GenericGroup | ProtocolName | list[str | GenericUser | GenericGroup | ProtocolName] | None +) +SudoRuleHostField = str | ProtocolName | list[str | ProtocolName] | None +SudoRuleCommandField = str | ProtocolName | list[str | ProtocolName] | None +SudoRuleRunAsUserField = ( + str | GenericUser | GenericGroup | ProtocolName | list[str | GenericUser | GenericGroup | ProtocolName] | None +) +SudoRuleRunAsGroupField = str | GenericGroup | ProtocolName | list[str | GenericGroup | ProtocolName] | None + + class GenericSudoRule(ABC, BaseObject): """ Generic sudo rule management. @@ -1055,12 +1075,12 @@ def name(self): def add( self, *, - user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - host: str | list[str] | None = None, - command: str | list[str] | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - runasgroup: str | GenericGroup | list[str | GenericGroup] | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> GenericSudoRule: @@ -1068,17 +1088,17 @@ def add( Create new sudo rule. :param user: sudoUser attribute, defaults to None - :type user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None - :type host: str | list[str] | None, optional + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to None - :type command: str | list[str] | None, optional + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None - :type runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None - :type runasgroup: str | GenericGroup | list[str | GenericGroup] | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) @@ -1092,12 +1112,12 @@ def add( def modify( self, *, - user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - host: str | list[str] | None = None, - command: str | list[str] | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - runasgroup: str | GenericGroup | list[str | GenericGroup] | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> GenericSudoRule: @@ -1105,17 +1125,17 @@ def modify( Create new sudo rule. :param user: sudoUser attribute, defaults to None - :type user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None - :type host: str | list[str] | None, optional + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to None - :type command: str | list[str] | None, optional + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None - :type runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None - :type runasgroup: str | GenericGroup | list[str | GenericGroup] | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) diff --git a/sssd_test_framework/utils/local_users.py b/sssd_test_framework/utils/local_users.py index 8f866656..a97dbc8b 100644 --- a/sssd_test_framework/utils/local_users.py +++ b/sssd_test_framework/utils/local_users.py @@ -3,7 +3,8 @@ from __future__ import annotations import re -from typing import Any, Literal +from datetime import datetime +from typing import TYPE_CHECKING, Any, Literal import jc from pytest_mh import MultihostHost, MultihostUtility @@ -11,7 +12,21 @@ from pytest_mh.conn import ProcessLogLevel from pytest_mh.utils.fs import LinuxFileSystem -from ..roles.generic import GenericNetgroupMember +from ..roles.generic import ( + GenericGroup, + GenericNetgroup, + GenericNetgroupMember, + GenericSudoRule, + GenericUser, + SudoRuleCommandField, + SudoRuleHostField, + SudoRuleRunAsGroupField, + SudoRuleRunAsUserField, + SudoRuleUserField, +) + +if TYPE_CHECKING: + from ..roles.client import Client __all__ = [ "LocalGroup", @@ -38,13 +53,16 @@ class LocalUsersUtils(MultihostUtility[MultihostHost]): All changes are automatically reverted when a test is finished. """ - def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: + def __init__(self, host: MultihostHost, fs: LinuxFileSystem, client: Client | None = None) -> None: """ :param host: Remote host instance. :type host: MultihostHost + :param client: Client role that owns this utility. + :type client: Client | None """ super().__init__(host) + self._client: Client | None = client self.cli: CLIBuilder = host.cli self.fs: LinuxFileSystem = fs self._users: list[str] = [] @@ -246,9 +264,12 @@ def sudorule(self, name: str) -> LocalSudoRule: return LocalSudoRule(self, name) -class LocalUser(object): +class LocalUser(GenericUser): """ Management of local users. + + :class:`LocalUser` is a :class:`GenericUser` for static typing; passkey-related + methods are not supported on local ``/etc/passwd`` users. """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -258,24 +279,32 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: User name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError("LocalUser requires LocalUsersUtils to be bound to a Client (client= in constructor).") + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name - def __str__(self): + @property + def name(self) -> str: + return self._name + + def __str__(self) -> str: """ Returns a string representation of the LocalUser. """ - return self.name + return self._name def add( self, *, uid: int | None = None, gid: int | None = None, - password: str | None = "Secret123", + password: str = "Secret123", home: str | None = None, gecos: str | None = None, shell: str | None = None, + email: str | None = None, ) -> LocalUser: """ Create new local user. @@ -284,7 +313,7 @@ def add( :type uid: int | None, optional :param gid: Primary group id, defaults to None. :type gid: int | None, optional - :param password: Password, defaults to 'Secret123'. + :param password: Password, defaults to 'Secret123' (use empty string to skip ``passwd``). :type password: str, optional :param home: Home directory, defaults to None. :type home: str | None, optional @@ -292,14 +321,17 @@ def add( :type gecos: str | None, optional :param shell: Login shell, defaults to None. :type shell: str | None, optional + :param email: Not applied to local users (present for :class:`GenericUser` API compatibility). + :type email: str | None, optional :return: Self. :rtype: LocalUser """ + del email # Local /etc/passwd user management does not set a mail attribute here. if home is not None: self.util.fs.backup(home) args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "uid": (self.util.cli.option.VALUE, uid), "gid": (self.util.cli.option.VALUE, gid), "home": (self.util.cli.option.VALUE, home), @@ -307,13 +339,13 @@ def add( "shell": (self.util.cli.option.VALUE, shell), } - passwd = f" && passwd --stdin '{self.name}'" if password else "" - self.util.logger.info(f'Creating local user "{self.name}" on {self.util.host.hostname}') + passwd = f" && passwd --stdin '{self._name}'" if password else "" + self.util.logger.info(f'Creating local user "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run( self.util.cli.command("useradd", args) + passwd, input=password, log_level=ProcessLogLevel.Error ) - self.util._users.append(self.name) + self.util._users.append(self._name) return self def modify( @@ -325,6 +357,7 @@ def modify( home: str | None = None, gecos: str | None = None, shell: str | None = None, + email: str | None = None, ) -> LocalUser: """ Modify existing local user. @@ -341,12 +374,15 @@ def modify( :type gecos: str | None, optional :param shell: Login shell, defaults to None. :type shell: str | None, optional + :param email: Not applied to local users (present for :class:`GenericUser` API compatibility). + :type email: str | None, optional :return: Self. :rtype: LocalUser """ + del email # Local /etc/passwd user management does not set a mail attribute here. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "uid": (self.util.cli.option.VALUE, uid), "gid": (self.util.cli.option.VALUE, gid), "home": (self.util.cli.option.VALUE, home), @@ -354,21 +390,82 @@ def modify( "shell": (self.util.cli.option.VALUE, shell), } - passwd = f" && passwd --stdin '{self.name}'" if password else "" - self.util.logger.info(f'Modifying local user "{self.name}" on {self.util.host.hostname}') + passwd = f" && passwd --stdin '{self._name}'" if password else "" + self.util.logger.info(f'Modifying local user "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run( self.util.cli.command("usermod", args) + passwd, input=password, log_level=ProcessLogLevel.Error ) return self + def reset(self, password: str | None = "Secret123") -> LocalUser: + """ + Reset user password. + + :param password: Password, defaults to 'Secret123' + :type password: str, optional + :return: Self. + :rtype: LocalUser + """ + return self.modify(password=password) + + def expire(self, expiration: str | None = "19700101000000") -> LocalUser: + """ + Set user password expiration date and time (via ``chage -E``). + + :param expiration: Date and time for user password expiration, defaults to 19700101000000 + :type expiration: str | None, optional + :return: Self. + :rtype: LocalUser + """ + exp = expiration if expiration is not None else "19700101000000" + end = datetime.strptime(exp, "%Y%m%d%H%M%S") + date_str = end.strftime("%Y-%m-%d") + self.util.logger.info( + f'Setting password expiration for local user "{self._name}" on {self.util.host.hostname}' + ) + self.util.host.conn.run(f"chage -E '{date_str}' '{self._name}'", log_level=ProcessLogLevel.Error) + return self + + def password_change_at_logon(self, **kwargs) -> LocalUser: + """ + Force user to change password next logon (``chage -d 0`` and password reset). + + :return: Self. + :rtype: LocalUser + """ + if "password" not in kwargs: + raise TypeError("Missing argument 'password'!") + self.modify(password=kwargs["password"]) + self.util.logger.info( + f'Requiring password change at next logon for local user "{self._name}" on {self.util.host.hostname}' + ) + self.util.host.conn.run(f"chage -d 0 '{self._name}'", log_level=ProcessLogLevel.Error) + return self + + def passkey_add(self, passkey_mapping: str) -> LocalUser: + """ + Add passkey mapping to the user. + + :raises NotImplementedError: Not supported for local users. + """ + raise NotImplementedError("LocalUser does not support passkey_add; use a directory-backed user.") + + def passkey_remove(self, passkey_mapping: str) -> LocalUser: + """ + Remove passkey mapping from the user. + + :raises NotImplementedError: Not supported for local users. + """ + raise NotImplementedError("LocalUser does not support passkey_remove; use a directory-backed user.") + def delete(self) -> None: """ Delete the user. """ - self.util.logger.info(f'Deleting local user "{self.name}" on {self.util.host.hostname}') - self.util.host.conn.run(f"userdel '{self.name}' --force --remove", log_level=ProcessLogLevel.Error) - self.util._users.remove(self.name) + self.util.logger.info(f'Deleting local user "{self._name}" on {self.util.host.hostname}') + self.util.host.conn.run(f"userdel '{self._name}' --force --remove", log_level=ProcessLogLevel.Error) + self.util._users.remove(self._name) def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: """ @@ -379,9 +476,9 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: :return: Dictionary with attribute name as a key. :rtype: dict[str, list[str]] """ - self.util.logger.info(f'Fetching local user "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Fetching local user "{self._name}" on {self.util.host.hostname}') result = self.util.host.conn.exec( - ["getent", "passwd", self.name], raise_on_error=False, log_level=ProcessLogLevel.Error + ["getent", "passwd", self._name], raise_on_error=False, log_level=ProcessLogLevel.Error ) if result.rc != 0: return {} @@ -399,9 +496,13 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: return {k: [str(v)] for k, v in jcresult[0].items() if not attrs or k in attrs} -class LocalGroup(object): +class LocalGroup(GenericGroup): """ Management of local groups. + + :class:`LocalGroup` is a :class:`GenericGroup` for static typing. Membership + changes only accept :class:`LocalUser` and :class:`LocalGroup`; directory + principals are not valid members of ``/etc/group``. """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -411,36 +512,60 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Group name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError("LocalGroup requires LocalUsersUtils to be bound to a Client (client= in constructor).") + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name + + @property + def name(self) -> str: + return self._name - def __str__(self): + def __str__(self) -> str: """ Returns a string representation of the LocalGroup. """ - return self.name + return self._name + + @staticmethod + def _member_principal_name(member: GenericUser | GenericGroup) -> str: + """ + Resolve a member to a local ``passwd``/``group`` name. + + :raises NotImplementedError: if ``member`` is not a local user or local group. + """ + if isinstance(member, (LocalUser, LocalGroup)): + return member.name + raise NotImplementedError( + "LocalGroup membership only supports LocalUser and LocalGroup; use directory-specific APIs otherwise." + ) def add( self, *, gid: int | None = None, + description: str | None = None, ) -> LocalGroup: """ Create new local group. :param gid: Group id, defaults to None. :type gid: int | None, optional + :param description: Not stored for pure local groups (present for :class:`GenericGroup` API compatibility). + :type description: str | None, optional :return: Self. :rtype: LocalGroup """ + del description # No description field in /etc/group via this API. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "gid": (self.util.cli.option.VALUE, gid), } - self.util.logger.info(f'Creating local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Creating local group "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run(self.util.cli.command("groupadd", args), log_level=ProcessLogLevel.Silent) - self.util._groups.append(self.name) + self.util._groups.append(self._name) return self @@ -448,6 +573,7 @@ def modify( self, *, gid: int | None = None, + description: str | None = None, ) -> LocalGroup: """ Modify existing local group. @@ -456,16 +582,19 @@ def modify( :param gid: Group id, defaults to None. :type gid: int | None, optional + :param description: Not stored for pure local groups (present for :class:`GenericGroup` API compatibility). + :type description: str | None, optional :return: Self. :rtype: LocalGroup """ + del description # No description field in /etc/group via this API. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "gid": (self.util.cli.option.VALUE, gid), } - self.util.logger.info(f'Modifying local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Modifying local group "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run(self.util.cli.command("groupmod", args), log_level=ProcessLogLevel.Error) return self @@ -474,9 +603,9 @@ def delete(self) -> None: """ Delete the group. """ - self.util.logger.info(f'Deleting local group "{self.name}" on {self.util.host.hostname}') - self.util.host.conn.run(f"groupdel '{self.name}' -f", log_level=ProcessLogLevel.Error) - self.util._groups.remove(self.name) + self.util.logger.info(f'Deleting local group "{self._name}" on {self.util.host.hostname}') + self.util.host.conn.run(f"groupdel '{self._name}' -f", log_level=ProcessLogLevel.Error) + self.util._groups.remove(self._name) def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: """ @@ -487,9 +616,9 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: :return: Dictionary with attribute name as a key. :rtype: dict[str, list[str]] """ - self.util.logger.info(f'Fetching local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Fetching local group "{self._name}" on {self.util.host.hostname}') result = self.util.host.conn.exec( - ["getent", "group", self.name], raise_on_error=False, log_level=ProcessLogLevel.Silent + ["getent", "group", self._name], raise_on_error=False, log_level=ProcessLogLevel.Silent ) if result.rc != 0: return {} @@ -506,62 +635,66 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: return {k: [str(v)] for k, v in jcresult[0].items() if not attrs or k in attrs} - def add_member(self, member: LocalUser) -> LocalGroup: + def add_member(self, member: GenericUser | GenericGroup) -> LocalGroup: """ Add group member. :param member: User or group to add as a member. - :type member: LocalUser + :type member: GenericUser | GenericGroup :return: Self. :rtype: LocalGroup """ return self.add_members([member]) - def add_members(self, members: list[LocalUser]) -> LocalGroup: + def add_members(self, members: list[GenericUser | GenericGroup]) -> LocalGroup: """ Add multiple group members. - :param member: List of users or groups to add as members. - :type member: list[LocalUser] + :param members: List of users or groups to add as members. + :type members: list[GenericUser | GenericGroup] :return: Self. :rtype: LocalGroup """ - self.util.logger.info(f'Adding members to group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Adding members to group "{self._name}" on {self.util.host.hostname}') if not members: return self - cmd = "\n".join([f"groupmems --group '{self.name}' --add '{x.name}'" for x in members]) + cmd = "\n".join( + [f"groupmems --group '{self._name}' --add '{self._member_principal_name(x)}'" for x in members] + ) self.util.host.conn.run("set -ex\n" + cmd, log_level=ProcessLogLevel.Error) return self - def remove_member(self, member: LocalUser) -> LocalGroup: + def remove_member(self, member: GenericUser | GenericGroup) -> LocalGroup: """ Remove group member. :param member: User or group to remove from the group. - :type member: LocalUser + :type member: GenericUser | GenericGroup :return: Self. :rtype: LocalGroup """ return self.remove_members([member]) - def remove_members(self, members: list[LocalUser]) -> LocalGroup: + def remove_members(self, members: list[GenericUser | GenericGroup]) -> LocalGroup: """ Remove multiple group members. - :param member: List of users or groups to remove from the group. - :type member: list[LocalUser] + :param members: List of users or groups to remove from the group. + :type members: list[GenericUser | GenericGroup] :return: Self. :rtype: LocalGroup """ - self.util.logger.info(f'Removing members from group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Removing members from group "{self._name}" on {self.util.host.hostname}') if not members: return self - cmd = "\n".join([f"groupmems --group '{self.name}' --delete '{x.name}'" for x in members]) + cmd = "\n".join( + [f"groupmems --group '{self._name}' --delete '{self._member_principal_name(x)}'" for x in members] + ) self.util.host.conn.run("set -ex\n" + cmd, log_level=ProcessLogLevel.Error) return self @@ -576,22 +709,22 @@ def __init__( self, *, host: str | None = None, - user: LocalUser | str | None = None, + user: GenericUser | str | None = None, group: LocalGroup | str | None = None, hostgroup: str | None = None, - ng: LocalNetgroup | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> None: """ :param host: Host part of the triple, defaults to None. :type host: str | None, optional :param user: User part of the triple, defaults to None. - :type user: LocalUser | str | None, optional + :type user: GenericUser | str | None, optional :param group: Not supported for local netgroups. :type group: LocalGroup | str | None, optional :param hostgroup: Not supported for local netgroups. :type hostgroup: str | None, optional :param ng: Nested netgroup, defaults to None. - :type ng: LocalNetgroup | str | None, optional + :type ng: GenericNetgroup | str | None, optional :raises :class:`ValueError` for unsupported member kinds. """ @@ -624,9 +757,14 @@ def to_member_string(self) -> str: return f"({h},{u},)" -class LocalNetgroup(object): +class LocalNetgroup(GenericNetgroup): """ - Local netgroup management via. + Local netgroup management via ``/etc/netgroup``. + + :class:`LocalNetgroup` is a :class:`GenericNetgroup` for static typing. Only + :class:`LocalNetgroupMember` instances are supported in :meth:`add_members` + and :meth:`remove_members` (not arbitrary :class:`GenericNetgroupMember` + subclasses from other backends). """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -636,15 +774,24 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Netgroup name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError( + "LocalNetgroup requires LocalUsersUtils to be bound to a Client (client= in constructor)." + ) + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name self._members: list[str] = [] + @property + def name(self) -> str: + return self._name + def __str__(self) -> str: """ Return the netgroup name. """ - return self.name + return self._name def _format_line(self) -> str: """ @@ -655,8 +802,8 @@ def _format_line(self) -> str: """ if not self._members: # Empty triple (,,) — valid NIS form with empty host, user, and domain fields. - return f"{self.name}\t(,,)" - return f"{self.name}\t" + " ".join(self._members) + return f"{self._name}\t(,,)" + return f"{self._name}\t" + " ".join(self._members) def add(self) -> LocalNetgroup: """ @@ -666,26 +813,51 @@ def add(self) -> LocalNetgroup: :rtype: LocalNetgroup :raises: :class:`ValueError` for duplicate names. """ - self.util.logger.info(f'Creating local netgroup "{self.name}" on {self.util.host.hostname}') - existing = self.util._netgroups.get(self.name) + self.util.logger.info(f'Creating local netgroup "{self._name}" on {self.util.host.hostname}') + existing = self.util._netgroups.get(self._name) if existing is not None and existing is not self: raise ValueError( - f'Local netgroup "{self.name}" is already managed by another LocalNetgroup instance; ' + f'Local netgroup "{self._name}" is already managed by another LocalNetgroup instance; ' "reuse the object returned from the first client.local.netgroup() call." ) - self.util._netgroup_names_touched.add(self.name) - self.util._netgroups[self.name] = self + self.util._netgroup_names_touched.add(self._name) + self.util._netgroups[self._name] = self self.util._rewrite_netgroup_file() return self + def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: + """ + Get netgroup data from ``getent netgroup`` (reflecting ``/etc/netgroup``). + + Keys include ``cn`` (netgroup name) and ``nisNetgroupTriple`` (member tokens). + """ + self.util.logger.info(f'Fetching local netgroup "{self._name}" on {self.util.host.hostname}') + result = self.util.host.conn.exec( + ["getent", "netgroup", self._name], raise_on_error=False, log_level=ProcessLogLevel.Silent + ) + if result.rc != 0: + return {} + + line = result.stdout.strip().splitlines()[0] if result.stdout.strip() else "" + if not line: + return {} + + tokens = line.split() + if not tokens or tokens[0] != self._name: + return {} + + triples = tokens[1:] + out: dict[str, list[str]] = {"cn": [self._name], "nisNetgroupTriple": triples} + if attrs is None: + return out + return {k: v for k, v in out.items() if k in attrs} + def add_member( self, *, host: str | None = None, - user: LocalUser | str | None = None, - group: LocalGroup | str | None = None, - hostgroup: str | None = None, - ng: LocalNetgroup | str | None = None, + user: GenericUser | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> LocalNetgroup: """ Add a netgroup member. @@ -693,9 +865,9 @@ def add_member( :return: Self. :rtype: LocalNetgroup """ - return self.add_members([LocalNetgroupMember(host=host, user=user, group=group, hostgroup=hostgroup, ng=ng)]) + return self.add_members([LocalNetgroupMember(host=host, user=user, ng=ng)]) - def add_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: + def add_members(self, members: list[GenericNetgroupMember]) -> LocalNetgroup: """ Add multiple netgroup members. @@ -704,20 +876,22 @@ def add_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: already in this netgroup or appears more than once in ``members``, later duplicates are skipped (nothing is appended for them). - :param members: Netgroup members. - :type members: list[LocalNetgroupMember] + :param members: Netgroup members (must be :class:`LocalNetgroupMember`). + :type members: list[GenericNetgroupMember] :return: Self. :rtype: LocalNetgroup """ - self.util.logger.info(f'Adding members to local netgroup "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Adding members to local netgroup "{self._name}" on {self.util.host.hostname}') if not members: return self - if self.name not in self.util._netgroups: - raise RuntimeError(f'Netgroup "{self.name}" was not created; call add() first') + if self._name not in self.util._netgroups: + raise RuntimeError(f'Netgroup "{self._name}" was not created; call add() first') for m in members: + if not isinstance(m, LocalNetgroupMember): + raise TypeError("Local netgroups only accept LocalNetgroupMember entries.") line = m.to_member_string() if line in self._members: continue @@ -729,10 +903,8 @@ def remove_member( self, *, host: str | None = None, - user: LocalUser | str | None = None, - group: LocalGroup | str | None = None, - hostgroup: str | None = None, - ng: "LocalNetgroup | str | None" = None, + user: GenericUser | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> LocalNetgroup: """ Remove a netgroup member. @@ -740,28 +912,32 @@ def remove_member( :return: Self. :rtype: LocalNetgroup """ - return self.remove_members( - [LocalNetgroupMember(host=host, user=user, group=group, hostgroup=hostgroup, ng=ng)] - ) + return self.remove_members([LocalNetgroupMember(host=host, user=user, ng=ng)]) - def remove_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: + def remove_members(self, members: list[GenericNetgroupMember]) -> LocalNetgroup: """ Remove netgroup members. - :param members: Members to remove. - :type members: list[LocalNetgroupMember] + :param members: Members to remove (must be :class:`LocalNetgroupMember`). + :type members: list[GenericNetgroupMember] :return: Self. :rtype: LocalNetgroup """ - self.util.logger.info(f'Removing members from local netgroup "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Removing members from local netgroup "{self._name}" on {self.util.host.hostname}') if not members: return self - if self.name not in self.util._netgroups: - raise RuntimeError(f'Netgroup "{self.name}" was not created; call add() first') + if self._name not in self.util._netgroups: + raise RuntimeError(f'Netgroup "{self._name}" was not created; call add() first') + + local_members: list[LocalNetgroupMember] = [] + for m in members: + if not isinstance(m, LocalNetgroupMember): + raise TypeError("Local netgroups only accept LocalNetgroupMember entries.") + local_members.append(m) - remove_strings = {m.to_member_string() for m in members} + remove_strings = {m.to_member_string() for m in local_members} self._members = [x for x in self._members if x not in remove_strings] self.util._rewrite_netgroup_file() return self @@ -770,10 +946,10 @@ def delete(self) -> None: """ Remove this netgroup from ``/etc/netgroup``. """ - self.util.logger.info(f'Deleting local netgroup "{self.name}" on {self.util.host.hostname}') - self.util._netgroup_names_touched.add(self.name) - if self.name in self.util._netgroups: - del self.util._netgroups[self.name] + self.util.logger.info(f'Deleting local netgroup "{self._name}" on {self.util.host.hostname}') + self.util._netgroup_names_touched.add(self._name) + if self._name in self.util._netgroups: + del self.util._netgroups[self._name] self._members.clear() self.util._rewrite_netgroup_file() @@ -936,17 +1112,12 @@ def delete(self) -> None: self.util._sudoaliases.remove(self) -LocalSudoRuleUserPiece = str | LocalUser | LocalGroup | LocalSudoAlias -LocalSudoRuleUserArg = LocalSudoRuleUserPiece | list[LocalSudoRuleUserPiece] -LocalSudoRuleHostArg = str | LocalSudoAlias | list[str | LocalSudoAlias] -LocalSudoRuleCommandArg = str | LocalSudoAlias | list[str | LocalSudoAlias] -LocalSudoRuleRunAsUserArg = str | LocalUser | LocalSudoAlias | list[str | LocalUser | LocalSudoAlias] -LocalSudoRuleRunAsGroupArg = str | LocalGroup | LocalSudoAlias | list[str | LocalGroup | LocalSudoAlias] - - -class LocalSudoRule(object): +class LocalSudoRule(GenericSudoRule): """ - Local sudo rule management. + Local sudo rule management (``/etc/sudoers.d/`` drop-ins). + + See :class:`GenericSudoRule` for parameter meanings. ``ProtocolName`` values + (including :class:`LocalSudoAlias`) are emitted as bare sudoers names. """ default_user: str = "ALL" @@ -959,37 +1130,45 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Sudo rule name. :type name: str """ - self.name = name - self.util = util + if util._client is None: + raise RuntimeError( + "LocalSudoRule requires LocalUsersUtils to be bound to a Client (client= in constructor)." + ) + super().__init__(util._client) + self._name: str = name + self.util: LocalUsersUtils = util self.__rule: dict[str, Any] = dict() self.filename: str | None = None self.rule_str: str | None = None - def __str__(self): + @property + def name(self) -> str: + return self._name + + def __str__(self) -> str: """ Returns a string representation of the LocalSudoRule. """ if self.rule_str: return self.rule_str - else: - return self.name + return self._name @staticmethod def _format_list_item(item: str | Any, add_percent: bool = False) -> str: """ Format a single sudoers list element. - :param item: String, :class:`LocalUser`, :class:`LocalGroup`, or :class:`LocalSudoAlias`. + :param item: String, user/group, or name reference (e.g. :class:`LocalSudoAlias`). :type item: str | Any - :param add_percent: If true, prepend ``%`` to :class:`LocalGroup` entries. + :param add_percent: If true, prepend ``%`` to :class:`GenericGroup` entries. :type add_percent: bool, optional :return: Formatted fragment. :rtype: str """ if isinstance(item, LocalSudoAlias): return item.name - if isinstance(item, LocalGroup) and add_percent: - return f"%{str(item)}" + if isinstance(item, GenericGroup) and add_percent: + return f"%{item.name}" return str(item) @staticmethod @@ -999,7 +1178,7 @@ def _format_list(item: str | Any | list[str | Any], add_percent: bool = False) - :param item: A single value or list of values to format. :type item: str | Any | list[str | Any] - :param add_percent: If true, prepend ``%`` to :class:`LocalGroup` entries (sudo user field). + :param add_percent: If true, prepend ``%`` to :class:`GenericGroup` entries (sudo user field). :type add_percent: bool, optional :return: Formatted string. :rtype: str @@ -1008,15 +1187,37 @@ def _format_list(item: str | Any | list[str | Any], add_percent: bool = False) - return ", ".join(LocalSudoRule._format_list_item(x, add_percent) for x in item) return LocalSudoRule._format_list_item(item, add_percent) + def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: + """ + Return rule text as attributes (``cn``, ``sudoRule``). + + If ``rule_str`` is unset, reads the drop-in file when ``filename`` is set. + """ + line = self.rule_str.strip() if self.rule_str else "" + if not line and self.filename: + result = self.util.host.conn.exec( + ["cat", f"/etc/sudoers.d/{self.filename}"], + raise_on_error=False, + log_level=ProcessLogLevel.Silent, + ) + if result.rc == 0: + line = result.stdout.strip() + if not line: + return {} + data: dict[str, list[str]] = {"cn": [self._name], "sudoRule": [line]} + if attrs is None: + return data + return {k: v for k, v in data.items() if k in attrs} + def add( self, *, - user: LocalSudoRuleUserArg | None = default_user, - host: LocalSudoRuleHostArg | None = default_host, - command: LocalSudoRuleCommandArg | None = default_command, + user: SudoRuleUserField = default_user, + host: SudoRuleHostField = default_host, + command: SudoRuleCommandField = default_command, option: str | list[str] | None = None, - runasuser: LocalSudoRuleRunAsUserArg | None = None, - runasgroup: LocalSudoRuleRunAsGroupArg | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> LocalSudoRule: @@ -1024,17 +1225,17 @@ def add( Create new sudo rule. :param user: sudoUser attribute, defaults to ALL. - :type user: LocalSudoRuleUserArg | None + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to ALL. - :type host: LocalSudoRuleHostArg | None + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to ALL. - :type command: LocalSudoRuleCommandArg | None + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None. :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None. - :type runasuser: LocalSudoRuleRunAsUserArg | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None. - :type runasgroup: LocalSudoRuleRunAsGroupArg | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None. :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) @@ -1044,7 +1245,7 @@ def add( """ orderstr = f"{order:02d}" if order is not None else str(len(self.util._sudorules)) if self.filename is None: - self.filename = f"{orderstr}_{self.name}" + self.filename = f"{orderstr}_{self._name}" # Remember arguments so we can use them in modify if needed self.__rule = dict[str, Any]( @@ -1078,35 +1279,37 @@ def add( def modify( self, *, - user: LocalSudoRuleUserArg | None = None, - host: LocalSudoRuleHostArg | None = None, - command: LocalSudoRuleCommandArg | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: LocalSudoRuleRunAsUserArg | None = None, - runasgroup: LocalSudoRuleRunAsGroupArg | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> LocalSudoRule: """ - Modify existing Local sudo rule. + Modify existing local sudo rule. + + Parameters set to ``None`` keep the previous values. :param user: sudoUser attribute, defaults to None. - :type user: LocalSudoRuleUserArg | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None. - :type host: LocalSudoRuleHostArg | None, optional - :param command: sudoCommand attribute defaults to None. - :type command: LocalSudoRuleCommandArg | None, optional + :type host: SudoRuleHostField, optional + :param command: sudoCommand attribute, defaults to None. + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None. :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None. - :type runasuser: LocalSudoRuleRunAsUserArg | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None. - :type runasgroup: LocalSudoRuleRunAsGroupArg | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None. :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change). :type nopasswd: bool | None, optional - :return: New sudo rule object. + :return: Self. :rtype: LocalSudoRule """ self.delete()