diff --git a/src/azure-cli-core/azure/cli/core/tests/test_util.py b/src/azure-cli-core/azure/cli/core/tests/test_util.py index ae329ed65b7..a1374d27f73 100644 --- a/src/azure-cli-core/azure/cli/core/tests/test_util.py +++ b/src/azure-cli-core/azure/cli/core/tests/test_util.py @@ -5,6 +5,7 @@ # pylint: disable=line-too-long from collections import namedtuple +import io import os import sys import unittest @@ -17,7 +18,9 @@ (get_file_json, truncate_text, shell_safe_json_parse, b64_to_hex, hash_string, random_string, open_page_in_browser, can_launch_browser, handle_exception, ConfiguredDefaultSetter, send_raw_request, should_disable_connection_verify, parse_proxy_resource_id, get_az_user_agent, get_az_rest_user_agent, - _get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command) + _get_parent_proc_name, is_wsl, run_cmd, run_az_cmd, roughly_parse_command, + GRAPH_ACCESS_DENIED_RECOMMENDATION) +from azure.cli.core import azclierror from azure.cli.core.mock import DummyCli @@ -613,6 +616,51 @@ def test_handle_exception_httpoperationerror_no_response_text(self, mock_logger_ self.assertIn(str(mock_http_error), mock_logger_error.call_args[0][0]) self.assertEqual(ex_result, 1) + @mock.patch('azure.cli.core.azclierror.logger.error', autospec=True) + def test_handle_exception_http_error_graph_access_denied_adds_recommendation(self, mock_logger_error): + for error_code in ['accessDenied', 'ACCESSDENIED']: + with self.subTest(error_code=error_code): + mock_response = mock.MagicMock() + mock_response.status_code = 403 + mock_response.headers = {} + mock_response.url = 'https://graph.microsoft.com/v1.0/policies/authenticationStrengthPolicies/' + mock_response.text = json.dumps({"error": {"code": error_code, "message": "Request Authorization failed"}}) + mock_response.json.return_value = json.loads(mock_response.text) + http_error = azclierror.HTTPError(f'Forbidden({{"error":{{"code":"{error_code}"}}}})', mock_response) + + with mock.patch('sys.stderr', new=io.StringIO()) as stderr: + ex_result = handle_exception(http_error) + + self.assertTrue(mock_logger_error.called) + self.assertIn('Forbidden', str(mock_logger_error.call_args[0][0])) + self.assertEqual(ex_result, 1) + self.assertIn(GRAPH_ACCESS_DENIED_RECOMMENDATION, stderr.getvalue()) + + @mock.patch('azure.cli.core.azclierror.logger.error', autospec=True) + def test_handle_exception_http_error_without_graph_access_denied_no_recommendation(self, _): + test_cases = [ + (403, 'https://management.azure.com/subscriptions?api-version=2020-01-01', + {"error": {"code": "accessDenied", "message": "Request Authorization failed"}}), + (404, 'https://graph.microsoft.com/v1.0/policies/authenticationStrengthPolicies/', + {"error": {"code": "accessDenied", "message": "Request Authorization failed"}}) + ] + + for status_code, url, payload in test_cases: + with self.subTest(status_code=status_code, url=url): + mock_response = mock.MagicMock() + mock_response.status_code = status_code + mock_response.headers = {} + mock_response.url = url + mock_response.text = json.dumps(payload) + mock_response.json.return_value = payload + http_error = azclierror.HTTPError('Request failed', mock_response) + + with mock.patch('sys.stderr', new=io.StringIO()) as stderr: + ex_result = handle_exception(http_error) + + self.assertEqual(ex_result, 1) + self.assertNotIn(GRAPH_ACCESS_DENIED_RECOMMENDATION, stderr.getvalue()) + @staticmethod def _get_mock_HttpOperationError(response_text): from msrest.exceptions import HttpOperationError diff --git a/src/azure-cli-core/azure/cli/core/util.py b/src/azure-cli-core/azure/cli/core/util.py index b693ee00518..3f9112041b8 100644 --- a/src/azure-cli-core/azure/cli/core/util.py +++ b/src/azure-cli-core/azure/cli/core/util.py @@ -27,6 +27,15 @@ QUERY_REFERENCE = ("To learn more about --query, please visit: " "'https://learn.microsoft.com/cli/azure/query-azure-cli'") +GRAPH_ACCESS_DENIED_RECOMMENDATION = ( + "If this Microsoft Graph API call requires privileged permissions (for example " + "Policy.ReadWrite.ConditionalAccess), Azure CLI's first-party app may not be pre-authorized " + "for that scope (AADSTS65002). Use 'az login --service-principal' with an app registration " + "that has the required Microsoft Graph permissions granted." +) +GRAPH_ENDPOINT_PREFIX_LOWER = 'https://graph.microsoft.com/' +GRAPH_ACCESS_DENIED_ERROR_CODE_LOWER = 'accessdenied' + _PROXYID_RE = re.compile( '(?i)/subscriptions/(?P[^/]*)(/resourceGroups/(?P[^/]*))?' @@ -99,6 +108,9 @@ def handle_exception(ex): # pylint: disable=too-many-locals, too-many-statement az_error.set_recommendation("Interactive authentication is needed. Please run:\naz logout\naz login") else: az_error = azclierror.UnclassifiedUserFault(ex) + recommendation = _get_graph_access_denied_recommendation(ex.response) + if recommendation: + az_error.set_recommendation(recommendation) elif isinstance(ex, CLIError): # TODO: Fine-grained analysis here @@ -184,6 +196,33 @@ def extract_common_error_message(ex): return error_msg +def _get_graph_access_denied_recommendation(response): + if getattr(response, 'status_code', None) != 403: + return None + + response_url = getattr(response, 'url', '') + if not isinstance(response_url, str) or not response_url.lower().startswith(GRAPH_ENDPOINT_PREFIX_LOWER): + return None + + try: + response_json = response.json() + except Exception: # pylint: disable=broad-except + try: + response_json = json.loads(getattr(response, 'text', '')) + except Exception: # pylint: disable=broad-except + return None + + error = response_json.get('error') if isinstance(response_json, dict) else None + if not isinstance(error, dict): + return None + + error_code = error.get('code') + if isinstance(error_code, str) and error_code.lower() == GRAPH_ACCESS_DENIED_ERROR_CODE_LOWER: + return GRAPH_ACCESS_DENIED_RECOMMENDATION + + return None + + def extract_http_operation_error(ex): error_msg = None status_code = 'Unknown Code'