def upgrade() -> None:
    """Add creator/zedtoken columns, creator indexes and the one-creator check.

    Steps, in order:
      1. add the nullable ``creator_user_id`` / ``creator_service_account_id``
         (String) and ``spark_authz_zedtoken`` (Text) columns;
      2. create one index per creator column with ``CREATE INDEX CONCURRENTLY``
         inside Alembic's ``autocommit_block()``;
      3. add a check constraint that allows at most one creator column to be
         non-NULL on a row.
    """
    table = 'agent_api_keys'
    creator_columns = ('creator_user_id', 'creator_service_account_id')

    for column_name in creator_columns:
        op.add_column(table, sa.Column(column_name, sa.String(), nullable=True))
    op.add_column(table, sa.Column('spark_authz_zedtoken', sa.Text(), nullable=True))

    # NOTE(review): the concurrent index builds are issued from the autocommit
    # block, presumably because they cannot run inside the migration's
    # transaction -- confirm against the Alembic/PostgreSQL docs.
    with op.get_context().autocommit_block():
        for column_name in creator_columns:
            op.execute(
                f"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_{table}_{column_name} "
                f"ON {table} ({column_name})"
            )

    op.create_check_constraint(
        'ck_agent_api_keys_one_creator',
        table,
        '(creator_user_id IS NULL) OR (creator_service_account_id IS NULL)',
    )
async def register_resource(
    self,
    principal: AgentexAuthPrincipalContext,
    resource: AgentexResource,
    parent: AgentexResource | None = None,
) -> None:
    """POST a resource registration to the auth service.

    Sends ``principal``, the dumped ``resource`` and (when given) the dumped
    ``parent`` to ``/v1/authz/register`` on ``self.agentex_auth_url``.
    ``parent`` is sent as ``None`` when no parent is supplied.

    Args:
        principal: Caller identity forwarded as-is in the payload.
        resource: Resource being registered.
        parent: Optional parent resource to link to ``resource``.
    """
    resource_payload = resource.model_dump()
    parent_payload = None if parent is None else parent.model_dump()
    body = {
        "principal": principal,
        "resource": resource_payload,
        "parent": parent_payload,
    }
    await HttpRequestHandler.post_with_error_handling(
        self.agentex_auth_url,
        "/v1/authz/register",
        json=body,
    )
