Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions packages/mcp/src/keycardai/mcp/server/auth/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def get_token_verifier(
client_factory=self.client_factory,
)

def grant(self, resources: str | list[str]):
def grant(self, resources: str | list[str], user_identifier: Callable[..., str] | None = None):
"""Decorator for automatic delegated token exchange.

This decorator automates the OAuth token exchange process for accessing
Expand Down Expand Up @@ -478,6 +478,18 @@ async def my_async_tool(access_ctx: AccessContext, ctx: Context, user_id: str):
- Have a parameter annotated with `Context` type from MCP (e.g., `ctx: Context`)
- Can be either async or sync (the decorator handles both cases automatically)

When ``user_identifier`` is provided, the decorator uses impersonation
(substitute-user token exchange) instead of subject-token-based exchange.
The callable receives only the tool's **keyword** arguments (not positional
arguments) and should return the user identifier string::

@provider.grant(
"https://graph.microsoft.com",
user_identifier=lambda **kw: kw["user_email"],
)
async def get_calendar(access_ctx: AccessContext, ctx: Context, user_email: str):
token = access_ctx.access("https://graph.microsoft.com").access_token

Error handling:
- Sets error state in AccessContext if token exchange fails
- Preserves original function signature and behavior
Expand Down Expand Up @@ -643,27 +655,46 @@ async def wrapper(*args, **kwargs) -> Any:
_resource_list = (
[resources] if isinstance(resources, str) else resources
)

# Resolve user identifier for impersonation if callback provided
_resolved_user_id: str | None = None
if user_identifier is not None:
try:
_resolved_user_id = user_identifier(**kwargs)
except Exception as e:
_set_error({
"message": "Failed to resolve user_identifier from tool arguments.",
"raw_error": str(e),
}, None, _access_ctx)
return await _call_func(_is_async_func, func, *args, **kwargs)

_access_tokens = {}
for resource in _resource_list:
try:
# Prepare token exchange request using application identity provider
if self.application_credential:
if _resolved_user_id is not None:
# Impersonation path: use substitute-user token exchange
_token_response = await _client.impersonate(
user_identifier=_resolved_user_id,
resource=resource,
)
elif self.application_credential:
# Prepare token exchange request using application identity provider
_token_exchange_request = await self.application_credential.prepare_token_exchange_request(
client=_client,
subject_token=_keycardai_auth_info["access_token"],
resource=resource,
auth_info=_keycardai_auth_info,
)
_token_response = await _client.exchange_token(_token_exchange_request)
else:
# Basic token exchange without client authentication
_token_exchange_request = TokenExchangeRequest(
subject_token=_keycardai_auth_info["access_token"],
resource=resource,
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
)
_token_response = await _client.exchange_token(_token_exchange_request)

# Execute token exchange
_token_response = await _client.exchange_token(_token_exchange_request)
_access_tokens[resource] = _token_response
except Exception as e:
_error_dict: dict[str, str] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Keycard zone URL (from Zone Settings in Keycard Console)
ZONE_URL=https://example.keycard.cloud

# Landing Page app: public client, used for browser-based authorization
LANDING_PAGE_CLIENT_ID=landing-page

# Background Agent app: confidential client, used for offline impersonation
AGENT_CLIENT_ID=background-agent
AGENT_CLIENT_SECRET=your-client-secret-here
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
133 changes: 133 additions & 0 deletions packages/oauth/examples/impersonation_token_exchange/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Impersonation Token Exchange Example

Demonstrates user impersonation via OAuth 2.0 token exchange (RFC 8693) using the Keycard OAuth SDK. Users grant access through a landing page by authenticating with their identity provider and approving resource access. A background agent then obtains resource tokens on their behalf, without the user being present.

## How It Works

### Landing page (one-time, interactive)

The landing page is a web application where users sign in and grant the background agent permission to act on their behalf.

1. Serves a local web page
2. The user clicks "Continue with Keycard" and authenticates with their identity provider
3. The user grants access to the requested resources
4. Keycard creates a delegated grant for each resource

The same landing page can be used again when new resources need to be authorized.

### Background agent (repeatable, offline)

The background agent is a confidential client (e.g. `client_secret_basic` or workload identity).

1. The agent authenticates and requests a token for a specific user and resource via `client.impersonate()`
2. Keycard validates the delegated grant and issues a scoped, short-lived resource token

No browser, no user interaction. Impersonation is forbidden by default. An administrator must explicitly allow specific applications to impersonate.

## Prerequisites

- Python 3.10+
- [uv](https://docs.astral.sh/uv/) installed
- Access to Keycard Console and a Keycard zone

## Configuration

Copy `.env.example` to `.env`:

```bash
cp .env.example .env
```

Set up the following in Keycard Console:

1. **Set `ZONE_URL`** in `.env` to your Keycard zone URL from the Zone Settings.
2. **Create a provider** (e.g. `https://github.com` as the identity provider).
3. **Create a resource** (e.g. `https://api.github.com`) and link it to the provider.
4. **Create a Landing Page application**
- Public credential, identifier: `landing-page`
- Redirect URI: `http://localhost/callback`
- Add the resource as a dependency
5. **Create a Background Agent application**
- Password credential, identifier: `background-agent`
- Set `AGENT_CLIENT_SECRET` in `.env`.
6. **Add a policy enabling impersonation** in Keycard Console. To allow the Background Agent app to impersonate a specific user:
```
permit (
principal is Keycard::Application,
action,
resource
)
when {
principal.identifier == "background-agent" &&
context.impersonate == true &&
context has subject &&
context.subject.identifier == "user@example.com"
};
```


## Usage

### Step 1: Install dependencies

```bash
uv sync
```

### Step 2: Landing page (interactive, one-time)

Starts the landing page where users sign in and grant the background agent access to resources. In production, this would be deployed as a hosted web application. Resources are determined by the application's dependencies configured in Keycard Console.

```bash
uv run python landing_page.py --port 3000
```

Example output:

```
═══ Landing Page ═══
Auth: PKCE (no secret)
Listening: http://localhost:3000

Landing page running at http://localhost:3000
Press Ctrl+C to stop.
```

Open `http://localhost:3000` in a browser and click "Continue with Keycard".

### Step 3: Get the user identifier from Keycard Console

After the user signs in, find their identifier in Keycard Console under the Users section. The Provider can be configured to map claims (e.g. `email`, `sub`) to the user identifier on creation. The identifier can also be changed from Keycard Console at any time.

### Step 4: Run background agent (offline, repeatable)

The background agent obtains a resource token for the user without any browser interaction.

```bash
uv run python background_agent.py \
--user-identifier user@example.com \
--resource https://api.github.com
```

Example output:

```
═══ Background Agent ═══
Auth: client_credentials
On behalf of: user@example.com
Access resource: https://api.github.com

Access Token: eyJhbG...
Token Type: Bearer
Expires In: 3600s
```

If the policy does not allow impersonation, you will see an error:

```
Error: OAuth error: access_denied - Access denied by policy. Policy set: <policy-set-id>. Policy set version: <policy-set-version>. Determining policies: default-user-grants.
```

### Step 5: Verify in audit logs

In Keycard Console, navigate to Audit Logs to see the user authorization and the credential issued to the background agent on behalf of the user.
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Background Agent app.

Authenticates with client credentials and obtains a resource-specific access
token on behalf of a user, without the user being present. The user must have
previously granted access through the landing page.
"""

