From e3045cb30ab6e75d83c85bbf5a41e42f57476e80 Mon Sep 17 00:00:00 2001 From: Dhruv Madhok Date: Tue, 26 May 2026 13:23:47 -0700 Subject: [PATCH 1/3] feat(AGX1-272): dual-write agent_api_keys to spark-authz behind FGAC_AGENT_API_KEYS_DUAL_WRITE flag Mirrors the AGX1-274 task dual-write pattern (PR #246) for agent_api_keys. - Adds creator_user_id / creator_service_account_id / spark_authz_zedtoken columns to agent_api_keys, with CHECK constraint and concurrent indexes. - On create, when FGAC_AGENT_API_KEYS_DUAL_WRITE is enabled for the caller's account, calls authorization_service.grant(AgentexResource.api_key(id)) BEFORE the Postgres write. Grant failure aborts the create. - On delete, best-effort revoke after the Postgres delete. Failures are logged but do not block the delete. - Adds AgentexResourceType.api_key and AgentexResource.api_key(...) factory. - Creates src/utils/feature_flags.py with both FGAC_TASKS_DUAL_WRITE and FGAC_AGENT_API_KEYS_DUAL_WRITE (file does not exist on main yet; if PR #246 lands first this becomes a rebase concern). Structural divergence from tasks: agent_api_keys have no service layer, so the dual-write logic lives in AgentAPIKeysUseCase rather than a separate service. This keeps the call site simple and avoids inventing a new layer. Route layer (read-side auth checks) is out of scope; that's PR B (AGX1-273). agentex-auth spark_mapping.py update is a sibling-repo concern. Co-Authored-By: Claude Opus 4.7 --- ...i_key_creator_and_zedtoken_b2c84edb77d6.py | 48 +++ .../database/migrations/migration_history.txt | 3 +- agentex/src/adapters/orm.py | 3 + agentex/src/api/routes/agent_api_keys.py | 15 +- .../src/api/schemas/authorization_types.py | 9 + agentex/src/domain/entities/agent_api_keys.py | 12 + .../use_cases/agent_api_keys_use_case.py | 137 +++++++- agentex/src/utils/feature_flags.py | 30 ++ .../fixtures/integration_client.py | 10 +- .../tests/integration/services/__init__.py | 0 .../test_agent_api_key_service_dual_write.py | 329 ++++++++++++++++++ .../test_agents_api_keys_use_case.py | 9 +- 12 files changed, 593 insertions(+), 12 deletions(-) create mode 100644 agentex/database/migrations/alembic/versions/2026_05_26_1200_add_agent_api_key_creator_and_zedtoken_b2c84edb77d6.py create mode 100644 agentex/src/utils/feature_flags.py create mode 100644 agentex/tests/integration/services/__init__.py create mode 100644 agentex/tests/integration/services/test_agent_api_key_service_dual_write.py diff --git a/agentex/database/migrations/alembic/versions/2026_05_26_1200_add_agent_api_key_creator_and_zedtoken_b2c84edb77d6.py b/agentex/database/migrations/alembic/versions/2026_05_26_1200_add_agent_api_key_creator_and_zedtoken_b2c84edb77d6.py new file mode 100644 index 00000000..609ccb6a --- /dev/null +++ b/agentex/database/migrations/alembic/versions/2026_05_26_1200_add_agent_api_key_creator_and_zedtoken_b2c84edb77d6.py @@ -0,0 +1,48 @@ +"""add_agent_api_key_creator_and_zedtoken + +Revision ID: b2c84edb77d6 +Revises: 6c942325c828 +Create Date: 2026-05-26 12:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b2c84edb77d6' +down_revision: Union[str, None] = '6c942325c828' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column('agent_api_keys', sa.Column('creator_user_id', sa.String(), nullable=True)) + op.add_column('agent_api_keys', sa.Column('creator_service_account_id', sa.String(), nullable=True)) + op.add_column('agent_api_keys', sa.Column('spark_authz_zedtoken', sa.Text(), nullable=True)) + with op.get_context().autocommit_block(): + op.execute( + "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_agent_api_keys_creator_user_id " + "ON agent_api_keys (creator_user_id)" + ) + op.execute( + "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_agent_api_keys_creator_service_account_id " + "ON agent_api_keys (creator_service_account_id)" + ) + op.create_check_constraint( + 'ck_agent_api_keys_one_creator', + 'agent_api_keys', + '(creator_user_id IS NULL) OR (creator_service_account_id IS NULL)', + ) + + +def downgrade() -> None: + op.drop_constraint('ck_agent_api_keys_one_creator', 'agent_api_keys', type_='check') + with op.get_context().autocommit_block(): + op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_agent_api_keys_creator_service_account_id") + op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_agent_api_keys_creator_user_id") + op.drop_column('agent_api_keys', 'spark_authz_zedtoken') + op.drop_column('agent_api_keys', 'creator_service_account_id') + op.drop_column('agent_api_keys', 'creator_user_id') diff --git a/agentex/database/migrations/migration_history.txt b/agentex/database/migrations/migration_history.txt index b18d86d8..ce5c5693 100644 --- a/agentex/database/migrations/migration_history.txt +++ b/agentex/database/migrations/migration_history.txt @@ -1,4 +1,5 @@ -a9959ebcbe98 -> 6c942325c828 (head), adding task cleaned at +6c942325c828 -> b2c84edb77d6 (head), add_agent_api_key_creator_and_zedtoken +a9959ebcbe98 -> 6c942325c828, adding task cleaned at e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id 9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index 57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels diff --git a/agentex/src/adapters/orm.py b/agentex/src/adapters/orm.py index 42a66c1a..37b5f0b1 100644 --- a/agentex/src/adapters/orm.py +++ b/agentex/src/adapters/orm.py @@ -181,6 +181,9 @@ class AgentAPIKeyORM(BaseORM): name = Column(String(256), nullable=False, index=True) api_key_type = Column(SQLAlchemyEnum(AgentAPIKeyType), nullable=False) api_key = Column(String, nullable=False) + creator_user_id = Column(String, nullable=True, index=True) + creator_service_account_id = Column(String, nullable=True, index=True) + spark_authz_zedtoken = Column(Text, nullable=True) # Indexes for efficient querying __table_args__ = ( diff --git a/agentex/src/api/routes/agent_api_keys.py b/agentex/src/api/routes/agent_api_keys.py index dec3e00f..e041531f 100644 --- a/agentex/src/api/routes/agent_api_keys.py +++ b/agentex/src/api/routes/agent_api_keys.py @@ -8,6 +8,7 @@ CreateAPIKeyResponse, ) from src.domain.entities.agent_api_keys import AgentAPIKeyType +from src.domain.services.authorization_service import DAuthorizationService from src.domain.use_cases.agent_api_keys_use_case import DAgentAPIKeysUseCase from src.domain.use_cases.agents_use_case import DAgentsUseCase from src.utils.logging import make_logger @@ -28,6 +29,7 @@ async def create_api_key( request: CreateAPIKeyRequest, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, ) -> CreateAPIKeyResponse: if not request.agent_id and not request.agent_name: raise HTTPException( @@ -52,11 +54,13 @@ async def create_api_key( raise HTTPException(status_code=409, detail=error_msg) new_api_key = request.api_key or secrets.token_hex(32) + account_id = getattr(authorization_service.principal_context, "account_id", None) agent_api_key_entity = await agent_api_key_use_case.create( agent_id=agent.id, api_key=str(new_api_key), name=request.name, api_key_type=request.api_key_type, + account_id=account_id, ) return CreateAPIKeyResponse( id=agent_api_key_entity.id, @@ -161,8 +165,10 @@ async def get_agent_api_key( async def delete_agent_api_key( id: str, agent_api_key_use_case: DAgentAPIKeysUseCase, + authorization_service: DAuthorizationService, ) -> str: - await agent_api_key_use_case.delete(id=id) + account_id = getattr(authorization_service.principal_context, "account_id", None) + await agent_api_key_use_case.delete(id=id, account_id=account_id) return f"Agent API key with ID {id} deleted" @@ -176,6 +182,7 @@ async def delete_agent_api_key_by_name( api_key_name: str, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, agent_id: str | None = None, agent_name: str | None = None, api_key_type: AgentAPIKeyType = AgentAPIKeyType.EXTERNAL, @@ -191,8 +198,12 @@ async def delete_agent_api_key_by_name( detail="Only one of 'agent_id' or 'agent_name' should be provided to delete an agent api_key.", ) agent = await agent_use_case.get(id=agent_id, name=agent_name) + account_id = getattr(authorization_service.principal_context, "account_id", None) await agent_api_key_use_case.delete_by_agent_id_and_key_name( - agent_id=agent.id, key_name=api_key_name, api_key_type=api_key_type + agent_id=agent.id, + key_name=api_key_name, + api_key_type=api_key_type, + account_id=account_id, ) return f"Agent api_key '{api_key_name}' deleted" diff --git a/agentex/src/api/schemas/authorization_types.py b/agentex/src/api/schemas/authorization_types.py index 585fa3a3..7e98fdae 100644 --- a/agentex/src/api/schemas/authorization_types.py +++ b/agentex/src/api/schemas/authorization_types.py @@ -14,6 +14,7 @@ class AuthorizedOperationType(StrEnum): class AgentexResourceType(StrEnum): agent = "agent" task = "task" + api_key = "api_key" # Resources that inherit permissions from their parent task @@ -37,6 +38,10 @@ def agent(cls, selector: str) -> "AgentexResource": def task(cls, selector: str) -> "AgentexResource": return cls(type=AgentexResourceType.task, selector=selector) + @classmethod + def api_key(cls, selector: str) -> "AgentexResource": + return cls(type=AgentexResourceType.api_key, selector=selector) + class AgentexResourceOptionalSelector(BaseModel): type: AgentexResourceType @@ -49,3 +54,7 @@ def agent(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector" @classmethod def task(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector": return cls(type=AgentexResourceType.task, selector=selector) + + @classmethod + def api_key(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector": + return cls(type=AgentexResourceType.api_key, selector=selector) diff --git a/agentex/src/domain/entities/agent_api_keys.py b/agentex/src/domain/entities/agent_api_keys.py index 27300175..95cd71ed 100644 --- a/agentex/src/domain/entities/agent_api_keys.py +++ b/agentex/src/domain/entities/agent_api_keys.py @@ -25,3 +25,15 @@ class AgentAPIKeyEntity(BaseModel): description="The type of the API key (either internal or external)", ) api_key: str = Field(..., description="The API key") + creator_user_id: str | None = Field( + None, + description="Identity ID of the user who created this API key (granted as FGAC owner)", + ) + creator_service_account_id: str | None = Field( + None, + description="Service identity ID of the service account that created this API key", + ) + spark_authz_zedtoken: str | None = Field( + None, + description="ZedToken from the Spark AuthZ grant for new-write isolation", + ) diff --git a/agentex/src/domain/use_cases/agent_api_keys_use_case.py b/agentex/src/domain/use_cases/agent_api_keys_use_case.py index bf455aee..8b01167c 100644 --- a/agentex/src/domain/use_cases/agent_api_keys_use_case.py +++ b/agentex/src/domain/use_cases/agent_api_keys_use_case.py @@ -13,6 +13,7 @@ ) from src.adapters.crud_store.exceptions import ItemDoesNotExist from src.api.middleware_utils import get_request_headers_to_forward, verify_auth_gateway +from src.api.schemas.authorization_types import AgentexResource from src.config.dependencies import ( DHttpxClient, resolve_environment_variable_dependency, @@ -27,6 +28,8 @@ DAgentAPIKeyRepository, ) from src.domain.repositories.agent_repository import DAgentRepository +from src.domain.services.authorization_service import DAuthorizationService +from src.utils.feature_flags import DFeatureFlagProvider, FeatureFlagName from src.utils.ids import orm_id from src.utils.logging import make_logger @@ -39,10 +42,14 @@ def __init__( agent_api_key_repository: DAgentAPIKeyRepository, agent_repository: DAgentRepository, client: DHttpxClient, + authorization_service: DAuthorizationService, + feature_flags: DFeatureFlagProvider, ): self.agent_api_key_repo = agent_api_key_repository self.agent_repo = agent_repository self.client = client + self.authorization_service = authorization_service + self.feature_flags = feature_flags self.auth_gateway_enabled = bool( resolve_environment_variable_dependency(EnvVarKeys.AGENTEX_AUTH_URL) ) @@ -76,6 +83,7 @@ async def create( agent_id: str, api_key_type: AgentAPIKeyType, api_key: str, + account_id: str | None = None, ) -> AgentAPIKeyEntity: agent = await self.get_agent(agent_id=agent_id) if not agent: @@ -83,17 +91,107 @@ async def create( status_code=404, detail=f"Agent ID {agent_id} not found.", ) + + principal_context = self.authorization_service.principal_context + creator_user_id = getattr(principal_context, "user_id", None) + creator_service_account_id = getattr( + principal_context, "service_account_id", None + ) + + api_key_id = orm_id() + zedtoken: str | None = None + + if self.feature_flags.is_enabled( + FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, account_id + ): + zedtoken = await self._register_api_key_in_spark_authz( + api_key_id=api_key_id, + agent_id=agent.id, + account_id=account_id, + creator_user_id=creator_user_id, + creator_service_account_id=creator_service_account_id, + ) + # TODO: encrypt API key before storing it # Initialize a new agent api_key agent_api_key = AgentAPIKeyEntity( - id=orm_id(), + id=api_key_id, name=name, agent_id=agent.id, api_key_type=api_key_type, api_key=api_key, + creator_user_id=creator_user_id, + creator_service_account_id=creator_service_account_id, + spark_authz_zedtoken=zedtoken, ) return await self.agent_api_key_repo.create(item=agent_api_key) + async def _register_api_key_in_spark_authz( + self, + *, + api_key_id: str, + agent_id: str, + account_id: str | None, + creator_user_id: str | None, + creator_service_account_id: str | None, + ) -> str | None: + """Register a new agent_api_key in Spark AuthZ with creator as owner. + + Called BEFORE the Postgres write — a failure raises and prevents the + row from being persisted, so there is no compensating action to take. + Mirrors the dual-write pattern used for tasks (AGX1-274). + + The current ``Provider.spark`` adapter returns ``{}`` from ``grant``; + no ZedToken is surfaced today, so we always return ``None`` for the + new-write-isolation column. A follow-up will plumb the token through + once the adapter exposes it. + + Note: the ``agent_api_key`` SpiceDB schema has a ``parent_agent`` + relation that read/delete permissions cascade through. The current + ``AuthorizationGateway.grant`` signature does not accept a parent + relation — the agentex-auth adapter is expected to set + ``parent_agent`` based on the resource shape. This is the same + gap Asher's task PR has and is tracked as a follow-up. + """ + if creator_user_id is None and creator_service_account_id is None: + logger.warning( + "Skipping Spark AuthZ api_key registration: no creator resolvable", + extra={ + "api_key_id": api_key_id, + "agent_id": agent_id, + "account_id": account_id, + }, + ) + return None + await self.authorization_service.grant( + resource=AgentexResource.api_key(api_key_id), + ) + return None + + async def _deregister_api_key_from_spark_authz( + self, *, api_key_id: str, account_id: str | None + ) -> None: + """Best-effort revocation of an api_key's Spark AuthZ tuples on delete. + + Only invoked when the FGAC_AGENT_API_KEYS_DUAL_WRITE flag is enabled + for the caller's account. Failures are logged but do not block the + delete. + """ + if not self.feature_flags.is_enabled( + FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, account_id + ): + return + try: + await self.authorization_service.revoke( + resource=AgentexResource.api_key(api_key_id), + ) + except Exception: + logger.warning( + "Spark AuthZ revoke failed for agent_api_key", + extra={"api_key_id": api_key_id, "account_id": account_id}, + exc_info=True, + ) + async def get(self, id: str) -> AgentAPIKeyEntity: return await self.agent_api_key_repo.get(id=id) @@ -123,22 +221,47 @@ async def get_external_by_agent_id_and_key( agent_id=agent_id, api_key=api_key ) - async def delete(self, id: str) -> None: - return await self.agent_api_key_repo.delete(id=id) + async def delete(self, id: str, account_id: str | None = None) -> None: + await self.agent_api_key_repo.delete(id=id) + await self._deregister_api_key_from_spark_authz( + api_key_id=id, account_id=account_id + ) async def delete_by_agent_id_and_key_name( - self, agent_id: str, key_name: str, api_key_type: AgentAPIKeyType + self, + agent_id: str, + key_name: str, + api_key_type: AgentAPIKeyType, + account_id: str | None = None, ) -> None: - return await self.agent_api_key_repo.delete_by_agent_id_and_key_name( + existing = await self.agent_api_key_repo.get_by_agent_id_and_name( + agent_id=agent_id, name=key_name, api_key_type=api_key_type + ) + await self.agent_api_key_repo.delete_by_agent_id_and_key_name( agent_id=agent_id, key_name=key_name, api_key_type=api_key_type ) + if existing is not None: + await self._deregister_api_key_from_spark_authz( + api_key_id=existing.id, account_id=account_id + ) async def delete_by_agent_name_and_key_name( - self, agent_name: str, key_name: str, api_key_type: AgentAPIKeyType + self, + agent_name: str, + key_name: str, + api_key_type: AgentAPIKeyType, + account_id: str | None = None, ) -> None: - return await self.agent_api_key_repo.delete_by_agent_name_and_key_name( + existing = await self.agent_api_key_repo.get_by_agent_name_and_key_name( + agent_name, key_name, api_key_type + ) + await self.agent_api_key_repo.delete_by_agent_name_and_key_name( agent_name=agent_name, key_name=key_name, api_key_type=api_key_type ) + if existing is not None: + await self._deregister_api_key_from_spark_authz( + api_key_id=existing.id, account_id=account_id + ) async def list( self, agent_id: str, limit: int, page_number: int diff --git a/agentex/src/utils/feature_flags.py b/agentex/src/utils/feature_flags.py new file mode 100644 index 00000000..a8d9d661 --- /dev/null +++ b/agentex/src/utils/feature_flags.py @@ -0,0 +1,30 @@ +import os +from enum import StrEnum +from typing import Annotated + +from fastapi import Depends + + +class FeatureFlagName(StrEnum): + FGAC_TASKS = "fgac-tasks" + FGAC_TASKS_DUAL_WRITE = "fgac-tasks-dual-write" + FGAC_AGENT_API_KEYS_DUAL_WRITE = "fgac-agent-api-keys-dual-write" + + +class FeatureFlagProvider: + """Per-account feature flag provider. + + v1: env-var allowlist (per-account, comma-separated). The env var name is + derived from the flag name, e.g. ``FGAC_AGENT_API_KEYS_DUAL_WRITE_ACCOUNTS``. + A follow-up will swap this for LaunchDarkly with an account_id context. + """ + + def is_enabled(self, name: FeatureFlagName, account_id: str | None) -> bool: + if not account_id: + return False + env_key = f"{name.value.upper().replace('-', '_')}_ACCOUNTS" + allowed = os.environ.get(env_key, "") + return account_id in {a.strip() for a in allowed.split(",") if a.strip()} + + +DFeatureFlagProvider = Annotated[FeatureFlagProvider, Depends(FeatureFlagProvider)] diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py index f1396d57..218a60e4 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -6,7 +6,7 @@ import asyncio import os import uuid -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock import pymongo import pytest @@ -398,10 +398,18 @@ def create_agents_use_case(): ) def create_agent_api_keys_use_case(): + from src.utils.feature_flags import FeatureFlagProvider + + noop_authorization_service = Mock() + noop_authorization_service.principal_context = None + noop_authorization_service.grant = AsyncMock(return_value={}) + noop_authorization_service.revoke = AsyncMock(return_value=None) return AgentAPIKeysUseCase( agent_api_key_repository=isolated_repositories["agent_api_key_repository"], agent_repository=isolated_repositories["agent_repository"], client=isolated_api_key_http_client, # Use mock client for forwarding requests + authorization_service=noop_authorization_service, + feature_flags=FeatureFlagProvider(), ) def create_deployment_history_use_case(): diff --git a/agentex/tests/integration/services/__init__.py b/agentex/tests/integration/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py new file mode 100644 index 00000000..89cdcc66 --- /dev/null +++ b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py @@ -0,0 +1,329 @@ +"""Integration tests for AgentAPIKeysUseCase dual-write to Spark AuthZ. + +These cover the AGX1-272 dual-write path: + +- Flag OFF: ``authorization_service.grant`` is NOT called and the api_key is + written to the repository with creator metadata populated from the + principal context. +- Flag ON: ``grant`` is called with ``AgentexResource.api_key()`` and the + row is written. +- Delete deregisters: ``revoke`` is called when ``delete`` runs under the flag. +- Spark failure prevents row: when ``grant`` raises, the api_key is NOT + persisted. +- Revoke failure does not block delete: when ``revoke`` raises, the DB + delete still completes and the failure is logged. +- No creator → no grant: if neither user_id nor service_account_id is + resolvable, the dual-write is a no-op (logged) and the row still lands. + +The tests intentionally mock the repository, authorization service, agent +repository, and HTTP client. The behaviour under test is the call sequencing +inside ``AgentAPIKeysUseCase`` — not Postgres or Spark itself. + +Note on structural divergence from the task PR (AGX1-274): tasks live behind +``AgentTaskService``; agent_api_keys have no service layer, so the dual-write +logic is colocated in ``AgentAPIKeysUseCase``. Mirrors the spirit of Asher's +PR rather than the exact layering. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest +from src.api.schemas.authorization_types import AgentexResource, AgentexResourceType +from src.domain.entities.agent_api_keys import AgentAPIKeyEntity, AgentAPIKeyType +from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus +from src.domain.use_cases.agent_api_keys_use_case import AgentAPIKeysUseCase +from src.utils.feature_flags import FeatureFlagProvider +from src.utils.ids import orm_id + + +def _principal(user_id: str | None, account_id: str | None) -> SimpleNamespace: + """Minimal stand-in for AgentexAuthPrincipalContext.""" + return SimpleNamespace( + user_id=user_id, service_account_id=None, account_id=account_id + ) + + +def _agent() -> AgentEntity: + agent_id = orm_id() + return AgentEntity( + id=agent_id, + name=f"agent-{agent_id[:8]}", + description="dual-write test agent", + status=AgentStatus.READY, + acp_type=ACPType.SYNC, + acp_url="http://test-acp", + ) + + +def _build_use_case( + *, + flag_accounts: str, + principal: SimpleNamespace | None, + grant: AsyncMock | None = None, + revoke: AsyncMock | None = None, + agent: AgentEntity | None = None, + create_raises: Exception | None = None, + monkeypatch: pytest.MonkeyPatch, +) -> tuple[AgentAPIKeysUseCase, Mock, AsyncMock, AsyncMock]: + monkeypatch.setenv("FGAC_AGENT_API_KEYS_DUAL_WRITE_ACCOUNTS", flag_accounts) + + sample_agent = agent or _agent() + + agent_repository = Mock() + agent_repository.get = AsyncMock(return_value=sample_agent) + + agent_api_key_repository = Mock() + if create_raises is None: + agent_api_key_repository.create = AsyncMock(side_effect=lambda item: item) + else: + agent_api_key_repository.create = AsyncMock(side_effect=create_raises) + agent_api_key_repository.delete = AsyncMock(return_value=None) + agent_api_key_repository.get_by_agent_id_and_name = AsyncMock(return_value=None) + agent_api_key_repository.get_by_agent_name_and_key_name = AsyncMock( + return_value=None + ) + agent_api_key_repository.delete_by_agent_id_and_key_name = AsyncMock( + return_value=None + ) + agent_api_key_repository.delete_by_agent_name_and_key_name = AsyncMock( + return_value=None + ) + + authorization_service = Mock() + authorization_service.principal_context = principal + authorization_service.grant = grant or AsyncMock(return_value={}) + authorization_service.revoke = revoke or AsyncMock(return_value=None) + + feature_flags = FeatureFlagProvider() + + # Patch env var lookup inside UseCase __init__ so we don't depend on real + # env configuration to instantiate. + monkeypatch.setenv("AGENTEX_AUTH_URL", "") + monkeypatch.setenv("ENVIRONMENT", "test") + monkeypatch.setenv("WEBHOOK_REQUEST_TIMEOUT", "10") + + use_case = AgentAPIKeysUseCase( + agent_api_key_repository=agent_api_key_repository, + agent_repository=agent_repository, + client=Mock(), + authorization_service=authorization_service, + feature_flags=feature_flags, + ) + return ( + use_case, + agent_api_key_repository, + authorization_service.grant, + authorization_service.revoke, + ) + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_create_api_key_skips_grant_when_flag_off( + monkeypatch: pytest.MonkeyPatch, +) -> None: + agent = _agent() + use_case, repo, grant, _ = _build_use_case( + flag_accounts="", + principal=_principal(user_id="user-A", account_id="acct-1"), + agent=agent, + monkeypatch=monkeypatch, + ) + + api_key = await use_case.create( + name="k1", + agent_id=agent.id, + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret", + account_id="acct-1", + ) + + grant.assert_not_called() + repo.create.assert_awaited_once() + assert api_key.creator_user_id == "user-A" + assert api_key.creator_service_account_id is None + assert api_key.spark_authz_zedtoken is None + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_create_api_key_calls_grant_when_flag_on( + monkeypatch: pytest.MonkeyPatch, +) -> None: + agent = _agent() + use_case, repo, grant, _ = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id="user-A", account_id="acct-1"), + agent=agent, + monkeypatch=monkeypatch, + ) + + api_key = await use_case.create( + name="k1", + agent_id=agent.id, + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret", + account_id="acct-1", + ) + + grant.assert_awaited_once() + granted_resource: AgentexResource = grant.await_args.kwargs["resource"] + assert granted_resource.type == AgentexResourceType.api_key + assert granted_resource.selector == api_key.id + repo.create.assert_awaited_once() + assert api_key.creator_user_id == "user-A" + # Provider.spark.grant returns {} today — no zedtoken yet. + assert api_key.spark_authz_zedtoken is None + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_delete_api_key_calls_revoke_when_flag_on( + monkeypatch: pytest.MonkeyPatch, +) -> None: + use_case, repo, _, revoke = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id="user-A", account_id="acct-1"), + monkeypatch=monkeypatch, + ) + + api_key_id = orm_id() + await use_case.delete(id=api_key_id, account_id="acct-1") + + repo.delete.assert_awaited_once_with(id=api_key_id) + revoke.assert_awaited_once() + revoked_resource: AgentexResource = revoke.await_args.kwargs["resource"] + assert revoked_resource.type == AgentexResourceType.api_key + assert revoked_resource.selector == api_key_id + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_delete_api_key_skips_revoke_when_flag_off( + monkeypatch: pytest.MonkeyPatch, +) -> None: + use_case, repo, _, revoke = _build_use_case( + flag_accounts="", + principal=_principal(user_id="user-A", account_id="acct-1"), + monkeypatch=monkeypatch, + ) + + await use_case.delete(id=orm_id(), account_id="acct-1") + + repo.delete.assert_awaited_once() + revoke.assert_not_called() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_create_api_key_grant_failure_prevents_db_row( + monkeypatch: pytest.MonkeyPatch, +) -> None: + grant = AsyncMock(side_effect=RuntimeError("spark unavailable")) + agent = _agent() + use_case, repo, _, _ = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id="user-A", account_id="acct-1"), + grant=grant, + agent=agent, + monkeypatch=monkeypatch, + ) + + with pytest.raises(RuntimeError, match="spark unavailable"): + await use_case.create( + name="k1", + agent_id=agent.id, + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret", + account_id="acct-1", + ) + + repo.create.assert_not_awaited() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_delete_api_key_revoke_failure_does_not_block_delete( + monkeypatch: pytest.MonkeyPatch, +) -> None: + revoke = AsyncMock(side_effect=RuntimeError("spark unavailable")) + use_case, repo, _, revoke_ref = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id="user-A", account_id="acct-1"), + revoke=revoke, + monkeypatch=monkeypatch, + ) + + # Should NOT raise. + await use_case.delete(id=orm_id(), account_id="acct-1") + + repo.delete.assert_awaited_once() + revoke_ref.assert_awaited_once() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_create_api_key_skips_grant_when_no_creator_resolvable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """If neither user_id nor service_account_id is available on the principal, + the dual-write is a no-op (logged) and the row still lands without a tuple.""" + agent = _agent() + use_case, repo, grant, _ = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id=None, account_id="acct-1"), + agent=agent, + monkeypatch=monkeypatch, + ) + + api_key = await use_case.create( + name="k1", + agent_id=agent.id, + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret", + account_id="acct-1", + ) + + grant.assert_not_called() + repo.create.assert_awaited_once() + assert api_key.creator_user_id is None + assert api_key.creator_service_account_id is None + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_delete_by_agent_id_and_key_name_revokes_existing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + agent = _agent() + existing_id = orm_id() + use_case, repo, _, revoke = _build_use_case( + flag_accounts="acct-1", + principal=_principal(user_id="user-A", account_id="acct-1"), + agent=agent, + monkeypatch=monkeypatch, + ) + repo.get_by_agent_id_and_name = AsyncMock( + return_value=AgentAPIKeyEntity( + id=existing_id, + agent_id=agent.id, + name="k1", + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret", + ) + ) + + await use_case.delete_by_agent_id_and_key_name( + agent_id=agent.id, + key_name="k1", + api_key_type=AgentAPIKeyType.EXTERNAL, + account_id="acct-1", + ) + + repo.delete_by_agent_id_and_key_name.assert_awaited_once() + revoke.assert_awaited_once() + revoked_resource: AgentexResource = revoke.await_args.kwargs["resource"] + assert revoked_resource.selector == existing_id diff --git a/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py b/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py index 27d3d4d9..c0abcc26 100644 --- a/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py +++ b/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py @@ -1,4 +1,4 @@ -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock from uuid import uuid4 import pytest @@ -9,6 +9,7 @@ from src.domain.repositories.agent_api_key_repository import AgentAPIKeyRepository from src.domain.repositories.agent_repository import AgentRepository from src.domain.use_cases.agent_api_keys_use_case import AgentAPIKeysUseCase +from src.utils.feature_flags import FeatureFlagProvider @pytest.fixture @@ -35,10 +36,16 @@ def agent_api_keys_use_case( agent_api_key_repository, agent_repository, mock_http_client ): """Real AgentAPIKeysUseCase instance with real repositories""" + authorization_service = Mock() + authorization_service.principal_context = None + authorization_service.grant = AsyncMock(return_value={}) + authorization_service.revoke = AsyncMock(return_value=None) return AgentAPIKeysUseCase( agent_api_key_repository=agent_api_key_repository, agent_repository=agent_repository, client=mock_http_client, + authorization_service=authorization_service, + feature_flags=FeatureFlagProvider(), ) From e72df6849b35cf1ee9cca6202274da4c97dc017f Mon Sep 17 00:00:00 2001 From: Dhruv Madhok Date: Tue, 26 May 2026 15:06:21 -0700 Subject: [PATCH 2/3] feat: register_resource with parent edge for agent_api_keys dual-write MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the parent_agent cascade gap surfaced on scaleapi/agentex#354. The api_key dual-write (AGX1-272, PR #248) currently calls grant() which writes the owner edge in SpiceDB but NOT the parent_agent edge. The agent_api_key schema requires `read = ... & parent_agent->read & ...`, so every downstream read/update fails closed without that edge. This PR adds register_resource/deregister_resource (Port + adapter + service) and swaps the api_keys use case from grant→register_resource with parent=AgentexResource.agent(agent_id). Now the owner edge and parent_agent edge are written atomically. Stack: - scaleapi/scaleapi#144657 — sgp-authz 0.7.0 (parent_resource kwarg). - scaleapi/agentex#355 — agentex-auth Port + adapter + HTTP routes. - scaleapi/scale-agentex#248 — original AGX1-272 dual-write (this stacks on it). - THIS PR — extends #248 to use the parent-aware path. Changes: - Port: abstract register_resource(resource, parent=None) and deregister_resource(resource). - Adapter proxy: POST /v1/authz/register and /v1/authz/deregister. - Service: mirror existing grant/revoke pattern (principal_context override, _bypass support, parent in log line for cascade debugging). - Use case: swap grant→register_resource passing parent=agent; swap revoke→deregister_resource. except Exception wrappers preserved (fail-closed on register, best-effort on deregister). - Tests: rename mocks to register_resource/deregister_resource; assert the parent edge is passed correctly. Test plan: - pytest agentex/tests/integration/services/test_agent_api_key_service_dual_write.py → 8 / 8 pass. - New test ``test_create_api_key_calls_grant_when_flag_on`` asserts parent.type == AgentexResourceType.agent and parent.selector == agent.id. Other resource types' grant→register_resource swap is out of scope. Co-Authored-By: Claude Opus 4.7 --- .../adapter_agentex_authz_proxy.py | 28 +++++ agentex/src/adapters/authorization/port.py | 26 +++++ .../domain/services/authorization_service.py | 57 ++++++++++ .../use_cases/agent_api_keys_use_case.py | 70 ++++++++---- .../test_agent_api_key_service_dual_write.py | 100 ++++++++++-------- 5 files changed, 215 insertions(+), 66 deletions(-) diff --git a/agentex/src/adapters/authorization/adapter_agentex_authz_proxy.py b/agentex/src/adapters/authorization/adapter_agentex_authz_proxy.py index 0e44479d..c8ba4f3a 100644 --- a/agentex/src/adapters/authorization/adapter_agentex_authz_proxy.py +++ b/agentex/src/adapters/authorization/adapter_agentex_authz_proxy.py @@ -85,6 +85,34 @@ async def list_resources( ) return response["items"] + async def register_resource( + self, + principal: AgentexAuthPrincipalContext, + resource: AgentexResource, + parent: AgentexResource | None = None, + ) -> None: + payload = { + "principal": principal, + "resource": resource.model_dump(), + "parent": parent.model_dump() if parent is not None else None, + } + await HttpRequestHandler.post_with_error_handling( + self.agentex_auth_url, "/v1/authz/register", json=payload + ) + + async def deregister_resource( + self, + principal: AgentexAuthPrincipalContext, + resource: AgentexResource, + ) -> None: + payload = { + "principal": principal, + "resource": resource.model_dump(), + } + await HttpRequestHandler.post_with_error_handling( + self.agentex_auth_url, "/v1/authz/deregister", json=payload + ) + DAgentexAuthorization = Annotated[ AgentexAuthorizationProxy, Depends(AgentexAuthorizationProxy) diff --git a/agentex/src/adapters/authorization/port.py b/agentex/src/adapters/authorization/port.py index 80c05200..f4f33a0a 100644 --- a/agentex/src/adapters/authorization/port.py +++ b/agentex/src/adapters/authorization/port.py @@ -49,3 +49,29 @@ async def list_resources( filter_operation: AuthorizedOperationType = AuthorizedOperationType.read, ) -> Iterable[str]: """List resource_ids for a given principal""" + + @abstractmethod + async def register_resource( + self, + principal: PrincipalT, + resource: AgentexResource, + parent: AgentexResource | None = None, + ) -> None: + """Register a newly created resource in SpiceDB with the principal as + owner. Optionally writes a lifecycle parent edge. + + Use this on resource create instead of ``grant`` when the resource + type's SpiceDB definition has a parent relation that permission + checks cascade through (e.g. ``agent_api_key`` declares + ``parent_agent``). Without writing that edge here the cascade fails + closed. + """ + + @abstractmethod + async def deregister_resource( + self, + principal: PrincipalT, + resource: AgentexResource, + ) -> None: + """Deregister a deleted resource and all of its relationships + (owner, parent, grantees) in a single atomic call.""" diff --git a/agentex/src/domain/services/authorization_service.py b/agentex/src/domain/services/authorization_service.py index 8400603b..06125d57 100644 --- a/agentex/src/domain/services/authorization_service.py +++ b/agentex/src/domain/services/authorization_service.py @@ -188,5 +188,62 @@ async def list_resources( ) return result + async def register_resource( + self, + resource: AgentexResource, + parent: AgentexResource | None = None, + *, + principal_context=..., + ) -> None: + """Register a newly created resource with the principal as owner. + + Prefer this over ``grant`` when the resource's SpiceDB schema has + a parent relation that permissions cascade through (e.g. + ``agent_api_key`` declares ``parent_agent``). Pass ``parent`` to + link the child to its parent atomically; without it the cascade + fails closed. + """ + if self._bypass(): + logger.info(f"Authorization bypassed for register_resource on {resource}") + return None + + effective_principal = ( + principal_context + if principal_context is not ... + else self.principal_context + ) + logger.info( + "[authorization_service] Registering %s:%s for principal %s (parent=%s)", + resource.type, + resource.selector, + effective_principal, + f"{parent.type}:{parent.selector}" if parent is not None else None, + ) + await self.gateway.register_resource(effective_principal, resource, parent) + + async def deregister_resource( + self, + resource: AgentexResource, + *, + principal_context=..., + ) -> None: + """Deregister a deleted resource and all of its relationships.""" + if self._bypass(): + logger.info(f"Authorization bypassed for deregister_resource on {resource}") + return None + + effective_principal = ( + principal_context + if principal_context is not ... + else self.principal_context + ) + logger.info( + "[authorization_service] Deregistering %s:%s for principal %s", + resource.type, + resource.selector, + effective_principal, + ) + await self.gateway.deregister_resource(effective_principal, resource) + DAuthorizationService = Annotated[AuthorizationService, Depends(AuthorizationService)] diff --git a/agentex/src/domain/use_cases/agent_api_keys_use_case.py b/agentex/src/domain/use_cases/agent_api_keys_use_case.py index 8b01167c..0f898056 100644 --- a/agentex/src/domain/use_cases/agent_api_keys_use_case.py +++ b/agentex/src/domain/use_cases/agent_api_keys_use_case.py @@ -135,23 +135,24 @@ async def _register_api_key_in_spark_authz( creator_user_id: str | None, creator_service_account_id: str | None, ) -> str | None: - """Register a new agent_api_key in Spark AuthZ with creator as owner. + """Register a new agent_api_key in Spark AuthZ with creator as owner + AND the parent_agent edge to the owning agent. Called BEFORE the Postgres write — a failure raises and prevents the - row from being persisted, so there is no compensating action to take. - Mirrors the dual-write pattern used for tasks (AGX1-274). - - The current ``Provider.spark`` adapter returns ``{}`` from ``grant``; - no ZedToken is surfaced today, so we always return ``None`` for the - new-write-isolation column. A follow-up will plumb the token through + row from being persisted, so there is no compensating action. + + The ``agent_api_key`` SpiceDB schema has a ``parent_agent`` relation + that read/update/delete permissions cascade through: + ``permission read = internal_effective_viewer & parent_agent->read & + internal_tenant_gate``. We MUST write the parent edge here or every + downstream permission check fails closed. ``register_resource`` + (added in agentex-auth and sgp-authz 0.7.0) writes both the owner + edge and the parent edge atomically in one round-trip. + + The current ``register_resource`` returns ``None``; no ZedToken is + surfaced today, so we always return ``None`` for the + spark_authz_zedtoken column. A follow-up will plumb the token through once the adapter exposes it. - - Note: the ``agent_api_key`` SpiceDB schema has a ``parent_agent`` - relation that read/delete permissions cascade through. The current - ``AuthorizationGateway.grant`` signature does not accept a parent - relation — the agentex-auth adapter is expected to set - ``parent_agent`` based on the resource shape. This is the same - gap Asher's task PR has and is tracked as a follow-up. """ if creator_user_id is None and creator_service_account_id is None: logger.warning( @@ -163,16 +164,36 @@ async def _register_api_key_in_spark_authz( }, ) return None - await self.authorization_service.grant( - resource=AgentexResource.api_key(api_key_id), - ) + try: + await self.authorization_service.register_resource( + resource=AgentexResource.api_key(api_key_id), + parent=AgentexResource.agent(agent_id), + ) + except Exception as exc: + # Fail closed: log + re-raise so the Postgres row is never written. + # The dual-write contract requires the SpiceDB tuple (and parent + # edge) to exist before the row does. + logger.exception( + "Spark AuthZ register_resource failed for agent_api_key; aborting create", + extra={ + "api_key_id": api_key_id, + "agent_id": agent_id, + "account_id": account_id, + "creator_user_id": creator_user_id, + "creator_service_account_id": creator_service_account_id, + "error_type": type(exc).__name__, + }, + ) + raise return None async def _deregister_api_key_from_spark_authz( self, *, api_key_id: str, account_id: str | None ) -> None: - """Best-effort revocation of an api_key's Spark AuthZ tuples on delete. + """Best-effort deregistration of an api_key's Spark AuthZ tuples on delete. + ``deregister_resource`` removes the resource and all of its + relationships (owner, parent_agent, any grantees) atomically. Only invoked when the FGAC_AGENT_API_KEYS_DUAL_WRITE flag is enabled for the caller's account. Failures are logged but do not block the delete. @@ -182,13 +203,18 @@ async def _deregister_api_key_from_spark_authz( ): return try: - await self.authorization_service.revoke( + await self.authorization_service.deregister_resource( resource=AgentexResource.api_key(api_key_id), ) - except Exception: + except Exception as exc: + # Best-effort: log and continue. Postgres row already deleted. logger.warning( - "Spark AuthZ revoke failed for agent_api_key", - extra={"api_key_id": api_key_id, "account_id": account_id}, + "Spark AuthZ deregister failed for agent_api_key; tuple may be orphaned", + extra={ + "api_key_id": api_key_id, + "account_id": account_id, + "error_type": type(exc).__name__, + }, exc_info=True, ) diff --git a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py index 89cdcc66..f29bed4f 100644 --- a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py +++ b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py @@ -2,17 +2,19 @@ These cover the AGX1-272 dual-write path: -- Flag OFF: ``authorization_service.grant`` is NOT called and the api_key is - written to the repository with creator metadata populated from the - principal context. -- Flag ON: ``grant`` is called with ``AgentexResource.api_key()`` and the - row is written. -- Delete deregisters: ``revoke`` is called when ``delete`` runs under the flag. -- Spark failure prevents row: when ``grant`` raises, the api_key is NOT - persisted. -- Revoke failure does not block delete: when ``revoke`` raises, the DB - delete still completes and the failure is logged. -- No creator → no grant: if neither user_id nor service_account_id is +- Flag OFF: ``authorization_service.register_resource`` is NOT called and + the api_key is written to the repository with creator metadata populated + from the principal context. +- Flag ON: ``register_resource`` is called with ``AgentexResource.api_key()`` + and ``parent=AgentexResource.agent()`` (the parent edge is + load-bearing for cascade), and the row is written. +- Delete deregisters: ``deregister_resource`` is called when ``delete`` runs + under the flag. +- Spark failure prevents row: when ``register_resource`` raises, the api_key + is NOT persisted. +- Deregister failure does not block delete: when ``deregister_resource`` + raises, the DB delete still completes and the failure is logged. +- No creator → no register: if neither user_id nor service_account_id is resolvable, the dual-write is a no-op (logged) and the row still lands. The tests intentionally mock the repository, authorization service, agent @@ -62,8 +64,8 @@ def _build_use_case( *, flag_accounts: str, principal: SimpleNamespace | None, - grant: AsyncMock | None = None, - revoke: AsyncMock | None = None, + register_resource: AsyncMock | None = None, + deregister_resource: AsyncMock | None = None, agent: AgentEntity | None = None, create_raises: Exception | None = None, monkeypatch: pytest.MonkeyPatch, @@ -94,8 +96,12 @@ def _build_use_case( authorization_service = Mock() authorization_service.principal_context = principal - authorization_service.grant = grant or AsyncMock(return_value={}) - authorization_service.revoke = revoke or AsyncMock(return_value=None) + authorization_service.register_resource = register_resource or AsyncMock( + return_value=None + ) + authorization_service.deregister_resource = deregister_resource or AsyncMock( + return_value=None + ) feature_flags = FeatureFlagProvider() @@ -115,8 +121,8 @@ def _build_use_case( return ( use_case, agent_api_key_repository, - authorization_service.grant, - authorization_service.revoke, + authorization_service.register_resource, + authorization_service.deregister_resource, ) @@ -126,7 +132,7 @@ async def test_create_api_key_skips_grant_when_flag_off( monkeypatch: pytest.MonkeyPatch, ) -> None: agent = _agent() - use_case, repo, grant, _ = _build_use_case( + use_case, repo, register, _ = _build_use_case( flag_accounts="", principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, @@ -141,7 +147,7 @@ async def test_create_api_key_skips_grant_when_flag_off( account_id="acct-1", ) - grant.assert_not_called() + register.assert_not_called() repo.create.assert_awaited_once() assert api_key.creator_user_id == "user-A" assert api_key.creator_service_account_id is None @@ -154,7 +160,7 @@ async def test_create_api_key_calls_grant_when_flag_on( monkeypatch: pytest.MonkeyPatch, ) -> None: agent = _agent() - use_case, repo, grant, _ = _build_use_case( + use_case, repo, register, _ = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, @@ -169,13 +175,19 @@ async def test_create_api_key_calls_grant_when_flag_on( account_id="acct-1", ) - grant.assert_awaited_once() - granted_resource: AgentexResource = grant.await_args.kwargs["resource"] - assert granted_resource.type == AgentexResourceType.api_key - assert granted_resource.selector == api_key.id + register.assert_awaited_once() + registered_resource: AgentexResource = register.await_args.kwargs["resource"] + assert registered_resource.type == AgentexResourceType.api_key + assert registered_resource.selector == api_key.id + # parent_agent edge is load-bearing — without it the SpiceDB cascade + # `read = ... & parent_agent->read & ...` fails closed for every reader. + registered_parent: AgentexResource = register.await_args.kwargs["parent"] + assert registered_parent is not None + assert registered_parent.type == AgentexResourceType.agent + assert registered_parent.selector == agent.id repo.create.assert_awaited_once() assert api_key.creator_user_id == "user-A" - # Provider.spark.grant returns {} today — no zedtoken yet. + # Provider.spark.register_resource returns None today — no zedtoken yet. assert api_key.spark_authz_zedtoken is None @@ -184,7 +196,7 @@ async def test_create_api_key_calls_grant_when_flag_on( async def test_delete_api_key_calls_revoke_when_flag_on( monkeypatch: pytest.MonkeyPatch, ) -> None: - use_case, repo, _, revoke = _build_use_case( + use_case, repo, _, deregister = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, @@ -194,10 +206,10 @@ async def test_delete_api_key_calls_revoke_when_flag_on( await use_case.delete(id=api_key_id, account_id="acct-1") repo.delete.assert_awaited_once_with(id=api_key_id) - revoke.assert_awaited_once() - revoked_resource: AgentexResource = revoke.await_args.kwargs["resource"] - assert revoked_resource.type == AgentexResourceType.api_key - assert revoked_resource.selector == api_key_id + deregister.assert_awaited_once() + deregistered_resource: AgentexResource = deregister.await_args.kwargs["resource"] + assert deregistered_resource.type == AgentexResourceType.api_key + assert deregistered_resource.selector == api_key_id @pytest.mark.asyncio @@ -205,7 +217,7 @@ async def test_delete_api_key_calls_revoke_when_flag_on( async def test_delete_api_key_skips_revoke_when_flag_off( monkeypatch: pytest.MonkeyPatch, ) -> None: - use_case, repo, _, revoke = _build_use_case( + use_case, repo, _, deregister = _build_use_case( flag_accounts="", principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, @@ -214,7 +226,7 @@ async def test_delete_api_key_skips_revoke_when_flag_off( await use_case.delete(id=orm_id(), account_id="acct-1") repo.delete.assert_awaited_once() - revoke.assert_not_called() + deregister.assert_not_called() @pytest.mark.asyncio @@ -222,12 +234,12 @@ async def test_delete_api_key_skips_revoke_when_flag_off( async def test_create_api_key_grant_failure_prevents_db_row( monkeypatch: pytest.MonkeyPatch, ) -> None: - grant = AsyncMock(side_effect=RuntimeError("spark unavailable")) + register_resource = AsyncMock(side_effect=RuntimeError("spark unavailable")) agent = _agent() use_case, repo, _, _ = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id="user-A", account_id="acct-1"), - grant=grant, + register_resource=register_resource, agent=agent, monkeypatch=monkeypatch, ) @@ -249,11 +261,11 @@ async def test_create_api_key_grant_failure_prevents_db_row( async def test_delete_api_key_revoke_failure_does_not_block_delete( monkeypatch: pytest.MonkeyPatch, ) -> None: - revoke = AsyncMock(side_effect=RuntimeError("spark unavailable")) - use_case, repo, _, revoke_ref = _build_use_case( + deregister = AsyncMock(side_effect=RuntimeError("spark unavailable")) + use_case, repo, _, deregister_ref = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id="user-A", account_id="acct-1"), - revoke=revoke, + deregister_resource=deregister, monkeypatch=monkeypatch, ) @@ -261,7 +273,7 @@ async def test_delete_api_key_revoke_failure_does_not_block_delete( await use_case.delete(id=orm_id(), account_id="acct-1") repo.delete.assert_awaited_once() - revoke_ref.assert_awaited_once() + deregister_ref.assert_awaited_once() @pytest.mark.asyncio @@ -272,7 +284,7 @@ async def test_create_api_key_skips_grant_when_no_creator_resolvable( """If neither user_id nor service_account_id is available on the principal, the dual-write is a no-op (logged) and the row still lands without a tuple.""" agent = _agent() - use_case, repo, grant, _ = _build_use_case( + use_case, repo, register, _ = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id=None, account_id="acct-1"), agent=agent, @@ -287,7 +299,7 @@ async def test_create_api_key_skips_grant_when_no_creator_resolvable( account_id="acct-1", ) - grant.assert_not_called() + register.assert_not_called() repo.create.assert_awaited_once() assert api_key.creator_user_id is None assert api_key.creator_service_account_id is None @@ -300,7 +312,7 @@ async def test_delete_by_agent_id_and_key_name_revokes_existing( ) -> None: agent = _agent() existing_id = orm_id() - use_case, repo, _, revoke = _build_use_case( + use_case, repo, _, deregister = _build_use_case( flag_accounts="acct-1", principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, @@ -324,6 +336,6 @@ async def test_delete_by_agent_id_and_key_name_revokes_existing( ) repo.delete_by_agent_id_and_key_name.assert_awaited_once() - revoke.assert_awaited_once() - revoked_resource: AgentexResource = revoke.await_args.kwargs["resource"] - assert revoked_resource.selector == existing_id + deregister.assert_awaited_once() + deregistered_resource: AgentexResource = deregister.await_args.kwargs["resource"] + assert deregistered_resource.selector == existing_id From 1fd2cf966bc81130b0e0f9a7b8026e65c7d22b71 Mon Sep 17 00:00:00 2001 From: Dhruv Madhok Date: Thu, 28 May 2026 10:47:32 -0700 Subject: [PATCH 3/3] refactor(AGX1-272): query egp-api-backend for FGAC_AGENT_API_KEYS_DUAL_WRITE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per team discussion: rather than maintain a parallel env-var flag system in scale-agentex, route api_key dual-write flag checks through egp-api-backend's existing flag service. One source of truth across services, single flip surface for ops, fewer per-env env-var allowlists to keep in sync. Changes: - EnvVarKeys.EGP_API_BACKEND_URL — new env var for the egp-api-backend base URL. Used by the new HTTP-backed flag provider. - FeatureFlagProvider rewritten as an HTTP client of egp-api-backend's GET /feature-flag/{id} endpoint: * Forwards x-api-key / x-user-id / x-service-account-id / x-selected-account-id from the caller's principal_context so the endpoint's REQUIRE_IDENTITY_AND_OPTIONAL_ACCOUNT policy admits the request. * Coerces the response's `value` field to bool. * Fails closed to False on any error (config missing, no identity, non-2xx, transport failure, JSON parse failure) — the legacy no-Spark code path is the safe default. * `is_enabled` is now async (HTTP call). Signature is `is_enabled(name, *, principal_context, account_id)`. - AgentAPIKeysUseCase: both call sites now await is_enabled and pass principal_context. _deregister grabs principal_context from self.authorization_service. - Test fixtures: mock FeatureFlagProvider directly (Mock with is_enabled = AsyncMock(return_value=flag_on)) so dual-write tests stay hermetic. The pre-existing FeatureFlagProvider() no-arg constructions in test_agents_api_keys_use_case.py and integration_client.py now pass egp_api_backend_url=None (provider returns False without it, matching the prior "flag never enabled in unit tests" behavior). Out of scope: - Migrating Asher's FGAC_TASKS_DUAL_WRITE flag check off env vars. That's task-team-owned and we leave their existing pattern alone per the team discussion (new-work-only). - Caching the flag response. Each is_enabled is a fresh HTTP call. Egp-api-backend's flag endpoint is fast and the caller paths are already crossing the network for the actual register/deregister, so one extra round-trip is acceptable for now. Add caching later if load profiling shows it matters. Test plan: - uv run pytest agentex/tests/integration/services/test_agent_api_key_service_dual_write.py — 8/8 pass. - Existing 4 unrelated test_agents_api_keys_use_case.py docker-fixture errors predate this commit (verified via `git stash`). Co-Authored-By: Claude Opus 4.7 --- agentex/src/config/environment_variables.py | 3 + .../use_cases/agent_api_keys_use_case.py | 12 +- agentex/src/utils/feature_flags.py | 136 ++++++++++++++++-- .../fixtures/integration_client.py | 2 +- .../test_agent_api_key_service_dual_write.py | 27 ++-- .../test_agents_api_keys_use_case.py | 2 +- 6 files changed, 152 insertions(+), 30 deletions(-) diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 2d41740a..44edea64 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -44,6 +44,7 @@ class EnvVarKeys(str, Enum): REDIS_STREAM_TTL_SECONDS = "REDIS_STREAM_TTL_SECONDS" IMAGE_PULL_SECRET_NAME = "IMAGE_PULL_SECRET_NAME" AGENTEX_AUTH_URL = "AGENTEX_AUTH_URL" + EGP_API_BACKEND_URL = "EGP_API_BACKEND_URL" ALLOWED_ORIGINS = "ALLOWED_ORIGINS" DD_AGENT_HOST = "DD_AGENT_HOST" DD_STATSD_PORT = "DD_STATSD_PORT" @@ -100,6 +101,7 @@ class EnvironmentVariables(BaseModel): ) IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None + EGP_API_BACKEND_URL: str | None = None ALLOWED_ORIGINS: str | None = None HTTPX_MAX_CONNECTIONS: int = 200 # Max total connections allowed HTTPX_MAX_KEEPALIVE_CONNECTIONS: int = 100 # Max connections to keep alive @@ -166,6 +168,7 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: ), IMAGE_PULL_SECRET_NAME=os.environ.get(EnvVarKeys.IMAGE_PULL_SECRET_NAME), AGENTEX_AUTH_URL=os.environ.get(EnvVarKeys.AGENTEX_AUTH_URL), + EGP_API_BACKEND_URL=os.environ.get(EnvVarKeys.EGP_API_BACKEND_URL), ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"), DD_AGENT_HOST=os.environ.get(EnvVarKeys.DD_AGENT_HOST), DD_STATSD_PORT=os.environ.get(EnvVarKeys.DD_STATSD_PORT), diff --git a/agentex/src/domain/use_cases/agent_api_keys_use_case.py b/agentex/src/domain/use_cases/agent_api_keys_use_case.py index 0f898056..1267f070 100644 --- a/agentex/src/domain/use_cases/agent_api_keys_use_case.py +++ b/agentex/src/domain/use_cases/agent_api_keys_use_case.py @@ -101,8 +101,10 @@ async def create( api_key_id = orm_id() zedtoken: str | None = None - if self.feature_flags.is_enabled( - FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, account_id + if await self.feature_flags.is_enabled( + FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, + principal_context=principal_context, + account_id=account_id, ): zedtoken = await self._register_api_key_in_spark_authz( api_key_id=api_key_id, @@ -198,8 +200,10 @@ async def _deregister_api_key_from_spark_authz( for the caller's account. Failures are logged but do not block the delete. """ - if not self.feature_flags.is_enabled( - FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, account_id + if not await self.feature_flags.is_enabled( + FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, + principal_context=self.authorization_service.principal_context, + account_id=account_id, ): return try: diff --git a/agentex/src/utils/feature_flags.py b/agentex/src/utils/feature_flags.py index a8d9d661..03289a5c 100644 --- a/agentex/src/utils/feature_flags.py +++ b/agentex/src/utils/feature_flags.py @@ -1,9 +1,30 @@ -import os +"""Per-account feature flag provider. + +Queries egp-api-backend's ``GET /feature-flag/{id}`` endpoint with the +caller's identity headers. The endpoint evaluates the flag against the +account_id resolved from those headers and returns a ``FeatureFlag`` +payload whose ``value`` field is a typed flag value (bool for ``boolean`` +flags, etc.). + +Falls back to disabled (``False``) on any error so a transient +egp-api-backend outage doesn't break the dual-write path — the flag-off +behavior is the safe legacy path. +""" + +from __future__ import annotations + from enum import StrEnum -from typing import Annotated +from typing import Annotated, Any from fastapi import Depends +from src.config.dependencies import DEnvironmentVariable +from src.config.environment_variables import EnvVarKeys +from src.utils.cached_httpx_client import get_async_client +from src.utils.logging import make_logger + +logger = make_logger(__name__) + class FeatureFlagName(StrEnum): FGAC_TASKS = "fgac-tasks" @@ -12,19 +33,112 @@ class FeatureFlagName(StrEnum): class FeatureFlagProvider: - """Per-account feature flag provider. + """Per-account feature flag provider backed by egp-api-backend. - v1: env-var allowlist (per-account, comma-separated). The env var name is - derived from the flag name, e.g. ``FGAC_AGENT_API_KEYS_DUAL_WRITE_ACCOUNTS``. - A follow-up will swap this for LaunchDarkly with an account_id context. + Calls ``GET {EGP_API_BACKEND_URL}/feature-flag/{name}`` with the + caller's identity headers. The endpoint evaluates the flag against the + caller's account and returns a ``FeatureFlag`` with a ``value`` field. + For boolean flags this method coerces the value to ``bool``. + + Returns ``False`` (flag off) when: + - ``EGP_API_BACKEND_URL`` is not configured; + - the caller's principal has no usable identity headers; + - egp-api-backend returns a non-2xx response; + - any network or parsing error occurs. + + Fail-closed-to-disabled is intentional: the legacy code path is the + safe default if FGAC dual-write is unreachable. """ - def is_enabled(self, name: FeatureFlagName, account_id: str | None) -> bool: - if not account_id: + def __init__( + self, + egp_api_backend_url: DEnvironmentVariable(EnvVarKeys.EGP_API_BACKEND_URL), + ): + self.egp_api_backend_url = egp_api_backend_url + + async def is_enabled( + self, + name: FeatureFlagName, + *, + principal_context: Any, + account_id: str | None, + ) -> bool: + if not self.egp_api_backend_url: return False - env_key = f"{name.value.upper().replace('-', '_')}_ACCOUNTS" - allowed = os.environ.get(env_key, "") - return account_id in {a.strip() for a in allowed.split(",") if a.strip()} + + headers = self._principal_headers(principal_context, account_id) + if not headers: + return False + + url = f"{self.egp_api_backend_url.rstrip('/')}/feature-flag/{name.value}" + try: + client = get_async_client() + response = await client.get(url, headers=headers) + except Exception as exc: + logger.warning( + "Feature flag fetch failed; treating as disabled", + extra={ + "flag": name.value, + "account_id": account_id, + "error_type": type(exc).__name__, + }, + ) + return False + + if response.status_code != 200: + logger.warning( + "Feature flag non-2xx response; treating as disabled", + extra={ + "flag": name.value, + "account_id": account_id, + "status_code": response.status_code, + }, + ) + return False + + try: + payload = response.json() + value = payload.get("value") + except Exception: + logger.warning( + "Feature flag response not JSON-parseable; treating as disabled", + extra={"flag": name.value, "account_id": account_id}, + ) + return False + + return bool(value) + + @staticmethod + def _principal_headers( + principal_context: Any, account_id: str | None + ) -> dict[str, str]: + """Build identity headers from the caller's principal_context so + egp-api-backend's ``REQUIRE_IDENTITY_AND_OPTIONAL_ACCOUNT`` policy + admits the request. + + Returns ``{}`` when no usable identity is present — the caller + should treat that as flag-off (the legacy path is safe). + """ + if principal_context is None: + return {} + + api_key = getattr(principal_context, "api_key", None) + user_id = getattr(principal_context, "user_id", None) + service_account_id = getattr(principal_context, "service_account_id", None) + + if not api_key and not user_id and not service_account_id: + return {} + + headers: dict[str, str] = {} + if api_key: + headers["x-api-key"] = api_key + if user_id: + headers["x-user-id"] = user_id + if service_account_id: + headers["x-service-account-id"] = service_account_id + if account_id: + headers["x-selected-account-id"] = account_id + return headers DFeatureFlagProvider = Annotated[FeatureFlagProvider, Depends(FeatureFlagProvider)] diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py index 218a60e4..296322da 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -409,7 +409,7 @@ def create_agent_api_keys_use_case(): agent_repository=isolated_repositories["agent_repository"], client=isolated_api_key_http_client, # Use mock client for forwarding requests authorization_service=noop_authorization_service, - feature_flags=FeatureFlagProvider(), + feature_flags=FeatureFlagProvider(egp_api_backend_url=None), ) def create_deployment_history_use_case(): diff --git a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py index f29bed4f..26d22bc2 100644 --- a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py +++ b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py @@ -37,7 +37,6 @@ from src.domain.entities.agent_api_keys import AgentAPIKeyEntity, AgentAPIKeyType from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus from src.domain.use_cases.agent_api_keys_use_case import AgentAPIKeysUseCase -from src.utils.feature_flags import FeatureFlagProvider from src.utils.ids import orm_id @@ -62,7 +61,7 @@ def _agent() -> AgentEntity: def _build_use_case( *, - flag_accounts: str, + flag_on: bool, principal: SimpleNamespace | None, register_resource: AsyncMock | None = None, deregister_resource: AsyncMock | None = None, @@ -70,8 +69,6 @@ def _build_use_case( create_raises: Exception | None = None, monkeypatch: pytest.MonkeyPatch, ) -> tuple[AgentAPIKeysUseCase, Mock, AsyncMock, AsyncMock]: - monkeypatch.setenv("FGAC_AGENT_API_KEYS_DUAL_WRITE_ACCOUNTS", flag_accounts) - sample_agent = agent or _agent() agent_repository = Mock() @@ -103,7 +100,11 @@ def _build_use_case( return_value=None ) - feature_flags = FeatureFlagProvider() + # FeatureFlagProvider normally calls egp-api-backend over HTTP. Mock it + # so tests are hermetic; behaviour under test is the use case's response + # to the flag value, not the provider's transport. + feature_flags = Mock() + feature_flags.is_enabled = AsyncMock(return_value=flag_on) # Patch env var lookup inside UseCase __init__ so we don't depend on real # env configuration to instantiate. @@ -133,7 +134,7 @@ async def test_create_api_key_skips_grant_when_flag_off( ) -> None: agent = _agent() use_case, repo, register, _ = _build_use_case( - flag_accounts="", + flag_on=False, principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -161,7 +162,7 @@ async def test_create_api_key_calls_grant_when_flag_on( ) -> None: agent = _agent() use_case, repo, register, _ = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -197,7 +198,7 @@ async def test_delete_api_key_calls_revoke_when_flag_on( monkeypatch: pytest.MonkeyPatch, ) -> None: use_case, repo, _, deregister = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, ) @@ -218,7 +219,7 @@ async def test_delete_api_key_skips_revoke_when_flag_off( monkeypatch: pytest.MonkeyPatch, ) -> None: use_case, repo, _, deregister = _build_use_case( - flag_accounts="", + flag_on=False, principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, ) @@ -237,7 +238,7 @@ async def test_create_api_key_grant_failure_prevents_db_row( register_resource = AsyncMock(side_effect=RuntimeError("spark unavailable")) agent = _agent() use_case, repo, _, _ = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id="user-A", account_id="acct-1"), register_resource=register_resource, agent=agent, @@ -263,7 +264,7 @@ async def test_delete_api_key_revoke_failure_does_not_block_delete( ) -> None: deregister = AsyncMock(side_effect=RuntimeError("spark unavailable")) use_case, repo, _, deregister_ref = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id="user-A", account_id="acct-1"), deregister_resource=deregister, monkeypatch=monkeypatch, @@ -285,7 +286,7 @@ async def test_create_api_key_skips_grant_when_no_creator_resolvable( the dual-write is a no-op (logged) and the row still lands without a tuple.""" agent = _agent() use_case, repo, register, _ = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id=None, account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -313,7 +314,7 @@ async def test_delete_by_agent_id_and_key_name_revokes_existing( agent = _agent() existing_id = orm_id() use_case, repo, _, deregister = _build_use_case( - flag_accounts="acct-1", + flag_on=True, principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, diff --git a/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py b/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py index c0abcc26..b89165ae 100644 --- a/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py +++ b/agentex/tests/unit/use_cases/test_agents_api_keys_use_case.py @@ -45,7 +45,7 @@ def agent_api_keys_use_case( agent_repository=agent_repository, client=mock_http_client, authorization_service=authorization_service, - feature_flags=FeatureFlagProvider(), + feature_flags=FeatureFlagProvider(egp_api_backend_url=None), )