Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions awscli/customizations/configure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import string
from awscli.compat import shlex
import os
import sys

from awscli.compat import is_windows, shlex
from awscli.customizations.utils import uni_print

NOT_SET = '<not set>'
PREDEFINED_SECTION_NAMES = ('preview', 'plugins')
Expand Down Expand Up @@ -47,3 +50,35 @@ def profile_to_section(profile_name):
if any(c in _WHITESPACE for c in profile_name):
profile_name = shlex.quote(profile_name)
return 'profile %s' % profile_name


PERMISSIONS_WARNING_TEMPLATE = (
"\naws: [WARNING]: The file '{path}' is accessible by other users. "
"Consider running 'chmod 600 {path}' to restrict access to only your "
"user.\n"
)


def warn_if_permissive(file_path, err_stream=None):
if is_windows:
return

if not os.path.isfile(file_path):
return

if err_stream is None:
err_stream = sys.stderr

try:
file_mode = os.stat(file_path).st_mode
if is_overly_permissive(file_mode, 0o700):
uni_print(
PERMISSIONS_WARNING_TEMPLATE.format(path=file_path),
out_file=err_stream,
)
except OSError:
return


def is_overly_permissive(file_mode, allowed_bits=0o700):
return bool((file_mode & 0o777) & ~allowed_bits)
4 changes: 3 additions & 1 deletion awscli/customizations/configure/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,6 @@ def _write_out_creds_file_values(self, new_values, profile_name):
self._session.get_config_variable('credentials_file'))
self._config_writer.update_config(
credential_file_values,
shared_credentials_filename)
shared_credentials_filename,
check_permissions=True,
)
8 changes: 7 additions & 1 deletion awscli/customizations/configure/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,20 @@ def _run_main(self, args, parsed_globals):
# of something in the [plugin] section.
profile, varname = parts
config_filename = self._get_config_file('config_file')
check_permissions = False
if varname in self._WRITE_TO_CREDS_FILE:
# When writing to the creds file, the section is just the profile
section = profile
config_filename = self._get_config_file('credentials_file')
check_permissions = True
elif profile in PREDEFINED_SECTION_NAMES or profile == 'default':
section = profile
else:
section = profile_to_section(profile)
updated_config = {'__section__': section, varname: value}
self._config_writer.update_config(updated_config, config_filename)
self._config_writer.update_config(
updated_config,
config_filename,
check_permissions=check_permissions,
)
return 0
12 changes: 10 additions & 2 deletions awscli/customizations/configure/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import os
import re

from . import SectionNotFoundError
from . import SectionNotFoundError, warn_if_permissive


class ConfigFileWriter(object):
Expand All @@ -24,7 +24,9 @@ class ConfigFileWriter(object):
r'(?P<value>.*)$'
)

def update_config(self, new_values, config_filename):
def update_config(
self, new_values, config_filename, check_permissions=False
):
"""Update config file with new values.

This method will update a section in a config file with
Expand All @@ -50,6 +52,10 @@ def update_config(self, new_values, config_filename):
:param config_filename: The config filename where values will be
written.

:type check_permissions: bool
:param check_permissions: If True, warn if the file has
permissions more permissive than 0o600.

"""
section_name = new_values.pop('__section__', 'default')
if not os.path.isfile(config_filename):
Expand All @@ -66,6 +72,8 @@ def update_config(self, new_values, config_filename):
f.write(''.join(contents))
except SectionNotFoundError:
self._write_new_section(section_name, new_values, config_filename)
if check_permissions:
warn_if_permissive(config_filename)

def _create_file(self, config_filename):
# Create the file as well as the parent dir if needed.
Expand Down
19 changes: 18 additions & 1 deletion tests/unit/customizations/configure/test_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def assert_credentials_file_updated_with(self, new_values):
credentials_file_call = called_args[0]
expected_creds_file = os.path.expanduser('~/fake_credentials_filename')
self.assertEqual(credentials_file_call,
mock.call(new_values, expected_creds_file))
mock.call(new_values, expected_creds_file,
check_permissions=True))

def test_configure_command_sends_values_to_writer(self):
self.configure(args=[], parsed_globals=self.global_args)
Expand Down Expand Up @@ -129,6 +130,22 @@ def test_session_says_profile_does_not_exist(self):
'region': 'new_value',
'output': 'new_value'}, 'myconfigfile')

def test_check_permissions_when_credential_values_provided(self):
self.configure(args=[], parsed_globals=self.global_args)
expected_creds_file = os.path.expanduser('~/fake_credentials_filename')
self.writer.update_config.assert_any_call(
mock.ANY, expected_creds_file, check_permissions=True
)

def test_no_check_permissions_when_no_credential_values_changed(self):
user_presses_enter = None
precanned = PrecannedPrompter(value=user_presses_enter)
self.configure = configure.ConfigureCommand(
self.session, prompter=precanned, config_writer=self.writer
)
self.configure(args=[], parsed_globals=self.global_args)
self.writer.update_config.assert_not_called()


class TestInteractivePrompter(unittest.TestCase):

Expand Down
81 changes: 81 additions & 0 deletions tests/unit/customizations/configure/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
from io import StringIO
from unittest.mock import patch

import pytest

from awscli.customizations.configure import (
is_overly_permissive,
warn_if_permissive,
)


@pytest.mark.parametrize("mode", [0o700, 0o600, 0o400, 0o200, 0o000])
def test_acceptable_modes_are_not_overly_permissive(mode):
assert is_overly_permissive(mode) is False


@pytest.mark.parametrize("mode", [0o644, 0o666, 0o777, 0o610, 0o601])
def test_overly_permissive_modes_are_detected(mode):
assert is_overly_permissive(mode) is True


def _create_creds_file(tmp_path, mode):
path = tmp_path / "credentials"
path.write_text("[default]\naws_access_key_id=testing\n")
os.chmod(path, mode)
return str(path)


@patch("awscli.customizations.configure.is_windows", False)
def test_prints_warning_for_permissive_file(tmp_path):
path = _create_creds_file(tmp_path, 0o644)
err = StringIO()
warn_if_permissive(path, err_stream=err)
output = err.getvalue()
assert "aws: [WARNING]" in output
assert path in output
assert "accessible by other users" in output


@patch("awscli.customizations.configure.is_windows", False)
def test_no_warning_for_0o600_file(tmp_path):
path = _create_creds_file(tmp_path, 0o600)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", False)
def test_no_warning_for_0o700_file(tmp_path):
path = _create_creds_file(tmp_path, 0o700)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", False)
def test_skips_when_file_does_not_exist(tmp_path):
err = StringIO()
warn_if_permissive(str(tmp_path / "nonexistent"), err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", True)
def test_skips_on_windows(tmp_path):
path = _create_creds_file(tmp_path, 0o777)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""
Loading
Loading