diff --git a/README.md b/README.md index d096467..dbc6584 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ **Activity** - The activity endpoint is in public preview and subject to change +**Authorization** - The authorization endpoint is in public preview and subject to change + ## Tested Against Python Versions * 3.7 * 3.8 diff --git a/duo_client/__init__.py b/duo_client/__init__.py index 3fb5d01..84e4680 100644 --- a/duo_client/__init__.py +++ b/duo_client/__init__.py @@ -1,10 +1,12 @@ from .accounts import Accounts from .admin import Admin from .auth import Auth +from .authorization import Authorization from .client import __version__ __all__ = [ 'Accounts', 'Admin', 'Auth', + 'Authorization', ] diff --git a/duo_client/authorization.py b/duo_client/authorization.py new file mode 100644 index 0000000..92705e1 --- /dev/null +++ b/duo_client/authorization.py @@ -0,0 +1,69 @@ +""" +Duo Security Authorization API reference client implementation. +""" +from dataclasses import dataclass +from typing import ClassVar, Final, Optional + +from . import client + + +@dataclass +class McpCapabilities: + route_fragment: ClassVar[Final[str]] = 'mcp_capabilities' + access_token: str + mcp_server_id: str + mcp_server_name: str = '' + tool: Optional[str] = None + + +class Authorization(client.Client): + def ping(self): + """ + Determine if the Duo Authorization service is up and responding. + + Returns information about the service state: { + 'time': , + } + """ + return self.json_api_call('GET', '/authorize/v1/ping', {}) + + def check(self): + """ + Determine if the integration key, secret key, and signature + generation are valid. + + Returns information about the service state: { + 'time': , + } + """ + return self.json_api_call('GET', '/authorize/v1/check', {}) + + def evaluate(self, input: McpCapabilities): + """ + Evaluate authorization policy for MCP server capabilities. + + Returns: { + 'allowed_capabilities': , + 'authorized': , + 'expires_at': , + 'user_id': , + 'non_human_identity': , + 'policy_version_id': , + } + """ + params = { + 'access_token': input.access_token, + 'mcp_server_id': input.mcp_server_id, + 'mcp_server_name': input.mcp_server_name, + } + if input.tool is not None: + params['tool'] = input.tool + response = self.json_api_call('POST', f'/authorize/v1/{input.route_fragment}/evaluate', params) + return { + 'allowed_capabilities': response.get('allowed_capabilities'), + 'authorized': response.get('authorized'), + 'expires_at': response.get('expires_at'), + 'user_id': response.get('user_id'), + 'non_human_identity': response.get('non_human_identity'), + 'policy_version_id': response.get('policy_version_id'), + } diff --git a/examples/Authorization/evaluate_mcp_capabilities.py b/examples/Authorization/evaluate_mcp_capabilities.py new file mode 100644 index 0000000..08bef24 --- /dev/null +++ b/examples/Authorization/evaluate_mcp_capabilities.py @@ -0,0 +1,99 @@ +""" +Example of Duo Authorization API MCP capabilities evaluation +""" + +from argparse import ArgumentParser, Namespace +import duo_client +from duo_client.authorization import McpCapabilities +import getpass + + +def _get_arg(args: Namespace, name: str, prompt: str, secure=False): + """Read arg from CLI flags or stdin, using getpass when sensitive information should not be echoed to tty""" + value = getattr(args, name) + if value is not None: + return value + + if secure is True: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def prompt_for_credentials(args: Namespace) -> dict: + """Collect required API credentials from command line prompts + + :return: dictionary containing Duo Authorization API ikey, skey and hostname strings + """ + + ikey = _get_arg(args, "ikey", 'Duo Authorization API integration key ("DI..."): ') + skey = _get_arg(args, "skey", 'Duo Authorization API integration secret key: ', secure=True) + host = _get_arg(args, "api_host", 'Duo Authorization API hostname ("api-....duosecurity.com"): ') + access_token = _get_arg(args, "access_token", 'Access token: ', secure=True) + mcp_server_id = _get_arg(args, "mcp_server_id", 'MCP Server ID: ') + + return { + "IKEY": ikey, + "SKEY": skey, + "APIHOST": host, + "ACCESS_TOKEN": access_token, + "MCP_SERVER_ID": mcp_server_id, + } + + +def main(): + """Main program entry point""" + + parser = ArgumentParser() + parser.add_argument("--ikey", type=str) + parser.add_argument("--skey", type=str) + parser.add_argument("--api-host", type=str) + parser.add_argument("--access-token", type=str) + parser.add_argument("--mcp-server-id", type=str) + parser.add_argument("--mcp-server-name", type=str, default='') + parser.add_argument("--tool", type=str, default=None) + args = parser.parse_args() + + inputs = prompt_for_credentials(args) + + authz_client = duo_client.Authorization( + ikey=inputs['IKEY'], + skey=inputs['SKEY'], + host=inputs['APIHOST'], + ) + + # Verify that the Duo service is available + duo_ping = authz_client.ping() + if 'time' in duo_ping: + print("\nDuo Authorization service check completed successfully.") + else: + print(f"Error: {duo_ping}") + + # Verify that IKEY and SKEY information provided are valid + duo_check = authz_client.check() + if 'time' in duo_check: + print("IKEY and SKEY provided have been verified.") + else: + print(f"Error: {duo_check}") + + # Evaluate MCP capabilities + capabilities = McpCapabilities( + access_token=inputs['ACCESS_TOKEN'], + mcp_server_id=inputs['MCP_SERVER_ID'], + mcp_server_name=args.mcp_server_name, + tool=args.tool, + ) + + print(f"\nEvaluating MCP capabilities for server {inputs['MCP_SERVER_ID']}...") + result = authz_client.evaluate(capabilities) + + print(f"\nAuthorized: {result['authorized']}") + print(f"Allowed capabilities: {result['allowed_capabilities']}") + print(f"User ID: {result['user_id']}") + print(f"Non-human identity: {result['non_human_identity']}") + print(f"Policy version ID: {result['policy_version_id']}") + print(f"Expires at: {result['expires_at']}") + + +if __name__ == '__main__': + main() diff --git a/tests/authorization/__init__.py b/tests/authorization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/authorization/base.py b/tests/authorization/base.py new file mode 100644 index 0000000..3debbee --- /dev/null +++ b/tests/authorization/base.py @@ -0,0 +1,15 @@ +import unittest +from .. import util +import duo_client.authorization + + +class TestAuthorization(unittest.TestCase): + + def setUp(self): + self.client = duo_client.authorization.Authorization( + 'test_ikey', 'test_skey', 'example.com') + self.client._connect = lambda: util.MockHTTPConnection() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py new file mode 100644 index 0000000..9b2e59e --- /dev/null +++ b/tests/authorization/test_authorization.py @@ -0,0 +1,58 @@ +import unittest +from .base import TestAuthorization +from .. import util +from duo_client.authorization import McpCapabilities + + +class TestPing(TestAuthorization): + + def test_ping(self): + response = self.client.ping() + self.assertEqual(response['method'], 'GET') + self.assertIn('/authorize/v1/ping', response['uri']) + + +class TestCheck(TestAuthorization): + + def test_check(self): + response = self.client.check() + self.assertEqual(response['method'], 'GET') + self.assertIn('/authorize/v1/check', response['uri']) + + +class TestEvaluate(TestAuthorization): + + def setUp(self): + super().setUp() + self.mock_conn = util.MockHTTPConnection() + self.client._connect = lambda: self.mock_conn + + def test_evaluate(self): + capabilities = McpCapabilities( + access_token='test_token', + mcp_server_id='server_123', + ) + response = self.client.evaluate(capabilities) + self.assertEqual(self.mock_conn.method, 'POST') + self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) + self.assertIn('allowed_capabilities', response) + self.assertIn('authorized', response) + self.assertIn('expires_at', response) + self.assertIn('user_id', response) + self.assertIn('non_human_identity', response) + self.assertIn('policy_version_id', response) + + def test_evaluate_with_optional_params(self): + capabilities = McpCapabilities( + access_token='test_token', + mcp_server_id='server_123', + mcp_server_name='my_server', + tool='my_tool', + ) + response = self.client.evaluate(capabilities) + self.assertEqual(self.mock_conn.method, 'POST') + self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri) + + +if __name__ == '__main__': + unittest.main()