Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion awscli/customizations/configure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import sys

from awscli.compat import shlex
from awscli.compat import is_windows, shlex
from awscli.customizations.utils import uni_print

NOT_SET = '<not set>'
PREDEFINED_SECTION_NAMES = 'plugins'
Expand Down Expand Up @@ -62,3 +65,36 @@ def get_section_header(section_type, section_name):
if any(c in _WHITESPACE for c in section_name):
section_name = shlex.quote(section_name)
return f'{section_type} {section_name}'


PERMISSIONS_WARNING_TEMPLATE = (
"\naws: [WARNING]: The file '{path}' is accessible by other users. "
"Consider running 'chmod 600 {path}' to restrict access to only your user.\n"
)


def warn_if_permissive(file_path, err_stream=None):
if is_windows:
return

if not os.path.isfile(file_path):
return

if err_stream is None:
err_stream = sys.stderr

try:
file_mode = os.stat(file_path).st_mode
if is_overly_permissive(file_mode, 0o700):
uni_print(
PERMISSIONS_WARNING_TEMPLATE.format(
path=file_path,
),
out_file=err_stream,
)
except OSError:
return


def is_overly_permissive(file_mode, allowed_bits=0o700):
return bool((file_mode & 0o777) & ~allowed_bits)
4 changes: 3 additions & 1 deletion awscli/customizations/configure/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,5 +213,7 @@ def _write_out_creds_file_values(self, new_values, profile_name):
self._session.get_config_variable('credentials_file')
)
self._config_writer.update_config(
credential_file_values, shared_credentials_filename
credential_file_values,
shared_credentials_filename,
check_permissions=True,
)
14 changes: 9 additions & 5 deletions awscli/customizations/configure/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import sys

from awscli.customizations.commands import BasicCommand
from awscli.customizations.configure import warn_if_permissive
from awscli.customizations.configure.writer import ConfigFileWriter
from awscli.customizations.utils import uni_print

Expand Down Expand Up @@ -69,7 +70,7 @@ class ConfigureImportCommand(BasicCommand):
def __init__(
self, session, csv_parser=None, importer=None, out_stream=None
):
super(ConfigureImportCommand, self).__init__(session)
super().__init__(session)
if csv_parser is None:
csv_parser = CSVCredentialParser()
self._csv_parser = csv_parser
Expand All @@ -90,20 +91,23 @@ def _get_config_path(self):
def _import_csv(self, contents):
self._check_possible_filepath(contents)
config_path = self._get_config_path()
warn_if_permissive(config_path)
credentials = self._csv_parser.parse_credentials(contents)
for credential in credentials:
self._importer.import_credential(
credential,
config_path,
profile_prefix=self._profile_prefix,
)
import_msg = 'Successfully imported %s profile(s)\n' % len(credentials)
import_msg = f'Successfully imported {len(credentials)} profile(s)\n'
uni_print(import_msg, out_file=self._out_stream)