self.agentex_auth_url, "/v1/authz/deregister", json=payload + ) + DAgentexAuthorization = Annotated[ AgentexAuthorizationProxy, Depends(AgentexAuthorizationProxy) diff --git a/agentex/src/adapters/authorization/port.py b/agentex/src/adapters/authorization/port.py index 80c05200..f4f33a0a 100644 --- a/agentex/src/adapters/authorization/port.py +++ b/agentex/src/adapters/authorization/port.py @@ -49,3 +49,29 @@ async def list_resources( filter_operation: AuthorizedOperationType = AuthorizedOperationType.read, ) -> Iterable[str]: """List resource_ids for a given principal""" + + @abstractmethod + async def register_resource( + self, + principal: PrincipalT, + resource: AgentexResource, + parent: AgentexResource | None = None, + ) -> None: + """Register a newly created resource in SpiceDB with the principal as + owner. Optionally writes a lifecycle parent edge. + + Use this on resource create instead of ``grant`` when the resource + type's SpiceDB definition has a parent relation that permission + checks cascade through (e.g. ``agent_api_key`` declares + ``parent_agent``). Without writing that edge here the cascade fails + closed. 
+ """ + + @abstractmethod + async def deregister_resource( + self, + principal: PrincipalT, + resource: AgentexResource, + ) -> None: + """Deregister a deleted resource and all of its relationships + (owner, parent, grantees) in a single atomic call.""" diff --git a/agentex/src/adapters/orm.py b/agentex/src/adapters/orm.py index 42a66c1a..37b5f0b1 100644 --- a/agentex/src/adapters/orm.py +++ b/agentex/src/adapters/orm.py @@ -181,6 +181,9 @@ class AgentAPIKeyORM(BaseORM): name = Column(String(256), nullable=False, index=True) api_key_type = Column(SQLAlchemyEnum(AgentAPIKeyType), nullable=False) api_key = Column(String, nullable=False) + creator_user_id = Column(String, nullable=True, index=True) + creator_service_account_id = Column(String, nullable=True, index=True) + spark_authz_zedtoken = Column(Text, nullable=True) # Indexes for efficient querying __table_args__ = ( diff --git a/agentex/src/api/routes/agent_api_keys.py b/agentex/src/api/routes/agent_api_keys.py index dec3e00f..e041531f 100644 --- a/agentex/src/api/routes/agent_api_keys.py +++ b/agentex/src/api/routes/agent_api_keys.py @@ -8,6 +8,7 @@ CreateAPIKeyResponse, ) from src.domain.entities.agent_api_keys import AgentAPIKeyType +from src.domain.services.authorization_service import DAuthorizationService from src.domain.use_cases.agent_api_keys_use_case import DAgentAPIKeysUseCase from src.domain.use_cases.agents_use_case import DAgentsUseCase from src.utils.logging import make_logger @@ -28,6 +29,7 @@ async def create_api_key( request: CreateAPIKeyRequest, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, ) -> CreateAPIKeyResponse: if not request.agent_id and not request.agent_name: raise HTTPException( @@ -52,11 +54,13 @@ async def create_api_key( raise HTTPException(status_code=409, detail=error_msg) new_api_key = request.api_key or secrets.token_hex(32) + account_id = getattr(authorization_service.principal_context, 
"account_id", None) agent_api_key_entity = await agent_api_key_use_case.create( agent_id=agent.id, api_key=str(new_api_key), name=request.name, api_key_type=request.api_key_type, + account_id=account_id, ) return CreateAPIKeyResponse( id=agent_api_key_entity.id, @@ -161,8 +165,10 @@ async def get_agent_api_key( async def delete_agent_api_key( id: str, agent_api_key_use_case: DAgentAPIKeysUseCase, + authorization_service: DAuthorizationService, ) -> str: - await agent_api_key_use_case.delete(id=id) + account_id = getattr(authorization_service.principal_context, "account_id", None) + await agent_api_key_use_case.delete(id=id, account_id=account_id) return f"Agent API key with ID {id} deleted" @@ -176,6 +182,7 @@ async def delete_agent_api_key_by_name( api_key_name: str, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, agent_id: str | None = None, agent_name: str | None = None, api_key_type: AgentAPIKeyType = AgentAPIKeyType.EXTERNAL, @@ -191,8 +198,12 @@ async def delete_agent_api_key_by_name( detail="Only one of 'agent_id' or 'agent_name' should be provided to delete an agent api_key.", ) agent = await agent_use_case.get(id=agent_id, name=agent_name) + account_id = getattr(authorization_service.principal_context, "account_id", None) await agent_api_key_use_case.delete_by_agent_id_and_key_name( - agent_id=agent.id, key_name=api_key_name, api_key_type=api_key_type + agent_id=agent.id, + key_name=api_key_name, + api_key_type=api_key_type, + account_id=account_id, ) return f"Agent api_key '{api_key_name}' deleted" diff --git a/agentex/src/api/schemas/authorization_types.py b/agentex/src/api/schemas/authorization_types.py index 585fa3a3..7e98fdae 100644 --- a/agentex/src/api/schemas/authorization_types.py +++ b/agentex/src/api/schemas/authorization_types.py @@ -14,6 +14,7 @@ class AuthorizedOperationType(StrEnum): class AgentexResourceType(StrEnum): agent = "agent" task = "task" + api_key = 
"api_key" # Resources that inherit permissions from their parent task @@ -37,6 +38,10 @@ def agent(cls, selector: str) -> "AgentexResource": def task(cls, selector: str) -> "AgentexResource": return cls(type=AgentexResourceType.task, selector=selector) + @classmethod + def api_key(cls, selector: str) -> "AgentexResource": + return cls(type=AgentexResourceType.api_key, selector=selector) + class AgentexResourceOptionalSelector(BaseModel): type: AgentexResourceType @@ -49,3 +54,7 @@ def agent(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector" @classmethod def task(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector": return cls(type=AgentexResourceType.task, selector=selector) + + @classmethod + def api_key(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector": + return cls(type=AgentexResourceType.api_key, selector=selector) diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 2d41740a..44edea64 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -44,6 +44,7 @@ class EnvVarKeys(str, Enum): REDIS_STREAM_TTL_SECONDS = "REDIS_STREAM_TTL_SECONDS" IMAGE_PULL_SECRET_NAME = "IMAGE_PULL_SECRET_NAME" AGENTEX_AUTH_URL = "AGENTEX_AUTH_URL" + EGP_API_BACKEND_URL = "EGP_API_BACKEND_URL" ALLOWED_ORIGINS = "ALLOWED_ORIGINS" DD_AGENT_HOST = "DD_AGENT_HOST" DD_STATSD_PORT = "DD_STATSD_PORT" @@ -100,6 +101,7 @@ class EnvironmentVariables(BaseModel): ) IMAGE_PULL_SECRET_NAME: str | None = None AGENTEX_AUTH_URL: str | None = None + EGP_API_BACKEND_URL: str | None = None ALLOWED_ORIGINS: str | None = None HTTPX_MAX_CONNECTIONS: int = 200 # Max total connections allowed HTTPX_MAX_KEEPALIVE_CONNECTIONS: int = 100 # Max connections to keep alive @@ -166,6 +168,7 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: ), IMAGE_PULL_SECRET_NAME=os.environ.get(EnvVarKeys.IMAGE_PULL_SECRET_NAME), 
AGENTEX_AUTH_URL=os.environ.get(EnvVarKeys.AGENTEX_AUTH_URL), + EGP_API_BACKEND_URL=os.environ.get(EnvVarKeys.EGP_API_BACKEND_URL), ALLOWED_ORIGINS=os.environ.get(EnvVarKeys.ALLOWED_ORIGINS, "*"), DD_AGENT_HOST=os.environ.get(EnvVarKeys.DD_AGENT_HOST), DD_STATSD_PORT=os.environ.get(EnvVarKeys.DD_STATSD_PORT), diff --git a/agentex/src/domain/entities/agent_api_keys.py b/agentex/src/domain/entities/agent_api_keys.py index 27300175..95cd71ed 100644 --- a/agentex/src/domain/entities/agent_api_keys.py +++ b/agentex/src/domain/entities/agent_api_keys.py @@ -25,3 +25,15 @@ class AgentAPIKeyEntity(BaseModel): description="The type of the API key (either internal or external)", ) api_key: str = Field(..., description="The API key") + creator_user_id: str | None = Field( + None, + description="Identity ID of the user who created this API key (granted as FGAC owner)", + ) + creator_service_account_id: str | None = Field( + None, + description="Service identity ID of the service account that created this API key", + ) + spark_authz_zedtoken: str | None = Field( + None, + description="ZedToken from the Spark AuthZ grant for new-write isolation", + ) diff --git a/agentex/src/domain/services/authorization_service.py b/agentex/src/domain/services/authorization_service.py index 8400603b..06125d57 100644 --- a/agentex/src/domain/services/authorization_service.py +++ b/agentex/src/domain/services/authorization_service.py @@ -188,5 +188,62 @@ async def list_resources( ) return result + async def register_resource( + self, + resource: AgentexResource, + parent: AgentexResource | None = None, + *, + principal_context=..., + ) -> None: + """Register a newly created resource with the principal as owner. + + Prefer this over ``grant`` when the resource's SpiceDB schema has + a parent relation that permissions cascade through (e.g. + ``agent_api_key`` declares ``parent_agent``). Pass ``parent`` to + link the child to its parent atomically; without it the cascade + fails closed. 
async def deregister_resource(
    self,
    resource: AgentexResource,
    *,
    principal_context=...,
) -> None:
    """Ask the gateway to deregister ``resource`` for the effective principal.

    Does nothing (beyond an info log) when ``self._bypass()`` is true.

    Args:
        resource: The deleted resource to deregister.
        principal_context: Principal to act as. The default ``...`` means
            "not supplied" and falls back to ``self.principal_context``; any
            other value, including ``None``, is forwarded unchanged.
    """
    if self._bypass():
        logger.info(f"Authorization bypassed for deregister_resource on {resource}")
        return None

    # Ellipsis (not None) is the "argument omitted" sentinel.
    if principal_context is ...:
        acting_principal = self.principal_context
    else:
        acting_principal = principal_context

    logger.info(
        "[authorization_service] Deregistering %s:%s for principal %s",
        resource.type,
        resource.selector,
        acting_principal,
    )
    await self.gateway.deregister_resource(acting_principal, resource)
src.domain.services.authorization_service import DAuthorizationService +from src.utils.feature_flags import DFeatureFlagProvider, FeatureFlagName from src.utils.ids import orm_id from src.utils.logging import make_logger @@ -39,10 +42,14 @@ def __init__( agent_api_key_repository: DAgentAPIKeyRepository, agent_repository: DAgentRepository, client: DHttpxClient, + authorization_service: DAuthorizationService, + feature_flags: DFeatureFlagProvider, ): self.agent_api_key_repo = agent_api_key_repository self.agent_repo = agent_repository self.client = client + self.authorization_service = authorization_service + self.feature_flags = feature_flags self.auth_gateway_enabled = bool( resolve_environment_variable_dependency(EnvVarKeys.AGENTEX_AUTH_URL) ) @@ -76,6 +83,7 @@ async def create( agent_id: str, api_key_type: AgentAPIKeyType, api_key: str, + account_id: str | None = None, ) -> AgentAPIKeyEntity: agent = await self.get_agent(agent_id=agent_id) if not agent: @@ -83,17 +91,137 @@ async def create( status_code=404, detail=f"Agent ID {agent_id} not found.", ) + + principal_context = self.authorization_service.principal_context + creator_user_id = getattr(principal_context, "user_id", None) + creator_service_account_id = getattr( + principal_context, "service_account_id", None + ) + + api_key_id = orm_id() + zedtoken: str | None = None + + if await self.feature_flags.is_enabled( + FeatureFlagName.FGAC_AGENT_API_KEYS_DUAL_WRITE, + principal_context=principal_context, + account_id=account_id, + ): + zedtoken = await self._register_api_key_in_spark_authz( + api_key_id=api_key_id, + agent_id=agent.id, + account_id=account_id, + creator_user_id=creator_user_id, + creator_service_account_id=creator_service_account_id, + ) + # TODO: encrypt API key before storing it # Initialize a new agent api_key agent_api_key = AgentAPIKeyEntity( - id=orm_id(), + id=api_key_id, name=name, agent_id=agent.id, api_key_type=api_key_type, api_key=api_key, + creator_user_id=creator_user_id, + 
async def _register_api_key_in_spark_authz(
    self,
    *,
    api_key_id: str,
    agent_id: str,
    account_id: str | None,
    creator_user_id: str | None,
    creator_service_account_id: str | None,
) -> str | None:
    """Register a new api_key, with its parent agent, via the authorization service.

    When neither creator id is known the registration is skipped with a
    warning. Otherwise ``authorization_service.register_resource`` is called
    with the api_key resource and the agent as ``parent``. Any failure is
    logged with context and re-raised so the caller can abort the create.

    Returns:
        Always ``None``: ``register_resource`` yields no ZedToken here, so
        there is nothing to store yet.

    Raises:
        Exception: whatever resource construction or ``register_resource``
            raised, unchanged.
    """
    log_fields = {
        "api_key_id": api_key_id,
        "agent_id": agent_id,
        "account_id": account_id,
    }

    creator_known = not (
        creator_user_id is None and creator_service_account_id is None
    )
    if not creator_known:
        logger.warning(
            "Skipping Spark AuthZ api_key registration: no creator resolvable",
            extra=log_fields,
        )
        return None

    try:
        # Resources are built inside the try so a construction error is
        # logged and re-raised like a gateway failure.
        await self.authorization_service.register_resource(
            resource=AgentexResource.api_key(api_key_id),
            parent=AgentexResource.agent(agent_id),
        )
    except Exception as exc:
        # Fail closed: surface the error so no row is written without its tuple.
        logger.exception(
            "Spark AuthZ register_resource failed for agent_api_key; aborting create",
            extra={
                **log_fields,
                "creator_user_id": creator_user_id,
                "creator_service_account_id": creator_service_account_id,
                "error_type": type(exc).__name__,
            },
        )
        raise
    return None
async def delete_by_agent_id_and_key_name(
    self,
    agent_id: str,
    key_name: str,
    api_key_type: AgentAPIKeyType,
    account_id: str | None = None,
) -> None:
    """Delete an agent's key by name, then deregister it if it was found.

    The key is looked up before the delete because its id is needed for the
    deregistration call afterwards. When the lookup returns ``None`` the
    delete is still issued and deregistration is skipped.

    Args:
        agent_id: Owning agent's id.
        key_name: Name of the key to delete.
        api_key_type: Key type used to disambiguate the lookup and the delete.
        account_id: Passed through to the deregistration helper.
    """
    repo = self.agent_api_key_repo
    doomed_key = await repo.get_by_agent_id_and_name(
        agent_id=agent_id, name=key_name, api_key_type=api_key_type
    )
    await repo.delete_by_agent_id_and_key_name(
        agent_id=agent_id, key_name=key_name, api_key_type=api_key_type
    )
    if doomed_key is None:
        return
    await self._deregister_api_key_from_spark_authz(
        api_key_id=doomed_key.id, account_id=account_id
    )
class FeatureFlagProvider:
    """Per-account feature flag provider backed by egp-api-backend.

    Calls ``GET {EGP_API_BACKEND_URL}/feature-flag/{name}`` with identity
    headers derived from the caller's principal and reads the ``value``
    field of the JSON response.

    ``is_enabled`` returns ``False`` (flag off) when:
      - ``EGP_API_BACKEND_URL`` is not configured;
      - the principal yields no usable identity headers;
      - the request raises;
      - the response status is not 2xx;
      - the body is not a JSON object;
      - ``value`` is anything other than the JSON boolean ``true``.

    Failing to "disabled" is intentional: the flag-off path is the legacy
    behaviour, so an unreachable or misbehaving flag service must never
    switch the new path on.
    """

    def __init__(
        self,
        egp_api_backend_url: DEnvironmentVariable(EnvVarKeys.EGP_API_BACKEND_URL),
    ):
        self.egp_api_backend_url = egp_api_backend_url

    async def is_enabled(
        self,
        name: FeatureFlagName,
        *,
        principal_context: Any,
        account_id: str | None,
    ) -> bool:
        """Return whether flag ``name`` is on for the caller.

        Args:
            name: Flag to evaluate; its ``.value`` is used as the URL segment.
            principal_context: Caller identity used to build request headers.
            account_id: Optional account forwarded as ``x-selected-account-id``.

        Returns:
            ``True`` only when the backend answers 2xx with a JSON object
            whose ``value`` is literally ``true``; ``False`` otherwise.
            Never raises for transport or parsing problems.
        """
        if not self.egp_api_backend_url:
            return False

        headers = self._principal_headers(principal_context, account_id)
        if not headers:
            return False

        url = f"{self.egp_api_backend_url.rstrip('/')}/feature-flag/{name.value}"
        try:
            client = get_async_client()
            response = await client.get(url, headers=headers)
        except Exception as exc:
            # Deliberately broad: any transport failure means "flag off".
            logger.warning(
                "Feature flag fetch failed; treating as disabled",
                extra={
                    "flag": name.value,
                    "account_id": account_id,
                    "error_type": type(exc).__name__,
                },
            )
            return False

        # Match the documented/logged contract: only non-2xx is a failure.
        if not 200 <= response.status_code < 300:
            logger.warning(
                "Feature flag non-2xx response; treating as disabled",
                extra={
                    "flag": name.value,
                    "account_id": account_id,
                    "status_code": response.status_code,
                },
            )
            return False

        try:
            payload = response.json()
        except Exception:
            payload = None  # reported by the shape check below

        if not isinstance(payload, dict):
            logger.warning(
                "Feature flag response not JSON-parseable; treating as disabled",
                extra={"flag": name.value, "account_id": account_id},
            )
            return False

        return self._coerce_flag_value(payload.get("value"))

    @staticmethod
    def _coerce_flag_value(value: Any) -> bool:
        """Return ``True`` only for the boolean ``True``.

        ``bool(value)`` would treat ``"false"``, ``"0"`` or any non-empty
        container as enabled; a flag must be an explicit boolean to turn the
        new code path on.
        """
        return value is True

    @staticmethod
    def _principal_headers(
        principal_context: Any, account_id: str | None
    ) -> dict[str, str]:
        """Build identity headers from the caller's principal.

        Reads ``api_key``, ``user_id`` and ``service_account_id`` from
        ``principal_context`` (missing attributes count as absent) and emits
        one header per truthy value. ``x-selected-account-id`` is added only
        when at least one identity header exists and ``account_id`` is truthy.

        Returns ``{}`` when no identity is available; callers treat that as
        flag-off.
        """
        if principal_context is None:
            return {}

        identity_fields = (
            ("x-api-key", "api_key"),
            ("x-user-id", "user_id"),
            ("x-service-account-id", "service_account_id"),
        )
        headers: dict[str, str] = {}
        for header_name, attribute in identity_fields:
            value = getattr(principal_context, attribute, None)
            if value:
                headers[header_name] = value

        if not headers:
            return {}
        if account_id:
            headers["x-selected-account-id"] = account_id
        return headers
feature_flags=FeatureFlagProvider(egp_api_backend_url=None), ) def create_deployment_history_use_case(): diff --git a/agentex/tests/integration/services/__init__.py b/agentex/tests/integration/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py new file mode 100644 index 00000000..26d22bc2 --- /dev/null +++ b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py @@ -0,0 +1,342 @@ +"""Integration tests for AgentAPIKeysUseCase dual-write to Spark AuthZ. + +These cover the AGX1-272 dual-write path: + +- Flag OFF: ``authorization_service.register_resource`` is NOT called and + the api_key is written to the repository with creator metadata populated + from the principal context. +- Flag ON: ``register_resource`` is called with ``AgentexResource.api_key()`` + and ``parent=AgentexResource.agent()`` (the parent edge is + load-bearing for cascade), and the row is written. +- Delete deregisters: ``deregister_resource`` is called when ``delete`` runs + under the flag. +- Spark failure prevents row: when ``register_resource`` raises, the api_key + is NOT persisted. +- Deregister failure does not block delete: when ``deregister_resource`` + raises, the DB delete still completes and the failure is logged. +- No creator → no register: if neither user_id nor service_account_id is + resolvable, the dual-write is a no-op (logged) and the row still lands. + +The tests intentionally mock the repository, authorization service, agent +repository, and HTTP client. The behaviour under test is the call sequencing +inside ``AgentAPIKeysUseCase`` — not Postgres or Spark itself. + +Note on structural divergence from the task PR (AGX1-274): tasks live behind +``AgentTaskService``; agent_api_keys have no service layer, so the dual-write +logic is colocated in ``AgentAPIKeysUseCase``. 
def _principal(user_id: str | None, account_id: str | None) -> SimpleNamespace:
    """Return a lightweight substitute for AgentexAuthPrincipalContext.

    Only ``user_id``, ``service_account_id`` and ``account_id`` are set, and
    ``service_account_id`` is always ``None``.
    """
    fake_principal = SimpleNamespace(
        user_id=user_id,
        service_account_id=None,
        account_id=account_id,
    )
    return fake_principal


def _agent() -> AgentEntity:
    """Create a READY, SYNC-type AgentEntity with a freshly generated id."""
    new_id = orm_id()
    short_id = new_id[:8]
    return AgentEntity(
        id=new_id,
        name=f"agent-{short_id}",
        description="dual-write test agent",
        status=AgentStatus.READY,
        acp_type=ACPType.SYNC,
        acp_url="http://test-acp",
    )


def _build_use_case(
    *,
    flag_on: bool,
    principal: SimpleNamespace | None,
    register_resource: AsyncMock | None = None,
    deregister_resource: AsyncMock | None = None,
    agent: AgentEntity | None = None,
    create_raises: Exception | None = None,
    monkeypatch: pytest.MonkeyPatch,
) -> tuple[AgentAPIKeysUseCase, Mock, AsyncMock, AsyncMock]:
    """Assemble an AgentAPIKeysUseCase whose collaborators are all mocks.

    Args:
        flag_on: Value returned by the mocked ``feature_flags.is_enabled``.
        principal: Object exposed as ``authorization_service.principal_context``.
        register_resource: Mock to install as
            ``authorization_service.register_resource``; a fresh AsyncMock
            returning ``None`` is used when omitted.
        deregister_resource: Same, for
            ``authorization_service.deregister_resource``.
        agent: Entity returned by ``agent_repository.get``; a new one is
            created via ``_agent()`` when omitted.
        create_raises: When given, ``agent_api_key_repository.create`` raises
            it; otherwise ``create`` echoes its argument back.
        monkeypatch: pytest fixture used to set the environment variables.

    Returns:
        ``(use_case, agent_api_key_repository, register_mock, deregister_mock)``.
    """
    resolved_agent = agent or _agent()

    agent_repo = Mock()
    agent_repo.get = AsyncMock(return_value=resolved_agent)

    # ``create`` either hands the entity straight back or raises the
    # caller-supplied exception.
    create_effect = (lambda item: item) if create_raises is None else create_raises
    key_repo = Mock()
    key_repo.create = AsyncMock(side_effect=create_effect)
    # Every other repository call used by these tests resolves to None unless
    # an individual test overrides it afterwards.
    for stubbed_method in (
        "delete",
        "get_by_agent_id_and_name",
        "get_by_agent_name_and_key_name",
        "delete_by_agent_id_and_key_name",
        "delete_by_agent_name_and_key_name",
    ):
        setattr(key_repo, stubbed_method, AsyncMock(return_value=None))

    register_mock = register_resource or AsyncMock(return_value=None)
    deregister_mock = deregister_resource or AsyncMock(return_value=None)
    authz = Mock()
    authz.principal_context = principal
    authz.register_resource = register_mock
    authz.deregister_resource = deregister_mock

    # The real FeatureFlagProvider talks to egp-api-backend over HTTP; a mock
    # keeps the tests hermetic, since what is exercised is the use case's
    # reaction to the flag value rather than the provider's transport.
    flags = Mock()
    flags.is_enabled = AsyncMock(return_value=flag_on)

    # Set the environment up front so constructing the use case does not rely
    # on real environment configuration.
    for env_name, env_value in (
        ("AGENTEX_AUTH_URL", ""),
        ("ENVIRONMENT", "test"),
        ("WEBHOOK_REQUEST_TIMEOUT", "10"),
    ):
        monkeypatch.setenv(env_name, env_value)

    built = AgentAPIKeysUseCase(
        agent_api_key_repository=key_repo,
        agent_repository=agent_repo,
        client=Mock(),
        authorization_service=authz,
        feature_flags=flags,
    )
    return built, key_repo, authz.register_resource, authz.deregister_resource


async def _create_k1(use_case: AgentAPIKeysUseCase, agent: AgentEntity):
    """Issue the ``create`` call shared by the create-path tests."""
    return await use_case.create(
        name="k1",
        agent_id=agent.id,
        api_key_type=AgentAPIKeyType.EXTERNAL,
        api_key="secret",
        account_id="acct-1",
    )


@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_api_key_skips_grant_when_flag_off(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Flag off: the key is stored with its creator, nothing is registered."""
    target_agent = _agent()
    caller = _principal(user_id="user-A", account_id="acct-1")
    use_case, key_repo, register_mock, _ = _build_use_case(
        flag_on=False,
        principal=caller,
        agent=target_agent,
        monkeypatch=monkeypatch,
    )

    created = await _create_k1(use_case, target_agent)

    register_mock.assert_not_called()
    key_repo.create.assert_awaited_once()
    assert created.creator_user_id == "user-A"
    assert created.creator_service_account_id is None
    assert created.spark_authz_zedtoken is None


@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_api_key_calls_grant_when_flag_on(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Flag on: the new key is registered with the agent as its parent."""
    target_agent = _agent()
    caller = _principal(user_id="user-A", account_id="acct-1")
    use_case, key_repo, register_mock, _ = _build_use_case(
        flag_on=True,
        principal=caller,
        agent=target_agent,
        monkeypatch=monkeypatch,
    )

    created = await _create_k1(use_case, target_agent)

    register_mock.assert_awaited_once()
    call_kwargs = register_mock.await_args.kwargs
    resource = call_kwargs["resource"]
    assert resource.type == AgentexResourceType.api_key
    assert resource.selector == created.id
    # The parent_agent edge is load-bearing: without it the SpiceDB cascade
    # `read = ... & parent_agent->read & ...` fails closed for every reader.
    parent = call_kwargs["parent"]
    assert parent is not None
    assert parent.type == AgentexResourceType.agent
    assert parent.selector == target_agent.id
    key_repo.create.assert_awaited_once()
    assert created.creator_user_id == "user-A"
    # Provider.spark.register_resource returns None today, so there is no
    # zedtoken to persist yet.
    assert created.spark_authz_zedtoken is None


@pytest.mark.asyncio
@pytest.mark.integration
async def test_delete_api_key_calls_revoke_when_flag_on(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Flag on: deleting by id deregisters the matching api_key resource."""
    use_case, key_repo, _, deregister_mock = _build_use_case(
        flag_on=True,
        principal=_principal(user_id="user-A", account_id="acct-1"),
        monkeypatch=monkeypatch,
    )
    doomed_id = orm_id()

    await use_case.delete(id=doomed_id, account_id="acct-1")

    key_repo.delete.assert_awaited_once_with(id=doomed_id)
    deregister_mock.assert_awaited_once()
    resource = deregister_mock.await_args.kwargs["resource"]
    assert resource.type == AgentexResourceType.api_key
    assert resource.selector == doomed_id


@pytest.mark.asyncio
@pytest.mark.integration
async def test_delete_api_key_skips_revoke_when_flag_off(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Flag off: the row is deleted and deregistration is never invoked."""
    use_case, key_repo, _, deregister_mock = _build_use_case(
        flag_on=False,
        principal=_principal(user_id="user-A", account_id="acct-1"),
        monkeypatch=monkeypatch,
    )

    await use_case.delete(id=orm_id(), account_id="acct-1")

    key_repo.delete.assert_awaited_once()
    deregister_mock.assert_not_called()


@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_api_key_grant_failure_prevents_db_row(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing registration propagates and no row is written."""
    failing_register = AsyncMock(side_effect=RuntimeError("spark unavailable"))
    target_agent = _agent()
    use_case, key_repo, _, _ = _build_use_case(
        flag_on=True,
        principal=_principal(user_id="user-A", account_id="acct-1"),
        register_resource=failing_register,
        agent=target_agent,
        monkeypatch=monkeypatch,
    )

    with pytest.raises(RuntimeError, match="spark unavailable"):
        await _create_k1(use_case, target_agent)

    key_repo.create.assert_not_awaited()


@pytest.mark.asyncio
@pytest.mark.integration
async def test_delete_api_key_revoke_failure_does_not_block_delete(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing deregistration is attempted once but does not surface."""
    failing_deregister = AsyncMock(side_effect=RuntimeError("spark unavailable"))
    use_case, key_repo, _, installed_deregister = _build_use_case(
        flag_on=True,
        principal=_principal(user_id="user-A", account_id="acct-1"),
        deregister_resource=failing_deregister,
        monkeypatch=monkeypatch,
    )

    # Must complete without raising.
    await use_case.delete(id=orm_id(), account_id="acct-1")

    key_repo.delete.assert_awaited_once()
    installed_deregister.assert_awaited_once()


@pytest.mark.asyncio
@pytest.mark.integration
async def test_create_api_key_skips_grant_when_no_creator_resolvable(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With no user_id or service_account_id on the principal, registration
    is skipped and the row is still created without creator fields."""
    target_agent = _agent()
    anonymous = _principal(user_id=None, account_id="acct-1")
    use_case, key_repo, register_mock, _ = _build_use_case(
        flag_on=True,
        principal=anonymous,
        agent=target_agent,
        monkeypatch=monkeypatch,
    )

    created = await _create_k1(use_case, target_agent)

    register_mock.assert_not_called()
    key_repo.create.assert_awaited_once()
    assert created.creator_user_id is None
    assert created.creator_service_account_id is None


@pytest.mark.asyncio
@pytest.mark.integration
async def test_delete_by_agent_id_and_key_name_revokes_existing(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Deleting by (agent id, key name) deregisters the looked-up key's id."""
    target_agent = _agent()
    stored_id = orm_id()
    use_case, key_repo, _, deregister_mock = _build_use_case(
        flag_on=True,
        principal=_principal(user_id="user-A", account_id="acct-1"),
        agent=target_agent,
        monkeypatch=monkeypatch,
    )
    # Override the default None lookup so the use case finds an existing key.
    stored_key = AgentAPIKeyEntity(
        id=stored_id,
        agent_id=target_agent.id,
        name="k1",
        api_key_type=AgentAPIKeyType.EXTERNAL,
        api_key="secret",
    )
    key_repo.get_by_agent_id_and_name = AsyncMock(return_value=stored_key)

    await use_case.delete_by_agent_id_and_key_name(
        agent_id=target_agent.id,
        key_name="k1",
        api_key_type=AgentAPIKeyType.EXTERNAL,
        account_id="acct-1",
    )

    key_repo.delete_by_agent_id_and_key_name.assert_awaited_once()
    deregister_mock.assert_awaited_once()
    resource = deregister_mock.await_args.kwargs["resource"]
    assert resource.selector == stored_id