Implement OWASP security logging.#6689
Conversation
|
cc: @dbungert |
holmanb
left a comment
There was a problem hiding this comment.
Thanks for this @blackboxsw. First pass.
If the goal is for this to be cross-distro, then implementing this in the distro class doesn't seem optimal because distro methods are expected to be (and in this case are) overridden.
Also, is there a reason that the logger package cannot be used? Manually writing to files seems undesirable - I would have expected this to define a custom logger for this purpose.
Thank you for the type annotations. Besides just annotating the types, I'd like to see if we can also avoid unnecessary complexity - something that annotations can assist with. For example rather than checking an Optional parameter for trutheyness, we could instead avoid the possibility of None and pass a falsey value instead.
cloudinit/distros/__init__.py
Outdated
| sec_log_user_created( | ||
| userid="cloud-init", | ||
| new_userid=name, | ||
| attributes={"snapuser": True, "sudo": True}, | ||
| ) |
There was a problem hiding this comment.
Intermixing code of different different purposes like this will lead to difficulty maintaining and auditing the code. I would prefer if we could find a cleaner design.
There was a problem hiding this comment.
good point, let's go decorator route instead,
There was a problem hiding this comment.
Decorators didn't address the problem. These decorators are still mixing code of different purposes.
cloudinit/log/security_event_log.py
Outdated
| if params: | ||
| # Filter out None values and convert to strings | ||
| filtered_params = [str(p) for p in params if p is not None] | ||
| if filtered_params: |
There was a problem hiding this comment.
This conditional is unnecessary.
There was a problem hiding this comment.
It is in the event that all filtered_params are empty. We don't want a trailing ":" appended if filtered_params == []
There was a problem hiding this comment.
It is in the event that all filtered_params are empty.
This should never happen. See my comment above.
046db76 to
e7168c0
Compare
|
I believe I have addressed all comments and this is ready for re-review. |
…estart Implement OWASP structured logs in /var/log/cloud-init-security.log. Add security-related operations performed by cloud-init on behalf of user-data or platform meta-data: - user creation - user password change - system restart - system shutdown Default security log file can be changed by setting an alternative value for security_log_file in /etc/cloud/cloud.cfg(.d/*.cfg).
Replace LevelBasedFormatter with handler-level filters. SecurityOnlyFilter and NoSecurityFilter are applied at the handler level so SECURITY records flow only to a dedicated FileHandler on cloud-init-output.log using SECURITY_LOG_FORMAT, and are blocked from stderr and LogExporter. setup_security_logging() silently no-ops on OSError so unprivileged test environments are unaffected. Update test_logger_prints_security_as_ json_lines to assert file output and absence from stderr.
Use netdev_info to discover any interface with a global scope IP address. If no IPv4 address is present, return IPv6. If no network devices are up or no global addresses are present, avoid adding host_ip to event logs. Additionally, add "type": "security" to all events.
…down Correct type hint for netdev_info
df1d410 to
e91cc58
Compare
| @@ -1113,6 +1123,7 @@ def set_passwd(self, user, passwd, hashed=False): | |||
|
|
|||
| return True | |||
|
|
|||
| @sec_log_password_changed_batch | |||
| def chpasswd(self, plist_in: list, hashed: bool): | |||
| payload = ( | |||
| "\n".join( | |||
There was a problem hiding this comment.
Also migrated the sec_log_user_created decorator to add_user and add_snap_user as that is actually where a user gets created. Added @final decorators to both of those methods.
Refactored subclasses of add_user behavior into _add_user_preprocess_kwargs, _build_add_user_cmd and _post_add_user methods.
There was a problem hiding this comment.
This is still overridden in a child class.
There was a problem hiding this comment.
This is in distros/bsd.py, but bsd directly calls set_passwd for each name in plist_in. So BSD will still be logging separate OWASP events for each user, just as our @sec_log_password_changed_batch will.
As a result, I don't want to decorate chpasswd with @final and I also don't want to decorate cloudinit.distros.bsd.Disto.chpasswd with @sec_log passwd_changed_batch
cloudinit/log/security_event_log.py
Outdated
| :return: Dictionary containing the security event data. | ||
| """ | ||
| event = { | ||
| "datetime": datetime.datetime.now(datetime.timezone.utc).isoformat(), |
There was a problem hiding this comment.
This also concerns me - I would rather avoid side-effects in logging libraries. This causes a syscall.
There was a problem hiding this comment.
Also, it seems odd to be manually building building a format for the log here.
Python's builtin logger allows supplying custom Formatter objects - which is where I would expect the format to be defined. The formatter should already have the time and can be configured to the desired format.
cloudinit/log/security_event_log.py
Outdated
| """A decorator logging a system shutdown event.""" | ||
|
|
||
| @functools.wraps(func) | ||
| def decorator(*args, **kwargs): |
There was a problem hiding this comment.
Why does this use kwargs when the keyword names are known?
7f50e9a to
3563a87
Compare
- drop unintended @Final from Distro.set_passwd - simplify sec_log decorators to assume positional param idx 1 when kwarg empty - drop unchecked bool return values from _post_add_user
e342293 to
45bc3c7
Compare
holmanb
left a comment
There was a problem hiding this comment.
Thanks for this @blackboxsw. See the latest feedback.
In general I'd like to see us avoid iterating over interfaces, opening sockets, and calling datetime on a per-log basis - and these kinds of side-effects don't seem well-suited for a system logging library.
The same goes for parsing data from untyped Dicts. A major problem with building a solution that can be maintained is that distro-specific code is doing configuration validation and manipulation - the existing code doesn't have clear separation of concerns.
Also: I'd like to see a sample log produced by this code.
cloudinit/distros/__init__.py
Outdated
| } | ||
| @final | ||
| @sec_log_user_created | ||
| def add_user(self, name, **kwargs): |
There was a problem hiding this comment.
This no longer has a type annotation. Why did the type signature change?
There was a problem hiding this comment.
I should have migrated this annotation to express that we expect no return value. Given that this PR migrates the pre-flight check for pre-existing user outside the Distro.add_user` call, there is no longer a need to check a bool return code from add_user given the simplification to the method.
Now we only call add user if we are sure the user doesn't exist up in create_user.
pre_existing_user = util.is_user(name)
if pre_existing_user:
LOG.info("User %s already exists, skipping.", name)
else:
self.add_user(name, **kwargs)
We also don't have any direct callers to Distro.add_user in any cloud-init runtime operations. Any call-sites in upstream are invoked via the Distro.create_user method. This could expose some downstream images with custom private Distro implementations, but that is a highly unlikely scenario as we don't treat cloudinit as a python library supporting direct calls into our python modules and class APIs.
| @@ -1113,6 +1123,7 @@ def set_passwd(self, user, passwd, hashed=False): | |||
|
|
|||
| return True | |||
|
|
|||
| @sec_log_password_changed_batch | |||
| def chpasswd(self, plist_in: list, hashed: bool): | |||
| payload = ( | |||
| "\n".join( | |||
There was a problem hiding this comment.
This is still overridden in a child class.
cloudinit/distros/__init__.py
Outdated
| def shutdown_command(cls, *, mode, delay, message): | ||
| # called from cc_power_state_change.load_power_state | ||
| # called from cc_power_state_change.load_power_state and clean | ||
| if hasattr(cls, "_shutdown_command"): # Overridden in alpine.Distro |
There was a problem hiding this comment.
This class uses inheritance to define and override default behaviors. It seems unusual to reach for a low-level meta-programming utility rather than using inheritance. Why was this chosen instead?
There was a problem hiding this comment.
The intent was to leave us with only one place with sec_log decorators in the parent class and disallow creating the shutdown_command method on subclasses via @final. I'll refactor this to use the same approach we user for add user with _build_shutdown_cmd. I've refactored the validation of the delay value which was duplicated in alpine and pulled it into a pre-flight check in shutdown_command.
| LOG.info("Added user '%s' to group '%s'", member, name) | ||
|
|
||
| def shutdown_command(self, mode="poweroff", delay="now", message=None): | ||
| @classmethod |
| for key, val in kwargs.items(): | ||
| if key in pw_useradd_opts and val and isinstance(val, (str, int)): | ||
| pw_useradd_cmd.extend([pw_useradd_opts[key], str(val)]) | ||
|
|
cloudinit/log/security_event_log.py
Outdated
| userid = kwargs.get("user") | ||
| if not userid: | ||
| userid = args[1] |
There was a problem hiding this comment.
This seems fishy. Isn't args[1] the password?
There was a problem hiding this comment.
Updated the signature to expect known positional args. No more kwargs parsing.
cloudinit/log/security_event_log.py
Outdated
| if params: | ||
| # Filter out None values and convert to strings | ||
| filtered_params = [str(p) for p in params if p is not None] | ||
| if filtered_params: |
cloudinit/log/security_event_log.py
Outdated
| params = ["cloud-init", new_userid] | ||
| groups_msg = "" | ||
| groups_suffix = kwargs.get("groups", "") | ||
| if groups_suffix: | ||
| if isinstance(groups_suffix, (dict, list)): | ||
| groups_suffix = ",".join(groups_suffix) | ||
| for perms in ("sudo", "doas"): | ||
| if kwargs.get(perms): | ||
| groups_suffix += f",{perms}" | ||
| if groups_suffix: | ||
| groups_suffix = groups_suffix.strip(",") | ||
| groups_msg = f" in groups: {groups_suffix}" | ||
| params.append(f"groups:{groups_suffix}") |
There was a problem hiding this comment.
This doesn't seem ideal. This is interpreting arbitrary untyped data from a dict. This dict interpretation is a long ways from wherever this datastructure is defined, yet it is tightly coupled to the structure where it was defined.
There was a problem hiding this comment.
You are right, this feels fragile and is working around the complexity in both add_user and _build_add_user_cmd which both attempt to parse and handle "groups" being passed as an untyped dict config key value from merged user-data. An alternative to keep things closer to where distros processes this config content would be a helper function in cloudinit/distros/init.py which can expose the properly typed groups content in order to ensure the, add_user calls, _build_add_user_cmd and log decorator properly handle this information in the same manner. There is a lot of duplication here in the decorator and in Distro instance methods that we can avoid if all call-sites get a typed response from one place.
cloudinit/log/security_event_log.py
Outdated
| plist_in = kwargs.get("plist_in") | ||
| if not plist_in: | ||
| plist_in = args[1] |
There was a problem hiding this comment.
This construct seems confusing, and possibly wrong.
There was a problem hiding this comment.
Dropped in favor of known positional arg plist_in
cloudinit/distros/__init__.py
Outdated
| sec_log_user_created( | ||
| userid="cloud-init", | ||
| new_userid=name, | ||
| attributes={"snapuser": True, "sudo": True}, | ||
| ) |
There was a problem hiding this comment.
Decorators didn't address the problem. These decorators are still mixing code of different purposes.
Add _build_shutdown_cmd which is subclassed in Alpine. Move pre-flight delay type checking into Distro.shutdown_command and call the _build_shutdown_cmd to construct the distro-specific command.
blackboxsw
left a comment
There was a problem hiding this comment.
Addressed most review points, still looking at:
- adding a Distro.extract_group method to extract typed information from opaque kwargs config passed into add_user
- looking at custom logging formatted to provide the logging timestamp instead of calling datetime directly.
| "event": _build_event_string(event_type, event_params), | ||
| "level": level.value, | ||
| "description": description, | ||
| "hostname": util.get_hostname(), |
There was a problem hiding this comment.
I'm uncertain how we should resolve this concern and also deliver identifiable information within each log to provide breadcrumbs for security log aggregators about the source of the log.
Do you think we may instead want to just extract provided hostname data from the Platform via a call to util.get_hostname_fqdn to avoid using socket.gethostname. Or possibly @lru_cache around get_hostname to avoid recurring cost of a call?
I see our security logs as only being on the order of 10 events per boot vs 100's. So I don't know whether you are concerned about recurring cost, or just that fact that logging reaches out to some other system which could timeout.
It's possible we could drop this key, which places a responsibility on all security log aggregators to identify the source from which all logs originate.
cloudinit/log/security_event_log.py
Outdated
| params = ["cloud-init", new_userid] | ||
| groups_msg = "" | ||
| groups_suffix = kwargs.get("groups", "") | ||
| if groups_suffix: | ||
| if isinstance(groups_suffix, (dict, list)): | ||
| groups_suffix = ",".join(groups_suffix) | ||
| for perms in ("sudo", "doas"): | ||
| if kwargs.get(perms): | ||
| groups_suffix += f",{perms}" | ||
| if groups_suffix: | ||
| groups_suffix = groups_suffix.strip(",") | ||
| groups_msg = f" in groups: {groups_suffix}" | ||
| params.append(f"groups:{groups_suffix}") |
There was a problem hiding this comment.
You are right, this feels fragile and is working around the complexity in both add_user and _build_add_user_cmd which both attempt to parse and handle "groups" being passed as an untyped dict config key value from merged user-data. An alternative to keep things closer to where distros processes this config content would be a helper function in cloudinit/distros/init.py which can expose the properly typed groups content in order to ensure the, add_user calls, _build_add_user_cmd and log decorator properly handle this information in the same manner. There is a lot of duplication here in the decorator and in Distro instance methods that we can avoid if all call-sites get a typed response from one place.
cloudinit/log/security_event_log.py
Outdated
| if params: | ||
| # Filter out None values and convert to strings | ||
| filtered_params = [str(p) for p in params if p is not None] | ||
| if filtered_params: |
There was a problem hiding this comment.
It is in the event that all filtered_params are empty. We don't want a trailing ":" appended if filtered_params == []
cloudinit/distros/__init__.py
Outdated
| } | ||
| @final | ||
| @sec_log_user_created | ||
| def add_user(self, name, **kwargs): |
There was a problem hiding this comment.
I should have migrated this annotation to express that we expect no return value. Given that this PR migrates the pre-flight check for pre-existing user outside the Distro.add_user` call, there is no longer a need to check a bool return code from add_user given the simplification to the method.
Now we only call add user if we are sure the user doesn't exist up in create_user.
pre_existing_user = util.is_user(name)
if pre_existing_user:
LOG.info("User %s already exists, skipping.", name)
else:
self.add_user(name, **kwargs)
We also don't have any direct callers to Distro.add_user in any cloud-init runtime operations. Any call-sites in upstream are invoked via the Distro.create_user method. This could expose some downstream images with custom private Distro implementations, but that is a highly unlikely scenario as we don't treat cloudinit as a python library supporting direct calls into our python modules and class APIs.
cloudinit/distros/__init__.py
Outdated
| def shutdown_command(cls, *, mode, delay, message): | ||
| # called from cc_power_state_change.load_power_state | ||
| # called from cc_power_state_change.load_power_state and clean | ||
| if hasattr(cls, "_shutdown_command"): # Overridden in alpine.Distro |
There was a problem hiding this comment.
The intent was to leave us with only one place with sec_log decorators in the parent class and disallow creating the shutdown_command method on subclasses via @final. I'll refactor this to use the same approach we user for add user with _build_shutdown_cmd. I've refactored the validation of the delay value which was duplicated in alpine and pulled it into a pre-flight check in shutdown_command.
cloudinit/netinfo.py
Outdated
| for _, info in netdev_info().items(): | ||
| if not info["up"]: | ||
| continue | ||
| ipv4: List[dict] = info.get("ipv4", []) |
There was a problem hiding this comment.
Dropped cloudinit.netinfo.get_host_ip function to avoid the cost of shelling out to ip addr --json for every security event log. So this comment no longer applies.
This "up" check on the device would have also been a prerequisite to surfacing valid ipv4 settings. I also agree that a down interface will also not have any valid ipv4 of ipv6 addresses, so this check was unnecessary.
Agreed on typing in netinfo, and I think that is already tracked as an existing enhancement #5445 to eventually resolve those issues.
cloudinit/netinfo.py
Outdated
| """ | ||
| try: | ||
| first_ipv6: Optional[str] = None | ||
| for _, info in netdev_info().items(): |
There was a problem hiding this comment.
Will address this if we end up adding host_ip info in the future.
| "event": _build_event_string(event_type, event_params), | ||
| "level": level.value, | ||
| "description": description, | ||
| "hostname": util.get_hostname(), |
There was a problem hiding this comment.
OWASP log formats look for identifiable information in the individual logs to ease the burden on aggregators. To avoid calling to socket.gethostname(). We could instead call util.get_hostname_fqdn to get what the IMDS told us the hostname should be (which is run in cloud-init-local stage to grab this data from the IMDS before cloud-init. We could also use @lru_cache to avoid repeated cost for such an operation.
cloudinit/log/security_event_log.py
Outdated
| userid = kwargs.get("user") | ||
| if not userid: | ||
| userid = args[1] |
There was a problem hiding this comment.
Updated the signature to expect known positional args. No more kwargs parsing.
| @@ -1113,6 +1123,7 @@ def set_passwd(self, user, passwd, hashed=False): | |||
|
|
|||
| return True | |||
|
|
|||
| @sec_log_password_changed_batch | |||
| def chpasswd(self, plist_in: list, hashed: bool): | |||
| payload = ( | |||
| "\n".join( | |||
There was a problem hiding this comment.
This is in distros/bsd.py, but bsd directly calls set_passwd for each name in plist_in. So BSD will still be logging separate OWASP events for each user, just as our @sec_log_password_changed_batch will.
As a result, I don't want to decorate chpasswd with @final and I also don't want to decorate cloudinit.distros.bsd.Disto.chpasswd with @sec_log passwd_changed_batch
6938fbc to
46291ca
Compare
cloudinit/log/security_event_log.py
Outdated
| # Filter out None values and convert to strings | ||
| filtered_params = [str(p) for p in params if p is not None] |
There was a problem hiding this comment.
According to the type annotation, the filter will never do anything.
cloudinit/log/security_event_log.py
Outdated
| if params: | ||
| # Filter out None values and convert to strings | ||
| filtered_params = [str(p) for p in params if p is not None] | ||
| if filtered_params: |
There was a problem hiding this comment.
It is in the event that all filtered_params are empty.
This should never happen. See my comment above.
cloudinit/log/security_event_log.py
Outdated
|
|
||
| def sec_log_password_changed_batch(func): | ||
| @functools.wraps(func) | ||
| def decorator(self, plist_in, *args, **kwargs): |
There was a problem hiding this comment.
Can we get a type annotation on plist_in?
| """A decorator logging a password change event.""" | ||
|
|
||
| @functools.wraps(func) | ||
| def decorator(self, user: str, *args, **kwargs): |
There was a problem hiding this comment.
The function that this decorates doesn't have an annotation. Can we add it there too please?
cloudinit/log/security_event_log.py
Outdated
| event_type=OWASPEventType.AUTHN_PASSWORD_CHANGE, | ||
| level=OWASPEventLevel.INFO, | ||
| description=f"Password changed for user '{user}'", | ||
| event_params=["cloud-init", user], |
There was a problem hiding this comment.
Why is every event_params getting passed ["cloud-init", ...]? If this is to be added by default, this doesn't seem like the right place for it, since it has to be duplicated at every call site.
There was a problem hiding this comment.
Moved into _log_security_event to simplify call-sites.
cloudinit/net/dhcp.py
Outdated
| interface: str, | ||
| dhcp_log_func: Optional[Callable[[str, str, str], None]] = None, | ||
| distro=None, | ||
| metric=None, |
There was a problem hiding this comment.
Dropped unrelated stay delta from separate PR.
doc/rtd/reference/user_files.rst
Outdated
| - :file:`/var/log/cloud-init.log`: The primary log file. Verbose, but useful. | ||
| - :file:`/var/log/cloud-init-output.log`: Captures the output from each stage. | ||
| Output from user scripts goes here. | ||
| - :file:`/var/log/cloud-init-security.log`: This file logs OWASP security |
cloudinit/log/security_event_log.py
Outdated
| """A decorator to log a user creation event and group attributes.""" | ||
|
|
||
| @functools.wraps(func) | ||
| def decorator(self, name, *args, **kwargs): |
There was a problem hiding this comment.
Please add an annotation for name.
cloudinit/log/security_event_log.py
Outdated
| # User creation operation did not raise an Exception | ||
| _log_security_event( | ||
| event_type=OWASPEventType.USER_CREATED, | ||
| level=OWASPEventLevel.WARN, |
There was a problem hiding this comment.
It's defined as WARN level on Canonical SSDLC user event guidance
There was a problem hiding this comment.
This happens due to trusted inputs and this will case a WARN on every boot - the typically arguments for "why" to WARN don't seem to apply in this case.
Also, since that is just an example this seems up for interpretation. I think it makes more sense not to warn on every boot.
There was a problem hiding this comment.
Maybe just set it to INFO and include a short comment in case anyone wonders why.
cloudinit/log/security_event_log.py
Outdated
| groups_suffix = kwargs.get("groups", "") | ||
| if groups_suffix: | ||
| if isinstance(groups_suffix, (dict, list)): | ||
| groups_suffix = ",".join(groups_suffix) | ||
| for perms in ("sudo", "doas"): | ||
| if kwargs.get(perms): | ||
| groups_suffix += f",{perms}" | ||
| if groups_suffix: | ||
| groups_suffix = groups_suffix.strip(",") | ||
| groups_msg = f" in groups: {groups_suffix}" | ||
| params.append(f"groups:{groups_suffix}") |
There was a problem hiding this comment.
As mentioned elsewhere, this does both kwarg parsing and manipulation of data types that have nothing to do with logging.
There was a problem hiding this comment.
Added both Distro._user_groups_to_list and Distro._get_elevated_roles to perform that processing in Distro and avoid pushing that kwarg handling into log/security_event_log.
0e7b9f1 to
571b53b
Compare
Also call Distro._user_groups_to_list in sec_log_user_created to avoid duplicated processing of add_user kwargs in security_event_log.
571b53b to
a18a102
Compare
Provide an initial spike for logging OWASP formatted events in cloud-init for discussion. Integration tests will be added upon agreement for security logging procedures.
Proposed Commit Message
Additional Context
A followup PR will provide USER update functionality due to
groups: { admingrp: [root, sys]}because this will require a refactor of Distro.create_group for multiple distrosTest Steps
Merge type