def _check_possible_filepath(self, csv_data):
if ('\n' not in csv_data and
os.path.exists(csv_data) and
not csv_data.startswith('file://')):
if (
'\n' not in csv_data
and os.path.exists(csv_data)
and not csv_data.startswith('file://')
):
raise ValueError(
"You may be passing a file to import without the 'file://' prefix. "
"To import a CSV file, use --csv file://path/to/file.csv"
Expand Down
4 changes: 3 additions & 1 deletion awscli/customizations/configure/mfalogin.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ def _write_temporary_credentials(self, temp_credentials, target_profile):
except AttributeError:
expiration_time = str(temp_credentials['Expiration'])

self._config_writer.update_config(credential_values, credentials_file)
self._config_writer.update_config(
credential_values, credentials_file, check_permissions=True
)

sys.stdout.write(
f"Temporary credentials written to profile '{target_profile}'\n"
Expand Down
6 changes: 5 additions & 1 deletion awscli/customizations/configure/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,18 @@ def _run_main(self, args, parsed_globals):
# Otherwise it's something in the [plugin] section
profile, varname = parts
config_filename = self._get_config_file('config_file')
check_permissions = False
if varname in self._WRITE_TO_CREDS_FILE:
# When writing to the creds file, the section is just the profile
section = profile
config_filename = self._get_config_file('credentials_file')
check_permissions = True
elif profile in PREDEFINED_SECTION_NAMES or profile == 'default':
section = profile
else:
section = profile_to_section(profile)
updated_config = {'__section__': section, varname: value}
self._config_writer.update_config(updated_config, config_filename)
self._config_writer.update_config(
updated_config, config_filename, check_permissions=check_permissions
)
return 0
10 changes: 8 additions & 2 deletions awscli/customizations/configure/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import os
import re

from . import SectionNotFoundError
from . import SectionNotFoundError, warn_if_permissive


class ConfigFileWriter:
Expand All @@ -37,7 +37,7 @@ def _validate_no_newlines_or_carriage_returns(
)
raise ValueError(err_msg)

def update_config(self, new_values, config_filename):
def update_config(self, new_values, config_filename, check_permissions=False):
"""Update config file with new values.

This method will update a section in a config file with
Expand All @@ -63,6 +63,10 @@ def update_config(self, new_values, config_filename):
:param config_filename: The config filename where values will be
written.

:type check_permissions: bool
:param check_permissions: If True, warn if the file has
permissions more permissive than 0o600.

"""
section_name = new_values.pop('__section__', 'default')
self._validate_no_newlines_or_carriage_returns(
Expand Down Expand Up @@ -111,6 +115,8 @@ def update_config(self, new_values, config_filename):
f.write(''.join(contents))
except SectionNotFoundError:
self._write_new_section(section_name, new_values, config_filename)
if check_permissions:
warn_if_permissive(config_filename)

def _create_file(self, config_filename):
# Create the file as well as the parent dir if needed.
Expand Down
21 changes: 20 additions & 1 deletion tests/unit/customizations/configure/test_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def assert_credentials_file_updated_with(self, new_values):
credentials_file_call = called_args[0]
expected_creds_file = os.path.expanduser('~/fake_credentials_filename')
self.assertEqual(
credentials_file_call, mock.call(new_values, expected_creds_file)
credentials_file_call,
mock.call(
new_values, expected_creds_file, check_permissions=True
),
)

def test_configure_command_sends_values_to_writer(self):
Expand Down Expand Up @@ -232,6 +235,22 @@ def test_iam_user_credentials_remove_session_token(self):
}
)

def test_check_permissions_when_credential_values_provided(self):
self.configure(args=[], parsed_globals=self.global_args)
expected_creds_file = os.path.expanduser('~/fake_credentials_filename')
self.writer.update_config.assert_any_call(
mock.ANY, expected_creds_file, check_permissions=True
)

def test_no_check_permissions_when_no_credential_values_changed(self):
user_presses_enter = None
precanned = PrecannedPrompter(value=user_presses_enter)
self.configure = configure.ConfigureCommand(
self.session, prompter=precanned, config_writer=self.writer
)
self.configure(args=[], parsed_globals=self.global_args)
self.writer.update_config.assert_not_called()


class TestInteractivePrompter(unittest.TestCase):
def setUp(self):
Expand Down
28 changes: 22 additions & 6 deletions tests/unit/customizations/configure/test_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,44 @@ def test_raises_error_when_plain_file_path_passed(self):
f.write('User name,Access key ID,Secret access key\nuser,AKID,SAK')
try:
with self.assertRaises(ValueError) as cm:
self.import_command(args=['--csv', 'temp_creds.csv'], parsed_globals=None)
self.import_command(
args=['--csv', 'temp_creds.csv'], parsed_globals=None
)
self.assertIn("without the 'file://' prefix", str(cm.exception))
finally:
os.remove('temp_creds.csv')

def test_inline_csv_succeeds(self):
csv_string = 'User name,Access key ID,Secret access key\nuser,AKID,SAK'
self.import_command(args=['--csv', csv_string], parsed_globals=None)
self.assertIn('Successfully imported 1 profile', self.stdout.getvalue())
self.assertIn(
'Successfully imported 1 profile', self.stdout.getvalue()
)

def test_csv_content_from_file_succeeds(self):
with open('temp_creds.csv', 'w') as f:
f.write('User name,Access key ID,Secret access key\nuser,AKID,SAK')
try:
with open('temp_creds.csv', 'r') as f:
with open('temp_creds.csv') as f:
contents = f.read()
self.import_command(args=['--csv', contents], parsed_globals=None)
self.assertIn('Successfully imported 1 profile', self.stdout.getvalue())
self.assertIn(
'Successfully imported 1 profile', self.stdout.getvalue()
)
finally:
os.remove('temp_creds.csv')

@mock.patch('awscli.customizations.configure.importer.warn_if_permissive')
def test_warn_called_once_when_importing_credentials(self, mock_warn):
rows = (
'PROF1,PW,AKID1,SAK1,https://console.link\n'
'PROF2,PW,AKID2,SAK2,https://console.link\n'
)
content = CSV_HEADERS + rows
self.import_command(args=['--csv', content], parsed_globals=None)
mock_warn.assert_called_once_with(self.fake_credentials_filename)


class TestCSVCredentialParser(unittest.TestCase):
def setUp(self):
self.parser = CSVCredentialParser()
Expand All @@ -134,8 +151,7 @@ def test_csv_parser_downloaded_csv(self):

def test_csv_parser_simple(self):
contents = (
'User name,Access key ID,Secret access key\n'
'PROFILENAME,AKID,SAK\n'
'User name,Access key ID,Secret access key\nPROFILENAME,AKID,SAK\n'
)
self.assert_parse_matches_expected(contents)

Expand Down
39 changes: 37 additions & 2 deletions tests/unit/customizations/configure/test_mfalogin.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def test_successful_mfa_login(self):
}

