Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

**Activity** - The activity endpoint is in public preview and subject to change

**Authorization** - The authorization endpoint is in public preview and subject to change

## Tested Against Python Versions
* 3.7
* 3.8
Expand Down
2 changes: 2 additions & 0 deletions duo_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .accounts import Accounts
from .admin import Admin
from .auth import Auth
from .authorization import Authorization
from .client import __version__

__all__ = [
'Accounts',
'Admin',
'Auth',
'Authorization',
]
69 changes: 69 additions & 0 deletions duo_client/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Duo Security Authorization API reference client implementation.
"""
from dataclasses import dataclass
from typing import ClassVar, Final, Optional

from . import client


@dataclass
class McpCapabilities:
route_fragment: ClassVar[Final[str]] = 'mcp_capabilities'
access_token: str
mcp_server_id: str
mcp_server_name: str = ''
tool: Optional[str] = None


class Authorization(client.Client):
def ping(self):
"""
Determine if the Duo Authorization service is up and responding.

Returns information about the service state: {
'time': <int:UNIX timestamp>,
}
"""
return self.json_api_call('GET', '/authorize/v1/ping', {})

def check(self):
"""
Determine if the integration key, secret key, and signature
generation are valid.

Returns information about the service state: {
'time': <int:UNIX timestamp>,
}
"""
return self.json_api_call('GET', '/authorize/v1/check', {})

def evaluate(self, input: McpCapabilities):
"""
Evaluate authorization policy for MCP server capabilities.

Returns: {
'allowed_capabilities': <list[str] or None>,
'authorized': <bool or None>,
'expires_at': <int:UNIX timestamp>,
'user_id': <str>,
'non_human_identity': <str>,
'policy_version_id': <int or None>,
}
"""
params = {
'access_token': input.access_token,
'mcp_server_id': input.mcp_server_id,
'mcp_server_name': input.mcp_server_name,
}
if input.tool is not None:
params['tool'] = input.tool
response = self.json_api_call('POST', f'/authorize/v1/{input.route_fragment}/evaluate', params)
return {
'allowed_capabilities': response.get('allowed_capabilities'),
'authorized': response.get('authorized'),
'expires_at': response.get('expires_at'),
'user_id': response.get('user_id'),
'non_human_identity': response.get('non_human_identity'),
'policy_version_id': response.get('policy_version_id'),
}
99 changes: 99 additions & 0 deletions examples/Authorization/evaluate_mcp_capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Example of Duo Authorization API MCP capabilities evaluation
"""

from argparse import ArgumentParser, Namespace
import duo_client
from duo_client.authorization import McpCapabilities
import getpass


def _get_arg(args: Namespace, name: str, prompt: str, secure=False):
"""Read arg from CLI flags or stdin, using getpass when sensitive information should not be echoed to tty"""
value = getattr(args, name)
if value is not None:
return value

if secure is True:
return getpass.getpass(prompt)
else:
return input(prompt)


def prompt_for_credentials(args: Namespace) -> dict:
"""Collect required API credentials from command line prompts

:return: dictionary containing Duo Authorization API ikey, skey and hostname strings
"""

ikey = _get_arg(args, "ikey", 'Duo Authorization API integration key ("DI..."): ')
skey = _get_arg(args, "skey", 'Duo Authorization API integration secret key: ', secure=True)
host = _get_arg(args, "api_host", 'Duo Authorization API hostname ("api-....duosecurity.com"): ')
access_token = _get_arg(args, "access_token", 'Access token: ', secure=True)
mcp_server_id = _get_arg(args, "mcp_server_id", 'MCP Server ID: ')

return {
"IKEY": ikey,
"SKEY": skey,
"APIHOST": host,
"ACCESS_TOKEN": access_token,
"MCP_SERVER_ID": mcp_server_id,
}


def main():
"""Main program entry point"""

parser = ArgumentParser()
parser.add_argument("--ikey", type=str)
parser.add_argument("--skey", type=str)
parser.add_argument("--api-host", type=str)
parser.add_argument("--access-token", type=str)
parser.add_argument("--mcp-server-id", type=str)
parser.add_argument("--mcp-server-name", type=str, default='')
parser.add_argument("--tool", type=str, default=None)
args = parser.parse_args()

inputs = prompt_for_credentials(args)

authz_client = duo_client.Authorization(
ikey=inputs['IKEY'],
skey=inputs['SKEY'],
host=inputs['APIHOST'],
)

# Verify that the Duo service is available
duo_ping = authz_client.ping()
if 'time' in duo_ping:
print("\nDuo Authorization service check completed successfully.")
else:
print(f"Error: {duo_ping}")

# Verify that IKEY and SKEY information provided are valid
duo_check = authz_client.check()
if 'time' in duo_check:
print("IKEY and SKEY provided have been verified.")
else:
print(f"Error: {duo_check}")

# Evaluate MCP capabilities
capabilities = McpCapabilities(
access_token=inputs['ACCESS_TOKEN'],
mcp_server_id=inputs['MCP_SERVER_ID'],
mcp_server_name=args.mcp_server_name,
tool=args.tool,
)

print(f"\nEvaluating MCP capabilities for server {inputs['MCP_SERVER_ID']}...")
result = authz_client.evaluate(capabilities)

print(f"\nAuthorized: {result['authorized']}")
print(f"Allowed capabilities: {result['allowed_capabilities']}")
print(f"User ID: {result['user_id']}")
print(f"Non-human identity: {result['non_human_identity']}")
print(f"Policy version ID: {result['policy_version_id']}")
print(f"Expires at: {result['expires_at']}")


if __name__ == '__main__':
main()
Empty file added tests/authorization/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions tests/authorization/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import unittest
from .. import util
import duo_client.authorization


class TestAuthorization(unittest.TestCase):

def setUp(self):
self.client = duo_client.authorization.Authorization(
'test_ikey', 'test_skey', 'example.com')
self.client._connect = lambda: util.MockHTTPConnection()


if __name__ == '__main__':
unittest.main()
58 changes: 58 additions & 0 deletions tests/authorization/test_authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import unittest
from .base import TestAuthorization
from .. import util
from duo_client.authorization import McpCapabilities


class TestPing(TestAuthorization):

def test_ping(self):
response = self.client.ping()
self.assertEqual(response['method'], 'GET')
self.assertIn('/authorize/v1/ping', response['uri'])


class TestCheck(TestAuthorization):

def test_check(self):
response = self.client.check()
self.assertEqual(response['method'], 'GET')
self.assertIn('/authorize/v1/check', response['uri'])


class TestEvaluate(TestAuthorization):

def setUp(self):
super().setUp()
self.mock_conn = util.MockHTTPConnection()
self.client._connect = lambda: self.mock_conn

def test_evaluate(self):
capabilities = McpCapabilities(
access_token='test_token',
mcp_server_id='server_123',
)
response = self.client.evaluate(capabilities)
self.assertEqual(self.mock_conn.method, 'POST')
self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri)
self.assertIn('allowed_capabilities', response)
self.assertIn('authorized', response)
self.assertIn('expires_at', response)
self.assertIn('user_id', response)
self.assertIn('non_human_identity', response)
self.assertIn('policy_version_id', response)

def test_evaluate_with_optional_params(self):
capabilities = McpCapabilities(
access_token='test_token',
mcp_server_id='server_123',
mcp_server_name='my_server',
tool='my_tool',
)
response = self.client.evaluate(capabilities)
self.assertEqual(self.mock_conn.method, 'POST')
self.assertIn('/authorize/v1/mcp_capabilities/evaluate', self.mock_conn.uri)


if __name__ == '__main__':
unittest.main()