Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions agentex/src/api/routes/agent_api_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@
CreateAPIKeyRequest,
CreateAPIKeyResponse,
)
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
AuthorizedOperationType,
)
from src.domain.entities.agent_api_keys import AgentAPIKeyType
from src.domain.services.authorization_service import DAuthorizationService
from src.domain.use_cases.agent_api_keys_use_case import DAgentAPIKeysUseCase
from src.domain.use_cases.agents_use_case import DAgentsUseCase
from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404
from src.utils.authorization_shortcuts import (
DAuthorizedId,
DAuthorizedResourceIds,
)
from src.utils.logging import make_logger

logger = make_logger(__name__)
Expand Down Expand Up @@ -42,6 +52,16 @@ async def create_api_key(
detail="Only one of 'agent_id' or 'agent_name' should be provided to create an agent api_key.",
)
agent = await agent_use_case.get(id=request.agent_id, name=request.agent_name)

# Parent-agent FGAC: only callers with ``update`` on the parent agent may
# mint a new api_key under it. The new api_key has no resource row yet, so
# this is the only enforcement surface at create time. SpiceDB cannot
# transitively gate on a not-yet-created child.
await authorization_service.check(
resource=AgentexResource.agent(agent.id),
operation=AuthorizedOperationType.update,
)

# Check if external agent API key already exists for this name and agent ID
existing_api_key = await agent_api_key_use_case.get_by_agent_id_and_name(
agent_id=agent.id,
Expand Down Expand Up @@ -81,6 +101,9 @@ async def create_api_key(
async def list_agent_api_keys(
agent_api_key_use_case: DAgentAPIKeysUseCase,
agent_use_case: DAgentsUseCase,
authorized_api_key_ids: DAuthorizedResourceIds(
AgentexResourceType.api_key, AuthorizedOperationType.read
),
agent_id: str | None = None,
agent_name: str | None = None,
limit: int = Query(default=50, ge=1, le=1000),
Expand All @@ -100,6 +123,11 @@ async def list_agent_api_keys(
agent_api_key_entities = await agent_api_key_use_case.list(
agent_id=agent.id, limit=limit, page_number=page_number
)
# Filter to the set of api_keys the caller can read. ``None`` means the
# authz backend declined to enumerate (e.g. bypass) — pass through.
if authorized_api_key_ids is not None:
allowed = set(authorized_api_key_ids)
agent_api_key_entities = [e for e in agent_api_key_entities if e.id in allowed]
return [
AgentAPIKey.model_validate(agent_api_key_entity)
for agent_api_key_entity in agent_api_key_entities
Expand All @@ -116,6 +144,7 @@ async def get_agent_api_key_by_name(
name: str,
agent_api_key_use_case: DAgentAPIKeysUseCase,
agent_use_case: DAgentsUseCase,
authorization_service: DAuthorizationService,
agent_id: str | None = None,
agent_name: str | None = None,
api_key_type: AgentAPIKeyType = AgentAPIKeyType.EXTERNAL,
Expand All @@ -139,6 +168,15 @@ async def get_agent_api_key_by_name(
status_code=404,
detail=f"Agent api_key '{name}' not found for agent ID {agent.id}",
)
# Name routes for api_key don't fit ``DAuthorizedName`` (the lookup key is
# ``(agent_id, name, api_key_type)``, not a single globally-unique name
# path param). Apply the collapse helper explicitly so present-but-denied
# surfaces as 404, mirroring tasks' name route in PR #249.
await _check_api_key_or_collapse_to_404(
authorization_service,
agent_api_key_entity.id,
AuthorizedOperationType.read,
)
return AgentAPIKey.model_validate(agent_api_key_entity)


Expand All @@ -151,6 +189,11 @@ async def get_agent_api_key_by_name(
async def get_agent_api_key(
id: str,
agent_api_key_use_case: DAgentAPIKeysUseCase,
_authorized_id: DAuthorizedId(
AgentexResourceType.api_key,
AuthorizedOperationType.read,
param_name="id",
),
) -> AgentAPIKey:
agent_api_key_entity = await agent_api_key_use_case.get(id=id)
return AgentAPIKey.model_validate(agent_api_key_entity)
Expand All @@ -166,7 +209,18 @@ async def delete_agent_api_key(
id: str,
agent_api_key_use_case: DAgentAPIKeysUseCase,
authorization_service: DAuthorizationService,
_authorized_id: DAuthorizedId(
AgentexResourceType.api_key,
AuthorizedOperationType.delete,
param_name="id",
),
) -> str:
# Two-factor mutation: SpiceDB's ``api_key.delete`` permission depends
# transitively on ``parent_agent->update`` per the schema, so the
# ``DAuthorizedId(..., delete)`` dep above enforces both api_key.delete
# and the parent-agent.update factor. No explicit second
# ``authorization_service.check`` on the parent agent is needed (matches
# Asher's PR #249 approach for tasks).
account_id = getattr(authorization_service.principal_context, "account_id", None)
await agent_api_key_use_case.delete(id=id, account_id=account_id)
return f"Agent API key with ID {id} deleted"
Expand Down Expand Up @@ -198,6 +252,25 @@ async def delete_agent_api_key_by_name(
detail="Only one of 'agent_id' or 'agent_name' should be provided to delete an agent api_key.",
)
agent = await agent_use_case.get(id=agent_id, name=agent_name)

# Resolve name -> id, then run the collapse-wrapped delete check before
# mutating. Two-factor (api_key.delete & parent_agent->update) is
# transitively enforced by the SpiceDB schema definition of
# ``api_key.delete``; we don't repeat a parent check here.
existing = await agent_api_key_use_case.get_by_agent_id_and_name(
agent_id=agent.id, name=api_key_name, api_key_type=api_key_type
)
if not existing:
raise HTTPException(
status_code=404,
detail=f"Agent api_key '{api_key_name}' not found for agent ID {agent.id}",
)
await _check_api_key_or_collapse_to_404(
authorization_service,
existing.id,
AuthorizedOperationType.delete,
)

account_id = getattr(authorization_service.principal_context, "account_id", None)
await agent_api_key_use_case.delete_by_agent_id_and_key_name(
agent_id=agent.id,
Expand Down
42 changes: 42 additions & 0 deletions agentex/src/utils/agent_api_key_authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
AuthorizedOperationType,
)


async def _check_api_key_or_collapse_to_404(
authorization,
api_key_id: str,
operation: AuthorizedOperationType,
) -> None:
"""Issue a check on an api_key resource. On any denial, surface 404 — even
when the api_key exists.

Same rationale as :func:`src.utils.task_authorization._check_task_or_collapse_to_404`:
the deny-reason discriminator needed to safely return 403 (in-tenant but
lacking the operation) vs 404 (cross-tenant or absent) is not available
here. ``api_key.name`` is unique only per (agent_id, name, api_key_type) —
not globally — but the existence-leak risk via the name routes is still
real: a caller probing ``GET /agent_api_keys/name/{name}?agent_id=...``
against another tenant's agent would otherwise be able to distinguish
"agent has a key with that name (403)" from "no such key (404)".

Until api_keys carry tenant scope at the data layer (or agentex-auth's
deny distinguishes "cross-tenant" from "in-tenant lacking permission"),
the safer default is to collapse both into 404. Trade-off: a user with
``read`` but not ``delete`` permission on an in-tenant api_key sees 404
on delete attempts instead of 403. UX regression for in-tenant permission
granularity, but eliminates the cross-tenant existence leak.

TODO(AGX1-290): Restore the 403/404 split for same-tenant calls once
api_keys carry tenant/account_id scope at the data layer.
"""
try:
await authorization.check(
resource=AgentexResource.api_key(api_key_id),
operation=operation,
)
except AuthorizationError:
raise ItemDoesNotExist(f"Item with id '{api_key_id}' does not exist.") from None
7 changes: 7 additions & 0 deletions agentex/src/utils/authorization_shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from src.domain.repositories.task_repository import DTaskRepository
from src.domain.repositories.task_state_repository import DTaskStateRepository
from src.domain.services.authorization_service import DAuthorizationService
from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404


async def _get_parent_task_id(
Expand Down Expand Up @@ -55,6 +56,12 @@ async def _ensure_authorized_id(
resource=AgentexResource.task(task_id),
operation=operation,
)
elif resource_type == AgentexResourceType.api_key:
# Collapse api_key denials to 404 so name/id probes can't
# distinguish "present in another tenant" from "absent".
await _check_api_key_or_collapse_to_404(
authorization, resource_id, operation
)
else:
# For direct resources, check directly
await authorization.check(
Expand Down
Loading
Loading