diff --git a/sssd_test_framework/roles/client.py b/sssd_test_framework/roles/client.py index d79bc581..5955e7f2 100644 --- a/sssd_test_framework/roles/client.py +++ b/sssd_test_framework/roles/client.py @@ -86,7 +86,7 @@ def __init__(self, *args, **kwargs) -> None: Methods for testing automount. """ - self.local: LocalUsersUtils = LocalUsersUtils(self.host, self.fs) + self.local: LocalUsersUtils = LocalUsersUtils(self.host, self.fs, client=self) """ Managing local users and groups. """ diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index c06f68ea..aed39354 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -25,6 +25,11 @@ "GenericNetgroup", "GenericNetgroupMember", "GenericSudoRule", + "SudoRuleUserField", + "SudoRuleHostField", + "SudoRuleCommandField", + "SudoRuleRunAsUserField", + "SudoRuleRunAsGroupField", "GenericAutomount", "GenericAutomountMap", "GenericAutomountKey", @@ -1009,15 +1014,19 @@ class GenericNetgroupMember(object): """ def __init__( - self, *, host: str | None = None, user: ProtocolName | str | None = None, ng: ProtocolName | str | None = None + self, + *, + host: str | None = None, + user: GenericUser | ProtocolName | str | None = None, + ng: GenericNetgroup | ProtocolName | str | None = None, ) -> None: """ :param host: Host, defaults to None :type host: str | None, optional :param user: User, defaults to None - :type user: ProtocolName | str | None, optional + :type user: GenericUser | ProtocolName | str | None, optional :param ng: Netgroup, defaults to None - :type ng: ProtocolName | str | None, optional + :type ng: GenericNetgroup | ProtocolName | str | None, optional """ self.host: str | None = host """Member host.""" @@ -1028,7 +1037,7 @@ def __init__( self.netgroup: str | None = self._get_name(ng) """Member netgroup.""" - def _get_name(self, item: ProtocolName | str | None = None) -> str | None: + def _get_name(self, item: GenericUser | GenericNetgroup | ProtocolName | str | None = None) -> str | None: if item is None: return None @@ -1038,6 +1047,17 @@ def _get_name(self, item: ProtocolName | str | None = None) -> str | None: return item +SudoRuleUserField = ( + str | GenericUser | GenericGroup | ProtocolName | list[str | GenericUser | GenericGroup | ProtocolName] | None +) +SudoRuleHostField = str | ProtocolName | list[str | ProtocolName] | None +SudoRuleCommandField = str | ProtocolName | list[str | ProtocolName] | None +SudoRuleRunAsUserField = ( + str | GenericUser | GenericGroup | ProtocolName | list[str | GenericUser | GenericGroup | ProtocolName] | None +) +SudoRuleRunAsGroupField = str | GenericGroup | ProtocolName | list[str | GenericGroup | ProtocolName] | None + + class GenericSudoRule(ABC, BaseObject): """ Generic sudo rule management. @@ -1055,12 +1075,12 @@ def name(self): def add( self, *, - user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - host: str | list[str] | None = None, - command: str | list[str] | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - runasgroup: str | GenericGroup | list[str | GenericGroup] | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> GenericSudoRule: @@ -1068,17 +1088,17 @@ def add( Create new sudo rule. :param user: sudoUser attribute, defaults to None - :type user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None - :type host: str | list[str] | None, optional + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to None - :type command: str | list[str] | None, optional + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None - :type runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None - :type runasgroup: str | GenericGroup | list[str | GenericGroup] | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) @@ -1092,12 +1112,12 @@ def add( def modify( self, *, - user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - host: str | list[str] | None = None, - command: str | list[str] | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None = None, - runasgroup: str | GenericGroup | list[str | GenericGroup] | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> GenericSudoRule: @@ -1105,17 +1125,17 @@ def modify( Create new sudo rule. :param user: sudoUser attribute, defaults to None - :type user: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None - :type host: str | list[str] | None, optional + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to None - :type command: str | list[str] | None, optional + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None - :type runasuser: str | GenericUser | GenericGroup | list[str | GenericUser | GenericGroup] | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None - :type runasgroup: str | GenericGroup | list[str | GenericGroup] | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) diff --git a/sssd_test_framework/utils/local_users.py b/sssd_test_framework/utils/local_users.py index 8f866656..a97dbc8b 100644 --- a/sssd_test_framework/utils/local_users.py +++ b/sssd_test_framework/utils/local_users.py @@ -3,7 +3,8 @@ from __future__ import annotations import re -from typing import Any, Literal +from datetime import datetime +from typing import TYPE_CHECKING, Any, Literal import jc from pytest_mh import MultihostHost, MultihostUtility @@ -11,7 +12,21 @@ from pytest_mh.conn import ProcessLogLevel from pytest_mh.utils.fs import LinuxFileSystem -from ..roles.generic import GenericNetgroupMember +from ..roles.generic import ( + GenericGroup, + GenericNetgroup, + GenericNetgroupMember, + GenericSudoRule, + GenericUser, + SudoRuleCommandField, + SudoRuleHostField, + SudoRuleRunAsGroupField, + SudoRuleRunAsUserField, + SudoRuleUserField, +) + +if TYPE_CHECKING: + from ..roles.client import Client __all__ = [ "LocalGroup", @@ -38,13 +53,16 @@ class LocalUsersUtils(MultihostUtility[MultihostHost]): All changes are automatically reverted when a test is finished. """ - def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: + def __init__(self, host: MultihostHost, fs: LinuxFileSystem, client: Client | None = None) -> None: """ :param host: Remote host instance. :type host: MultihostHost + :param client: Client role that owns this utility. + :type client: Client | None """ super().__init__(host) + self._client: Client | None = client self.cli: CLIBuilder = host.cli self.fs: LinuxFileSystem = fs self._users: list[str] = [] @@ -246,9 +264,12 @@ def sudorule(self, name: str) -> LocalSudoRule: return LocalSudoRule(self, name) -class LocalUser(object): +class LocalUser(GenericUser): """ Management of local users. + + :class:`LocalUser` is a :class:`GenericUser` for static typing; passkey-related + methods are not supported on local ``/etc/passwd`` users. """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -258,24 +279,32 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: User name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError("LocalUser requires LocalUsersUtils to be bound to a Client (client= in constructor).") + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name - def __str__(self): + @property + def name(self) -> str: + return self._name + + def __str__(self) -> str: """ Returns a string representation of the LocalUser. """ - return self.name + return self._name def add( self, *, uid: int | None = None, gid: int | None = None, - password: str | None = "Secret123", + password: str = "Secret123", home: str | None = None, gecos: str | None = None, shell: str | None = None, + email: str | None = None, ) -> LocalUser: """ Create new local user. @@ -284,7 +313,7 @@ def add( :type uid: int | None, optional :param gid: Primary group id, defaults to None. :type gid: int | None, optional - :param password: Password, defaults to 'Secret123'. + :param password: Password, defaults to 'Secret123' (use empty string to skip ``passwd``). :type password: str, optional :param home: Home directory, defaults to None. :type home: str | None, optional @@ -292,14 +321,17 @@ def add( :type gecos: str | None, optional :param shell: Login shell, defaults to None. :type shell: str | None, optional + :param email: Not applied to local users (present for :class:`GenericUser` API compatibility). + :type email: str | None, optional :return: Self. :rtype: LocalUser """ + del email # Local /etc/passwd user management does not set a mail attribute here. if home is not None: self.util.fs.backup(home) args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "uid": (self.util.cli.option.VALUE, uid), "gid": (self.util.cli.option.VALUE, gid), "home": (self.util.cli.option.VALUE, home), @@ -307,13 +339,13 @@ def add( "shell": (self.util.cli.option.VALUE, shell), } - passwd = f" && passwd --stdin '{self.name}'" if password else "" - self.util.logger.info(f'Creating local user "{self.name}" on {self.util.host.hostname}') + passwd = f" && passwd --stdin '{self._name}'" if password else "" + self.util.logger.info(f'Creating local user "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run( self.util.cli.command("useradd", args) + passwd, input=password, log_level=ProcessLogLevel.Error ) - self.util._users.append(self.name) + self.util._users.append(self._name) return self def modify( @@ -325,6 +357,7 @@ def modify( home: str | None = None, gecos: str | None = None, shell: str | None = None, + email: str | None = None, ) -> LocalUser: """ Modify existing local user. @@ -341,12 +374,15 @@ def modify( :type gecos: str | None, optional :param shell: Login shell, defaults to None. :type shell: str | None, optional + :param email: Not applied to local users (present for :class:`GenericUser` API compatibility). + :type email: str | None, optional :return: Self. :rtype: LocalUser """ + del email # Local /etc/passwd user management does not set a mail attribute here. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "uid": (self.util.cli.option.VALUE, uid), "gid": (self.util.cli.option.VALUE, gid), "home": (self.util.cli.option.VALUE, home), @@ -354,21 +390,82 @@ def modify( "shell": (self.util.cli.option.VALUE, shell), } - passwd = f" && passwd --stdin '{self.name}'" if password else "" - self.util.logger.info(f'Modifying local user "{self.name}" on {self.util.host.hostname}') + passwd = f" && passwd --stdin '{self._name}'" if password else "" + self.util.logger.info(f'Modifying local user "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run( self.util.cli.command("usermod", args) + passwd, input=password, log_level=ProcessLogLevel.Error ) return self + def reset(self, password: str | None = "Secret123") -> LocalUser: + """ + Reset user password. + + :param password: Password, defaults to 'Secret123' + :type password: str, optional + :return: Self. + :rtype: LocalUser + """ + return self.modify(password=password) + + def expire(self, expiration: str | None = "19700101000000") -> LocalUser: + """ + Set user password expiration date and time (via ``chage -E``). + + :param expiration: Date and time for user password expiration, defaults to 19700101000000 + :type expiration: str | None, optional + :return: Self. + :rtype: LocalUser + """ + exp = expiration if expiration is not None else "19700101000000" + end = datetime.strptime(exp, "%Y%m%d%H%M%S") + date_str = end.strftime("%Y-%m-%d") + self.util.logger.info( + f'Setting password expiration for local user "{self._name}" on {self.util.host.hostname}' + ) + self.util.host.conn.run(f"chage -E '{date_str}' '{self._name}'", log_level=ProcessLogLevel.Error) + return self + + def password_change_at_logon(self, **kwargs) -> LocalUser: + """ + Force user to change password next logon (``chage -d 0`` and password reset). + + :return: Self. + :rtype: LocalUser + """ + if "password" not in kwargs: + raise TypeError("Missing argument 'password'!") + self.modify(password=kwargs["password"]) + self.util.logger.info( + f'Requiring password change at next logon for local user "{self._name}" on {self.util.host.hostname}' + ) + self.util.host.conn.run(f"chage -d 0 '{self._name}'", log_level=ProcessLogLevel.Error) + return self + + def passkey_add(self, passkey_mapping: str) -> LocalUser: + """ + Add passkey mapping to the user. + + :raises NotImplementedError: Not supported for local users. + """ + raise NotImplementedError("LocalUser does not support passkey_add; use a directory-backed user.") + + def passkey_remove(self, passkey_mapping: str) -> LocalUser: + """ + Remove passkey mapping from the user. + + :raises NotImplementedError: Not supported for local users. + """ + raise NotImplementedError("LocalUser does not support passkey_remove; use a directory-backed user.") + def delete(self) -> None: """ Delete the user. """ - self.util.logger.info(f'Deleting local user "{self.name}" on {self.util.host.hostname}') - self.util.host.conn.run(f"userdel '{self.name}' --force --remove", log_level=ProcessLogLevel.Error) - self.util._users.remove(self.name) + self.util.logger.info(f'Deleting local user "{self._name}" on {self.util.host.hostname}') + self.util.host.conn.run(f"userdel '{self._name}' --force --remove", log_level=ProcessLogLevel.Error) + self.util._users.remove(self._name) def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: """ @@ -379,9 +476,9 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: :return: Dictionary with attribute name as a key. :rtype: dict[str, list[str]] """ - self.util.logger.info(f'Fetching local user "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Fetching local user "{self._name}" on {self.util.host.hostname}') result = self.util.host.conn.exec( - ["getent", "passwd", self.name], raise_on_error=False, log_level=ProcessLogLevel.Error + ["getent", "passwd", self._name], raise_on_error=False, log_level=ProcessLogLevel.Error ) if result.rc != 0: return {} @@ -399,9 +496,13 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: return {k: [str(v)] for k, v in jcresult[0].items() if not attrs or k in attrs} -class LocalGroup(object): +class LocalGroup(GenericGroup): """ Management of local groups. + + :class:`LocalGroup` is a :class:`GenericGroup` for static typing. Membership + changes only accept :class:`LocalUser` and :class:`LocalGroup`; directory + principals are not valid members of ``/etc/group``. """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -411,36 +512,60 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Group name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError("LocalGroup requires LocalUsersUtils to be bound to a Client (client= in constructor).") + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name + + @property + def name(self) -> str: + return self._name - def __str__(self): + def __str__(self) -> str: """ Returns a string representation of the LocalGroup. """ - return self.name + return self._name + + @staticmethod + def _member_principal_name(member: GenericUser | GenericGroup) -> str: + """ + Resolve a member to a local ``passwd``/``group`` name. + + :raises NotImplementedError: if ``member`` is not a local user or local group. + """ + if isinstance(member, (LocalUser, LocalGroup)): + return member.name + raise NotImplementedError( + "LocalGroup membership only supports LocalUser and LocalGroup; use directory-specific APIs otherwise." + ) def add( self, *, gid: int | None = None, + description: str | None = None, ) -> LocalGroup: """ Create new local group. :param gid: Group id, defaults to None. :type gid: int | None, optional + :param description: Not stored for pure local groups (present for :class:`GenericGroup` API compatibility). + :type description: str | None, optional :return: Self. :rtype: LocalGroup """ + del description # No description field in /etc/group via this API. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "gid": (self.util.cli.option.VALUE, gid), } - self.util.logger.info(f'Creating local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Creating local group "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run(self.util.cli.command("groupadd", args), log_level=ProcessLogLevel.Silent) - self.util._groups.append(self.name) + self.util._groups.append(self._name) return self @@ -448,6 +573,7 @@ def modify( self, *, gid: int | None = None, + description: str | None = None, ) -> LocalGroup: """ Modify existing local group. @@ -456,16 +582,19 @@ def modify( :param gid: Group id, defaults to None. :type gid: int | None, optional + :param description: Not stored for pure local groups (present for :class:`GenericGroup` API compatibility). + :type description: str | None, optional :return: Self. :rtype: LocalGroup """ + del description # No description field in /etc/group via this API. args: CLIBuilderArgs = { - "name": (self.util.cli.option.POSITIONAL, self.name), + "name": (self.util.cli.option.POSITIONAL, self._name), "gid": (self.util.cli.option.VALUE, gid), } - self.util.logger.info(f'Modifying local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Modifying local group "{self._name}" on {self.util.host.hostname}') self.util.host.conn.run(self.util.cli.command("groupmod", args), log_level=ProcessLogLevel.Error) return self @@ -474,9 +603,9 @@ def delete(self) -> None: """ Delete the group. """ - self.util.logger.info(f'Deleting local group "{self.name}" on {self.util.host.hostname}') - self.util.host.conn.run(f"groupdel '{self.name}' -f", log_level=ProcessLogLevel.Error) - self.util._groups.remove(self.name) + self.util.logger.info(f'Deleting local group "{self._name}" on {self.util.host.hostname}') + self.util.host.conn.run(f"groupdel '{self._name}' -f", log_level=ProcessLogLevel.Error) + self.util._groups.remove(self._name) def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: """ @@ -487,9 +616,9 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: :return: Dictionary with attribute name as a key. :rtype: dict[str, list[str]] """ - self.util.logger.info(f'Fetching local group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Fetching local group "{self._name}" on {self.util.host.hostname}') result = self.util.host.conn.exec( - ["getent", "group", self.name], raise_on_error=False, log_level=ProcessLogLevel.Silent + ["getent", "group", self._name], raise_on_error=False, log_level=ProcessLogLevel.Silent ) if result.rc != 0: return {} @@ -506,62 +635,66 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: return {k: [str(v)] for k, v in jcresult[0].items() if not attrs or k in attrs} - def add_member(self, member: LocalUser) -> LocalGroup: + def add_member(self, member: GenericUser | GenericGroup) -> LocalGroup: """ Add group member. :param member: User or group to add as a member. - :type member: LocalUser + :type member: GenericUser | GenericGroup :return: Self. :rtype: LocalGroup """ return self.add_members([member]) - def add_members(self, members: list[LocalUser]) -> LocalGroup: + def add_members(self, members: list[GenericUser | GenericGroup]) -> LocalGroup: """ Add multiple group members. - :param member: List of users or groups to add as members. - :type member: list[LocalUser] + :param members: List of users or groups to add as members. + :type members: list[GenericUser | GenericGroup] :return: Self. :rtype: LocalGroup """ - self.util.logger.info(f'Adding members to group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Adding members to group "{self._name}" on {self.util.host.hostname}') if not members: return self - cmd = "\n".join([f"groupmems --group '{self.name}' --add '{x.name}'" for x in members]) + cmd = "\n".join( + [f"groupmems --group '{self._name}' --add '{self._member_principal_name(x)}'" for x in members] + ) self.util.host.conn.run("set -ex\n" + cmd, log_level=ProcessLogLevel.Error) return self - def remove_member(self, member: LocalUser) -> LocalGroup: + def remove_member(self, member: GenericUser | GenericGroup) -> LocalGroup: """ Remove group member. :param member: User or group to remove from the group. - :type member: LocalUser + :type member: GenericUser | GenericGroup :return: Self. :rtype: LocalGroup """ return self.remove_members([member]) - def remove_members(self, members: list[LocalUser]) -> LocalGroup: + def remove_members(self, members: list[GenericUser | GenericGroup]) -> LocalGroup: """ Remove multiple group members. - :param member: List of users or groups to remove from the group. - :type member: list[LocalUser] + :param members: List of users or groups to remove from the group. + :type members: list[GenericUser | GenericGroup] :return: Self. :rtype: LocalGroup """ - self.util.logger.info(f'Removing members from group "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Removing members from group "{self._name}" on {self.util.host.hostname}') if not members: return self - cmd = "\n".join([f"groupmems --group '{self.name}' --delete '{x.name}'" for x in members]) + cmd = "\n".join( + [f"groupmems --group '{self._name}' --delete '{self._member_principal_name(x)}'" for x in members] + ) self.util.host.conn.run("set -ex\n" + cmd, log_level=ProcessLogLevel.Error) return self @@ -576,22 +709,22 @@ def __init__( self, *, host: str | None = None, - user: LocalUser | str | None = None, + user: GenericUser | str | None = None, group: LocalGroup | str | None = None, hostgroup: str | None = None, - ng: LocalNetgroup | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> None: """ :param host: Host part of the triple, defaults to None. :type host: str | None, optional :param user: User part of the triple, defaults to None. - :type user: LocalUser | str | None, optional + :type user: GenericUser | str | None, optional :param group: Not supported for local netgroups. :type group: LocalGroup | str | None, optional :param hostgroup: Not supported for local netgroups. :type hostgroup: str | None, optional :param ng: Nested netgroup, defaults to None. - :type ng: LocalNetgroup | str | None, optional + :type ng: GenericNetgroup | str | None, optional :raises :class:`ValueError` for unsupported member kinds. """ @@ -624,9 +757,14 @@ def to_member_string(self) -> str: return f"({h},{u},)" -class LocalNetgroup(object): +class LocalNetgroup(GenericNetgroup): """ - Local netgroup management via. + Local netgroup management via ``/etc/netgroup``. + + :class:`LocalNetgroup` is a :class:`GenericNetgroup` for static typing. Only + :class:`LocalNetgroupMember` instances are supported in :meth:`add_members` + and :meth:`remove_members` (not arbitrary :class:`GenericNetgroupMember` + subclasses from other backends). """ def __init__(self, util: LocalUsersUtils, name: str) -> None: @@ -636,15 +774,24 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Netgroup name. :type name: str """ - self.util = util - self.name = name + if util._client is None: + raise RuntimeError( + "LocalNetgroup requires LocalUsersUtils to be bound to a Client (client= in constructor)." + ) + super().__init__(util._client) + self.util: LocalUsersUtils = util + self._name: str = name self._members: list[str] = [] + @property + def name(self) -> str: + return self._name + def __str__(self) -> str: """ Return the netgroup name. """ - return self.name + return self._name def _format_line(self) -> str: """ @@ -655,8 +802,8 @@ def _format_line(self) -> str: """ if not self._members: # Empty triple (,,) — valid NIS form with empty host, user, and domain fields. - return f"{self.name}\t(,,)" - return f"{self.name}\t" + " ".join(self._members) + return f"{self._name}\t(,,)" + return f"{self._name}\t" + " ".join(self._members) def add(self) -> LocalNetgroup: """ @@ -666,26 +813,51 @@ def add(self) -> LocalNetgroup: :rtype: LocalNetgroup :raises: :class:`ValueError` for duplicate names. """ - self.util.logger.info(f'Creating local netgroup "{self.name}" on {self.util.host.hostname}') - existing = self.util._netgroups.get(self.name) + self.util.logger.info(f'Creating local netgroup "{self._name}" on {self.util.host.hostname}') + existing = self.util._netgroups.get(self._name) if existing is not None and existing is not self: raise ValueError( - f'Local netgroup "{self.name}" is already managed by another LocalNetgroup instance; ' + f'Local netgroup "{self._name}" is already managed by another LocalNetgroup instance; ' "reuse the object returned from the first client.local.netgroup() call." ) - self.util._netgroup_names_touched.add(self.name) - self.util._netgroups[self.name] = self + self.util._netgroup_names_touched.add(self._name) + self.util._netgroups[self._name] = self self.util._rewrite_netgroup_file() return self + def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: + """ + Get netgroup data from ``getent netgroup`` (reflecting ``/etc/netgroup``). + + Keys include ``cn`` (netgroup name) and ``nisNetgroupTriple`` (member tokens). + """ + self.util.logger.info(f'Fetching local netgroup "{self._name}" on {self.util.host.hostname}') + result = self.util.host.conn.exec( + ["getent", "netgroup", self._name], raise_on_error=False, log_level=ProcessLogLevel.Silent + ) + if result.rc != 0: + return {} + + line = result.stdout.strip().splitlines()[0] if result.stdout.strip() else "" + if not line: + return {} + + tokens = line.split() + if not tokens or tokens[0] != self._name: + return {} + + triples = tokens[1:] + out: dict[str, list[str]] = {"cn": [self._name], "nisNetgroupTriple": triples} + if attrs is None: + return out + return {k: v for k, v in out.items() if k in attrs} + def add_member( self, *, host: str | None = None, - user: LocalUser | str | None = None, - group: LocalGroup | str | None = None, - hostgroup: str | None = None, - ng: LocalNetgroup | str | None = None, + user: GenericUser | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> LocalNetgroup: """ Add a netgroup member. @@ -693,9 +865,9 @@ def add_member( :return: Self. :rtype: LocalNetgroup """ - return self.add_members([LocalNetgroupMember(host=host, user=user, group=group, hostgroup=hostgroup, ng=ng)]) + return self.add_members([LocalNetgroupMember(host=host, user=user, ng=ng)]) - def add_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: + def add_members(self, members: list[GenericNetgroupMember]) -> LocalNetgroup: """ Add multiple netgroup members. @@ -704,20 +876,22 @@ def add_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: already in this netgroup or appears more than once in ``members``, later duplicates are skipped (nothing is appended for them). - :param members: Netgroup members. - :type members: list[LocalNetgroupMember] + :param members: Netgroup members (must be :class:`LocalNetgroupMember`). + :type members: list[GenericNetgroupMember] :return: Self. :rtype: LocalNetgroup """ - self.util.logger.info(f'Adding members to local netgroup "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Adding members to local netgroup "{self._name}" on {self.util.host.hostname}') if not members: return self - if self.name not in self.util._netgroups: - raise RuntimeError(f'Netgroup "{self.name}" was not created; call add() first') + if self._name not in self.util._netgroups: + raise RuntimeError(f'Netgroup "{self._name}" was not created; call add() first') for m in members: + if not isinstance(m, LocalNetgroupMember): + raise TypeError("Local netgroups only accept LocalNetgroupMember entries.") line = m.to_member_string() if line in self._members: continue @@ -729,10 +903,8 @@ def remove_member( self, *, host: str | None = None, - user: LocalUser | str | None = None, - group: LocalGroup | str | None = None, - hostgroup: str | None = None, - ng: "LocalNetgroup | str | None" = None, + user: GenericUser | str | None = None, + ng: GenericNetgroup | str | None = None, ) -> LocalNetgroup: """ Remove a netgroup member. @@ -740,28 +912,32 @@ def remove_member( :return: Self. :rtype: LocalNetgroup """ - return self.remove_members( - [LocalNetgroupMember(host=host, user=user, group=group, hostgroup=hostgroup, ng=ng)] - ) + return self.remove_members([LocalNetgroupMember(host=host, user=user, ng=ng)]) - def remove_members(self, members: list[LocalNetgroupMember]) -> LocalNetgroup: + def remove_members(self, members: list[GenericNetgroupMember]) -> LocalNetgroup: """ Remove netgroup members. - :param members: Members to remove. - :type members: list[LocalNetgroupMember] + :param members: Members to remove (must be :class:`LocalNetgroupMember`). + :type members: list[GenericNetgroupMember] :return: Self. :rtype: LocalNetgroup """ - self.util.logger.info(f'Removing members from local netgroup "{self.name}" on {self.util.host.hostname}') + self.util.logger.info(f'Removing members from local netgroup "{self._name}" on {self.util.host.hostname}') if not members: return self - if self.name not in self.util._netgroups: - raise RuntimeError(f'Netgroup "{self.name}" was not created; call add() first') + if self._name not in self.util._netgroups: + raise RuntimeError(f'Netgroup "{self._name}" was not created; call add() first') + + local_members: list[LocalNetgroupMember] = [] + for m in members: + if not isinstance(m, LocalNetgroupMember): + raise TypeError("Local netgroups only accept LocalNetgroupMember entries.") + local_members.append(m) - remove_strings = {m.to_member_string() for m in members} + remove_strings = {m.to_member_string() for m in local_members} self._members = [x for x in self._members if x not in remove_strings] self.util._rewrite_netgroup_file() return self @@ -770,10 +946,10 @@ def delete(self) -> None: """ Remove this netgroup from ``/etc/netgroup``. """ - self.util.logger.info(f'Deleting local netgroup "{self.name}" on {self.util.host.hostname}') - self.util._netgroup_names_touched.add(self.name) - if self.name in self.util._netgroups: - del self.util._netgroups[self.name] + self.util.logger.info(f'Deleting local netgroup "{self._name}" on {self.util.host.hostname}') + self.util._netgroup_names_touched.add(self._name) + if self._name in self.util._netgroups: + del self.util._netgroups[self._name] self._members.clear() self.util._rewrite_netgroup_file() @@ -936,17 +1112,12 @@ def delete(self) -> None: self.util._sudoaliases.remove(self) -LocalSudoRuleUserPiece = str | LocalUser | LocalGroup | LocalSudoAlias -LocalSudoRuleUserArg = LocalSudoRuleUserPiece | list[LocalSudoRuleUserPiece] -LocalSudoRuleHostArg = str | LocalSudoAlias | list[str | LocalSudoAlias] -LocalSudoRuleCommandArg = str | LocalSudoAlias | list[str | LocalSudoAlias] -LocalSudoRuleRunAsUserArg = str | LocalUser | LocalSudoAlias | list[str | LocalUser | LocalSudoAlias] -LocalSudoRuleRunAsGroupArg = str | LocalGroup | LocalSudoAlias | list[str | LocalGroup | LocalSudoAlias] - - -class LocalSudoRule(object): +class LocalSudoRule(GenericSudoRule): """ - Local sudo rule management. + Local sudo rule management (``/etc/sudoers.d/`` drop-ins). + + See :class:`GenericSudoRule` for parameter meanings. ``ProtocolName`` values + (including :class:`LocalSudoAlias`) are emitted as bare sudoers names. """ default_user: str = "ALL" @@ -959,37 +1130,45 @@ def __init__(self, util: LocalUsersUtils, name: str) -> None: :param name: Sudo rule name. :type name: str """ - self.name = name - self.util = util + if util._client is None: + raise RuntimeError( + "LocalSudoRule requires LocalUsersUtils to be bound to a Client (client= in constructor)." + ) + super().__init__(util._client) + self._name: str = name + self.util: LocalUsersUtils = util self.__rule: dict[str, Any] = dict() self.filename: str | None = None self.rule_str: str | None = None - def __str__(self): + @property + def name(self) -> str: + return self._name + + def __str__(self) -> str: """ Returns a string representation of the LocalSudoRule. """ if self.rule_str: return self.rule_str - else: - return self.name + return self._name @staticmethod def _format_list_item(item: str | Any, add_percent: bool = False) -> str: """ Format a single sudoers list element. - :param item: String, :class:`LocalUser`, :class:`LocalGroup`, or :class:`LocalSudoAlias`. + :param item: String, user/group, or name reference (e.g. :class:`LocalSudoAlias`). :type item: str | Any - :param add_percent: If true, prepend ``%`` to :class:`LocalGroup` entries. + :param add_percent: If true, prepend ``%`` to :class:`GenericGroup` entries. :type add_percent: bool, optional :return: Formatted fragment. :rtype: str """ if isinstance(item, LocalSudoAlias): return item.name - if isinstance(item, LocalGroup) and add_percent: - return f"%{str(item)}" + if isinstance(item, GenericGroup) and add_percent: + return f"%{item.name}" return str(item) @staticmethod @@ -999,7 +1178,7 @@ def _format_list(item: str | Any | list[str | Any], add_percent: bool = False) - :param item: A single value or list of values to format. :type item: str | Any | list[str | Any] - :param add_percent: If true, prepend ``%`` to :class:`LocalGroup` entries (sudo user field). + :param add_percent: If true, prepend ``%`` to :class:`GenericGroup` entries (sudo user field). :type add_percent: bool, optional :return: Formatted string. :rtype: str @@ -1008,15 +1187,37 @@ def _format_list(item: str | Any | list[str | Any], add_percent: bool = False) - return ", ".join(LocalSudoRule._format_list_item(x, add_percent) for x in item) return LocalSudoRule._format_list_item(item, add_percent) + def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: + """ + Return rule text as attributes (``cn``, ``sudoRule``). + + If ``rule_str`` is unset, reads the drop-in file when ``filename`` is set. + """ + line = self.rule_str.strip() if self.rule_str else "" + if not line and self.filename: + result = self.util.host.conn.exec( + ["cat", f"/etc/sudoers.d/{self.filename}"], + raise_on_error=False, + log_level=ProcessLogLevel.Silent, + ) + if result.rc == 0: + line = result.stdout.strip() + if not line: + return {} + data: dict[str, list[str]] = {"cn": [self._name], "sudoRule": [line]} + if attrs is None: + return data + return {k: v for k, v in data.items() if k in attrs} + def add( self, *, - user: LocalSudoRuleUserArg | None = default_user, - host: LocalSudoRuleHostArg | None = default_host, - command: LocalSudoRuleCommandArg | None = default_command, + user: SudoRuleUserField = default_user, + host: SudoRuleHostField = default_host, + command: SudoRuleCommandField = default_command, option: str | list[str] | None = None, - runasuser: LocalSudoRuleRunAsUserArg | None = None, - runasgroup: LocalSudoRuleRunAsGroupArg | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> LocalSudoRule: @@ -1024,17 +1225,17 @@ def add( Create new sudo rule. :param user: sudoUser attribute, defaults to ALL. - :type user: LocalSudoRuleUserArg | None + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to ALL. - :type host: LocalSudoRuleHostArg | None + :type host: SudoRuleHostField, optional :param command: sudoCommand attribute, defaults to ALL. - :type command: LocalSudoRuleCommandArg | None + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None. :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None. - :type runasuser: LocalSudoRuleRunAsUserArg | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None. - :type runasgroup: LocalSudoRuleRunAsGroupArg | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None. :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change) @@ -1044,7 +1245,7 @@ def add( """ orderstr = f"{order:02d}" if order is not None else str(len(self.util._sudorules)) if self.filename is None: - self.filename = f"{orderstr}_{self.name}" + self.filename = f"{orderstr}_{self._name}" # Remember arguments so we can use them in modify if needed self.__rule = dict[str, Any]( @@ -1078,35 +1279,37 @@ def add( def modify( self, *, - user: LocalSudoRuleUserArg | None = None, - host: LocalSudoRuleHostArg | None = None, - command: LocalSudoRuleCommandArg | None = None, + user: SudoRuleUserField = None, + host: SudoRuleHostField = None, + command: SudoRuleCommandField = None, option: str | list[str] | None = None, - runasuser: LocalSudoRuleRunAsUserArg | None = None, - runasgroup: LocalSudoRuleRunAsGroupArg | None = None, + runasuser: SudoRuleRunAsUserField = None, + runasgroup: SudoRuleRunAsGroupField = None, order: int | None = None, nopasswd: bool | None = None, ) -> LocalSudoRule: """ - Modify existing Local sudo rule. + Modify existing local sudo rule. + + Parameters set to ``None`` keep the previous values. :param user: sudoUser attribute, defaults to None. - :type user: LocalSudoRuleUserArg | None, optional + :type user: SudoRuleUserField, optional :param host: sudoHost attribute, defaults to None. - :type host: LocalSudoRuleHostArg | None, optional - :param command: sudoCommand attribute defaults to None. - :type command: LocalSudoRuleCommandArg | None, optional + :type host: SudoRuleHostField, optional + :param command: sudoCommand attribute, defaults to None. + :type command: SudoRuleCommandField, optional :param option: sudoOption attribute, defaults to None. :type option: str | list[str] | None, optional :param runasuser: sudoRunAsUser attribute, defaults to None. - :type runasuser: LocalSudoRuleRunAsUserArg | None, optional + :type runasuser: SudoRuleRunAsUserField, optional :param runasgroup: sudoRunAsGroup attribute, defaults to None. - :type runasgroup: LocalSudoRuleRunAsGroupArg | None, optional + :type runasgroup: SudoRuleRunAsGroupField, optional :param order: sudoOrder attribute, defaults to None. :type order: int | None, optional :param nopasswd: If true, no authentication is required (NOPASSWD), defaults to None (no change). :type nopasswd: bool | None, optional - :return: New sudo rule object. + :return: Self. :rtype: LocalSudoRule """ self.delete()