import argparse
import os
import sys
from typing import NoReturn

from dotenv import load_dotenv

from keycardai.oauth import Client
from keycardai.oauth.exceptions import OAuthHttpError, OAuthProtocolError
from keycardai.oauth.http.auth import BasicAuth
from keycardai.oauth.types.models import ClientConfig


def run_background_agent(
zone_url: str,
client_id: str,
client_secret: str,
user_identifier: str,
resource: str,
) -> None:
"""Impersonate a user and print the resulting resource token.

Args:
zone_url: Keycard zone URL for metadata discovery.
client_id: Confidential client ID.
client_secret: Confidential client secret.
user_identifier: User identifier (e.g. email, oid).
resource: Target resource URI.
"""
try:
with Client(
base_url=zone_url,
auth=BasicAuth(client_id, client_secret),
config=ClientConfig(
enable_metadata_discovery=True,
auto_register_client=False,
),
) as client:
response = client.impersonate(
user_identifier=user_identifier,
resource=resource,
)

print(f"Access Token: {response.access_token[:6]}...")
print(f"Token Type: {response.token_type}")
if response.expires_in:
print(f"Expires In: {response.expires_in}s")
if response.scope:
print(f"Scope: {' '.join(response.scope)}")

except OAuthProtocolError as e:
desc = f" - {e.error_description}" if e.error_description else ""
raise SystemExit(f"Error: OAuth error: {e.error}{desc}") from None
except OAuthHttpError as e:
raise SystemExit(f"Error: HTTP {e.status_code}: {e.response_body}") from None


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def _error_exit(message: str) -> NoReturn:
print(f"Error: {message}", file=sys.stderr)
sys.exit(1)


def main() -> None:
load_dotenv()

parser = argparse.ArgumentParser(
description="Background Agent: obtain a resource token on behalf of a user (offline)",
)
parser.add_argument(
"--zone-url",
default=os.getenv("ZONE_URL"),
help="Keycard zone URL (env: ZONE_URL)",
)
parser.add_argument(
"--client-id",
default=os.getenv("AGENT_CLIENT_ID"),
help="Confidential client ID (env: AGENT_CLIENT_ID)",
)
parser.add_argument(
"--client-secret",
default=os.getenv("AGENT_CLIENT_SECRET"),
help="Confidential client secret (env: AGENT_CLIENT_SECRET)",
)
parser.add_argument(
"--user-identifier",
help="User identifier for impersonation",
)
parser.add_argument(
"--resource",
help="Resource URI to get a token for",
)

args = parser.parse_args()

if not args.zone_url:
_error_exit("--zone-url is required (or set ZONE_URL)")
if not args.client_id:
_error_exit("--client-id is required (or set AGENT_CLIENT_ID)")
if not args.client_secret:
_error_exit("--client-secret is required (or set AGENT_CLIENT_SECRET)")
if not args.user_identifier:
_error_exit("--user-identifier is required")
if not args.resource:
_error_exit("--resource is required")

print("═══ Background Agent ═══")
print(" Auth: client_credentials")
print(f" On behalf of: {args.user_identifier}")
print(f" Access resource: {args.resource}")
print()

run_background_agent(
zone_url=args.zone_url,
client_id=args.client_id,
client_secret=args.client_secret,
user_identifier=args.user_identifier,
resource=args.resource,
)


if __name__ == "__main__":
main()
Loading
Loading