Skip to content

Commit 81fb093

Browse files
committed
feat: support for impersonation token exchange
1 parent 0d011da commit 81fb093

21 files changed

Lines changed: 2083 additions & 15 deletions

File tree

packages/mcp/src/keycardai/mcp/server/auth/provider.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def get_token_verifier(
429429
client_factory=self.client_factory,
430430
)
431431

432-
def grant(self, resources: str | list[str]):
432+
def grant(self, resources: str | list[str], user_identifier: Callable[..., str] | None = None):
433433
"""Decorator for automatic delegated token exchange.
434434
435435
This decorator automates the OAuth token exchange process for accessing
@@ -478,6 +478,18 @@ async def my_async_tool(access_ctx: AccessContext, ctx: Context, user_id: str):
478478
- Have a parameter annotated with `Context` type from MCP (e.g., `ctx: Context`)
479479
- Can be either async or sync (the decorator handles both cases automatically)
480480
481+
When ``user_identifier`` is provided, the decorator uses impersonation
482+
(substitute-user token exchange) instead of subject-token-based exchange.
483+
The callable receives only the tool's **keyword** arguments (not positional
484+
arguments) and should return the user identifier string::
485+
486+
@provider.grant(
487+
"https://graph.microsoft.com",
488+
user_identifier=lambda **kw: kw["user_email"],
489+
)
490+
async def get_calendar(access_ctx: AccessContext, ctx: Context, user_email: str):
491+
token = access_ctx.access("https://graph.microsoft.com").access_token
492+
481493
Error handling:
482494
- Sets error state in AccessContext if token exchange fails
483495
- Preserves original function signature and behavior
@@ -643,27 +655,46 @@ async def wrapper(*args, **kwargs) -> Any:
643655
_resource_list = (
644656
[resources] if isinstance(resources, str) else resources
645657
)
658+
659+
# Resolve user identifier for impersonation if callback provided
660+
_resolved_user_id: str | None = None
661+
if user_identifier is not None:
662+
try:
663+
_resolved_user_id = user_identifier(**kwargs)
664+
except Exception as e:
665+
_set_error({
666+
"message": "Failed to resolve user_identifier from tool arguments.",
667+
"raw_error": str(e),
668+
}, None, _access_ctx)
669+
return await _call_func(_is_async_func, func, *args, **kwargs)
670+
646671
_access_tokens = {}
647672
for resource in _resource_list:
648673
try:
649-
# Prepare token exchange request using application identity provider
650-
if self.application_credential:
674+
if _resolved_user_id is not None:
675+
# Impersonation path: use substitute-user token exchange
676+
_token_response = await _client.impersonate(
677+
user_identifier=_resolved_user_id,
678+
resource=resource,
679+
)
680+
elif self.application_credential:
681+
# Prepare token exchange request using application identity provider
651682
_token_exchange_request = await self.application_credential.prepare_token_exchange_request(
652683
client=_client,
653684
subject_token=_keycardai_auth_info["access_token"],
654685
resource=resource,
655686
auth_info=_keycardai_auth_info,
656687
)
688+
_token_response = await _client.exchange_token(_token_exchange_request)
657689
else:
658690
# Basic token exchange without client authentication
659691
_token_exchange_request = TokenExchangeRequest(
660692
subject_token=_keycardai_auth_info["access_token"],
661693
resource=resource,
662694
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
663695
)
696+
_token_response = await _client.exchange_token(_token_exchange_request)
664697

665-
# Execute token exchange
666-
_token_response = await _client.exchange_token(_token_exchange_request)
667698
_access_tokens[resource] = _token_response
668699
except Exception as e:
669700
_error_dict: dict[str, str] = {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Keycard zone URL
2+
ZONE_URL=https://your-zone-id.keycard.cloud
3+
4+
# Landing Page app: public client (no secret), used for browser-based authorization
5+
LANDING_PAGE_CLIENT_ID=your-landing-page-client-id
6+
7+
# Background Agent app: confidential client, used for offline impersonation
8+
AGENT_CLIENT_ID=your-agent-client-id
9+
AGENT_CLIENT_SECRET=your-agent-client-secret
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.env
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Impersonation Token Exchange Example
2+
3+
Demonstrates user impersonation via OAuth 2.0 token exchange (RFC 8693) using the Keycard OAuth SDK. A background agent obtains resource-specific access tokens on behalf of a user, without the user being present.
4+
5+
## How It Works
6+
7+
A landing page collects user consent. A background agent uses that consent to get resource tokens later.
8+
9+
### Landing page (one-time, interactive)
10+
11+
The landing page is a web application where users sign in and grant the background agent permission to act on their behalf.
12+
13+
1. Serves a local web page
14+
2. The user clicks "Continue with Keycard" and authenticates with their identity provider
15+
3. The user authorizes the requested resource dependencies
16+
4. Keycard stores a delegated grant for each resource
17+
18+
The same landing page can be used again when new resources need to be authorized.
19+
20+
### Background agent (repeatable, offline)
21+
22+
The background agent is a confidential client (e.g. `client_secret_basic` or workload identity).
23+
24+
1. The agent authenticates and requests a token for a specific user and resource via `client.impersonate()`
25+
2. Keycard validates the delegated grant and issues a scoped, short-lived resource token
26+
27+
No browser, no user interaction. Impersonation is forbidden by default. An administrator must explicitly grant permission to specific applications.
28+
29+
## Prerequisites
30+
31+
- Python 3.10+
32+
- A Keycard zone, set up in Console:
33+
1. **Create a provider.**
34+
2. **Create a resource** (e.g. `https://api.github.com`) and link it to the provider.
35+
3. **Create a "Landing Page" application**
36+
- Public credential (client_id, no secret)
37+
- Redirect URI: `http://localhost/callback`
38+
- Add the resource as a dependency
39+
4. **Create a "Background Agent" application**
40+
- Password credential (client_id + client_secret)
41+
- Add the resource as a dependency
42+
5. **Enable impersonation** by adding a policy in Console. For example, to allow the Background Agent app to impersonate a specific user:
43+
```
44+
permit (
45+
principal is Keycard::Application,
46+
action,
47+
resource
48+
)
49+
when {
50+
principal.identifier == "background-agent" &&
51+
context.impersonate == true &&
52+
context has subject &&
53+
context.subject.identifier == "user@example.com"
54+
};
55+
```
56+
57+
## Install
58+
59+
```bash
60+
uv sync
61+
```
62+
63+
## Configuration
64+
65+
Copy `.env.example` to `.env` and fill in your values:
66+
67+
```bash
68+
cp .env.example .env
69+
```
70+
71+
All parameters can also be passed as CLI flags (run `--help` on each script for details).
72+
73+
74+
| Variable | Description |
75+
|---|---|
76+
| `ZONE_URL` | Keycard zone URL |
77+
| `LANDING_PAGE_CLIENT_ID` | Public client ID for the Landing Page app |
78+
| `AGENT_CLIENT_ID` | Confidential client ID for the Background Agent app |
79+
| `AGENT_CLIENT_SECRET` | Confidential client secret for the Background Agent app |
80+
81+
82+
## Usage
83+
84+
### Step 1: Landing page (interactive, one-time)
85+
86+
Starts a local web server where the user can sign in and establish delegated grants. Resources are determined by the application's dependencies configured in Keycard Console.
87+
88+
```bash
89+
uv run python landing_page.py --port 3000
90+
```
91+
92+
Open `http://localhost:3000` in a browser and click "Continue with Keycard".
93+
94+
### Step 2: Get the user identifier from Console
95+
96+
After the user logs in, find their identifier in Keycard Console under the Users section. The Provider can be configured to map claims (e.g. `email`, `sub`) to the user identifier on creation. The identifier can also be changed from the Console at any time.
97+
98+
### Step 3: Run background agent (offline, repeatable)
99+
100+
The background agent obtains a resource token for the user without any browser interaction.
101+
102+
```bash
103+
uv run python background_agent.py \
104+
--user-identifier user@example.com \
105+
--resource https://api.github.com
106+
```
107+
108+
## Example Output
109+
110+
```
111+
$ uv run python background_agent.py \
112+
--user-identifier user@example.com \
113+
--resource https://api.github.com
114+
Access Token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
115+
Token Type: Bearer
116+
Expires In: 3600s
117+
```
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#!/usr/bin/env python3
2+
"""Background Agent app (confidential client).
3+
4+
Authenticates with client credentials and obtains a resource-specific access
5+
token on behalf of a user, without the user being present. The user must have
6+
previously established a delegated grant for the resource through the landing
7+
page.
8+
"""
9+
10+
import argparse
11+
import os
12+
import sys
13+
from typing import NoReturn
14+
15+
from dotenv import load_dotenv
16+
17+
from keycardai.oauth import Client
18+
from keycardai.oauth.exceptions import OAuthHttpError, OAuthProtocolError
19+
from keycardai.oauth.http.auth import BasicAuth
20+
from keycardai.oauth.types.models import ClientConfig
21+
22+
23+
def run_impersonate(
24+
zone_url: str,
25+
client_id: str,
26+
client_secret: str,
27+
user_identifier: str,
28+
resource: str,
29+
) -> None:
30+
"""Impersonate a user and print the resulting resource token.
31+
32+
Args:
33+
zone_url: Keycard zone URL for metadata discovery.
34+
client_id: Confidential client ID.
35+
client_secret: Confidential client secret.
36+
user_identifier: User identifier (e.g. email, oid).
37+
resource: Target resource URI.
38+
"""
39+
try:
40+
with Client(
41+
base_url=zone_url,
42+
auth=BasicAuth(client_id, client_secret),
43+
config=ClientConfig(
44+
enable_metadata_discovery=True,
45+
auto_register_client=False,
46+
),
47+
) as client:
48+
response = client.impersonate(
49+
user_identifier=user_identifier,
50+
resource=resource,
51+
)
52+
53+
print(f"Access Token: {response.access_token}")
54+
print(f"Token Type: {response.token_type}")
55+
if response.expires_in:
56+
print(f"Expires In: {response.expires_in}s")
57+
if response.scope:
58+
print(f"Scope: {' '.join(response.scope)}")
59+
60+
except OAuthProtocolError as e:
61+
desc = f" - {e.error_description}" if e.error_description else ""
62+
raise SystemExit(f"Error: OAuth error: {e.error}{desc}") from None
63+
except OAuthHttpError as e:
64+
raise SystemExit(f"Error: HTTP {e.status_code}: {e.response_body}") from None
65+
66+
67+
# ---------------------------------------------------------------------------
68+
# CLI entry point
69+
# ---------------------------------------------------------------------------
70+
71+
def _error_exit(message: str) -> NoReturn:
72+
print(f"Error: {message}", file=sys.stderr)
73+
sys.exit(1)
74+
75+
76+
def main() -> None:
77+
load_dotenv()
78+
79+
parser = argparse.ArgumentParser(
80+
description="Background Agent: obtain a resource token on behalf of a user (offline)",
81+
)
82+
parser.add_argument(
83+
"--zone-url",
84+
default=os.getenv("ZONE_URL"),
85+
help="Keycard zone URL (env: ZONE_URL)",
86+
)
87+
parser.add_argument(
88+
"--client-id",
89+
default=os.getenv("AGENT_CLIENT_ID"),
90+
help="Confidential client ID (env: AGENT_CLIENT_ID)",
91+
)
92+
parser.add_argument(
93+
"--client-secret",
94+
default=os.getenv("AGENT_CLIENT_SECRET"),
95+
help="Confidential client secret (env: AGENT_CLIENT_SECRET)",
96+
)
97+
parser.add_argument(
98+
"--user-identifier",
99+
help="User identifier for impersonation",
100+
)
101+
parser.add_argument(
102+
"--resource",
103+
help="Resource URI to get a token for",
104+
)
105+
106+
args = parser.parse_args()
107+
108+
if not args.zone_url:
109+
_error_exit("--zone-url is required (or set ZONE_URL)")
110+
if not args.client_id:
111+
_error_exit("--client-id is required (or set AGENT_CLIENT_ID)")
112+
if not args.client_secret:
113+
_error_exit("--client-secret is required (or set AGENT_CLIENT_SECRET)")
114+
if not args.user_identifier:
115+
_error_exit("--user-identifier is required")
116+
if not args.resource:
117+
_error_exit("--resource is required")
118+
119+
print("═══ Background Agent (confidential client) ═══")
120+
print(" Auth: client_credentials")
121+
print(f" On behalf of: {args.user_identifier}")
122+
print(f" Access resource: {args.resource}")
123+
print()
124+
125+
run_impersonate(
126+
zone_url=args.zone_url,
127+
client_id=args.client_id,
128+
client_secret=args.client_secret,
129+
user_identifier=args.user_identifier,
130+
resource=args.resource,
131+
)
132+
133+
134+
if __name__ == "__main__":
135+
main()

0 commit comments

Comments
 (0)