self.config_writer.update_config.assert_called_with(
expected_values, '/tmp/credentials'
expected_values, '/tmp/credentials', check_permissions=True
)

def test_serial_number_from_parameter(self):
Expand Down Expand Up @@ -566,4 +566,39 @@ def test_empty_credential_input_handling(self):

self.assertEqual(rc, 1)
all_writes = ''.join(str(call) for call in mock_stderr.write.call_args_list)
self.assertIn("aws: [ERROR]: AWS Access Key ID is required", all_writes)
self.assertIn("aws: [ERROR]: AWS Access Key ID is required", all_writes)

def test_check_permissions_when_writing_temporary_credentials(self):
self.prompter.get_credential_value.side_effect = [
'arn:aws:iam::123456789012:mfa/user',
'123456',
]
self.prompter.get_value.return_value = 'session-test'

expiration = datetime.datetime(2023, 5, 19, 18, 6, 10)
sts_response = {
'Credentials': {
'AccessKeyId': 'ASIAIOSFODNN7EXAMPLE',
'SecretAccessKey': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY',
'SessionToken': 'SESSION_TOKEN',
'Expiration': expiration,
}
}

sts_client = mock.Mock()
sts_client.get_session_token.return_value = sts_response
self.session.create_client.return_value = sts_client

with mock.patch('sys.stdin.isatty', return_value=True):
with mock.patch(
'os.path.expanduser', return_value='/tmp/credentials'
):
with mock.patch('sys.stdout'):
rc = self.command._run_main(
self.parsed_args, self.parsed_globals
)

self.assertEqual(rc, 0)
self.config_writer.update_config.assert_called_once_with(
mock.ANY, '/tmp/credentials', check_permissions=True
)
81 changes: 81 additions & 0 deletions tests/unit/customizations/configure/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
from io import StringIO
from unittest.mock import patch

import pytest

from awscli.customizations.configure import (
is_overly_permissive,
warn_if_permissive,
)


@pytest.mark.parametrize("mode", [0o700, 0o600, 0o400, 0o200, 0o000])
def test_acceptable_modes_are_not_overly_permissive(mode):
assert is_overly_permissive(mode) is False


@pytest.mark.parametrize("mode", [0o644, 0o666, 0o777, 0o610, 0o601])
def test_overly_permissive_modes_are_detected(mode):
assert is_overly_permissive(mode) is True


def _create_creds_file(tmp_path, mode):
path = tmp_path / "credentials"
path.write_text("[default]\naws_access_key_id=testing\n")
os.chmod(path, mode)
return str(path)


@patch("awscli.customizations.configure.is_windows", False)
def test_prints_warning_for_permissive_file(tmp_path):
path = _create_creds_file(tmp_path, 0o644)
err = StringIO()
warn_if_permissive(path, err_stream=err)
output = err.getvalue()
assert "aws: [WARNING]" in output
assert path in output
assert "accessible by other users" in output


@patch("awscli.customizations.configure.is_windows", False)
def test_no_warning_for_0o600_file(tmp_path):
path = _create_creds_file(tmp_path, 0o600)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", False)
def test_no_warning_for_0o700_file(tmp_path):
path = _create_creds_file(tmp_path, 0o700)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", False)
def test_skips_when_file_does_not_exist(tmp_path):
err = StringIO()
warn_if_permissive(str(tmp_path / "nonexistent"), err_stream=err)
assert err.getvalue() == ""


@patch("awscli.customizations.configure.is_windows", True)
def test_skips_on_windows(tmp_path):
path = _create_creds_file(tmp_path, 0o777)
err = StringIO()
warn_if_permissive(path, err_stream=err)
assert err.getvalue() == ""
Loading
Loading