Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions helm/blueapi/config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,34 @@
"type": "object",
"$id": "OIDCConfig"
},
"OpaConfig": {
"additionalProperties": false,
"properties": {
"root": {
"default": "http://localhost:8181/",
"format": "uri",
"maxLength": 2083,
"minLength": 1,
"title": "Root",
"type": "string"
},
"tiled_service_account_check": {
"title": "Tiled Service Account Check",
"type": "string"
},
"submit_plan_check": {
"title": "Submit Plan Check",
"type": "string"
}
},
"required": [
"tiled_service_account_check",
"submit_plan_check"
],
"title": "OpaConfig",
"type": "object",
"$id": "OpaConfig"
},
"PlanSource": {
"additionalProperties": false,
"properties": {
Expand Down Expand Up @@ -612,6 +640,17 @@
}
],
"default": null
},
"opa": {
"anyOf": [
{
"$ref": "OpaConfig"
},
{
"type": "null"
}
],
"default": null
}
},
"title": "ApplicationConfig",
Expand Down
38 changes: 38 additions & 0 deletions helm/blueapi/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,34 @@
},
"additionalProperties": false
},
"OpaConfig": {
"$id": "OpaConfig",
"title": "OpaConfig",
"type": "object",
"required": [
"tiled_service_account_check",
"submit_plan_check"
],
"properties": {
"root": {
"title": "Root",
"default": "http://localhost:8181/",
"type": "string",
"format": "uri",
"maxLength": 2083,
"minLength": 1
},
"submit_plan_check": {
"title": "Submit Plan Check",
"type": "string"
},
"tiled_service_account_check": {
"title": "Tiled Service Account Check",
"type": "string"
}
},
"additionalProperties": false
},
"PlanSource": {
"$id": "PlanSource",
"title": "PlanSource",
Expand Down Expand Up @@ -1011,6 +1039,16 @@
}
]
},
"opa": {
"anyOf": [
{
"$ref": "OpaConfig"
},
{
"type": "null"
}
]
},
"scratch": {
"anyOf": [
{
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies = [
"tomlkit",
"graypy>=2.1.0",
"httpx>=0.28.1",
"aiohttp>=3.13.5",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
8 changes: 8 additions & 0 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ class Tag(StrEnum):
META = "Meta"


class OpaConfig(BlueapiBaseModel):
root: HttpUrl = HttpUrl("http://localhost:8181")
tiled_service_account_check: str
submit_plan_check: str


class ApplicationConfig(BlueapiBaseModel):
"""
Config for the worker application as a whole. Root of
Expand Down Expand Up @@ -335,6 +341,7 @@ class ApplicationConfig(BlueapiBaseModel):
oidc: OIDCConfig | None = None
auth_token_path: Path | None = None
numtracker: NumtrackerConfig | None = None
opa: OpaConfig | None = None

def __eq__(self, other: object) -> bool:
if isinstance(other, ApplicationConfig):
Expand All @@ -343,6 +350,7 @@ def __eq__(self, other: object) -> bool:
& (self.env == other.env)
& (self.logging == other.logging)
& (self.api == other.api)
& (self.opa == other.opa)
)
return False

Expand Down
82 changes: 82 additions & 0 deletions src/blueapi/service/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
import re
from collections.abc import Mapping
from typing import Any

import aiohttp
from aiohttp import ClientSession
from fastapi import HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED

from blueapi.config import OpaConfig
from blueapi.service.model import TaskRequest

LOGGER = logging.getLogger(__name__)

INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P<proposal>\d+)-(?P<visit>\d+)$")


class OpaClient:
client: aiohttp.ClientSession

def __init__(self, instrument: str, config: OpaConfig):
LOGGER.info("Creating OpaClient for %s with config %s", instrument, config)
self._instrument = instrument
self._conf = config
self._session = ClientSession(base_url=config.root.encoded_string())

def for_token(self, token: str) -> "OpaUserClient":
return OpaUserClient(self, token)

async def close(self):
await self._session.close()

async def _call_opa(self, endpoint, data: Mapping[str, Any]) -> bool:
try:
resp = await self._session.post(
endpoint,
json={"input": {"beamline": self._instrument, **data}},
)
return (await resp.json())["result"]
except Exception:
LOGGER.exception("Failed to run check", exc_info=True)
raise

async def require_tiled_service_account(self, token: str):
if not await self._call_opa(
self._conf.tiled_service_account_check,
{"token": token, "beamline": self._instrument},
):
raise ValueError(
f"Tiled service account is not valid for '{self._instrument}'"
)

async def submit_plan_check(self, token: str, instrument_session: str):
if not (match := INSTRUMENT_SESSION_RE.match(instrument_session)):
raise ValueError("Invalid instrument session")

if not await self._call_opa(
self._conf.submit_plan_check,
{
"token": token,
"audience": "account",
"proposal": int(match["proposal"]),
"visit": int(match["visit"]),
},
):
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED)


class OpaUserClient:
client: OpaClient
token: str

def __init__(self, client: OpaClient, token: str):
self.client = client
self.token = token

async def check_submit_plan(self, task: TaskRequest):
LOGGER.info("Checking permissions to run task: %s", task)
await self.client.submit_plan_check(
token=self.token, instrument_session=task.instrument_session
)
73 changes: 63 additions & 10 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import urllib.parse
from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Annotated, Any
from typing import Annotated, Any, cast

import jwt
from fastapi import (
Expand All @@ -19,7 +19,7 @@
from fastapi.datastructures import Address
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse, StreamingResponse
from fastapi.security import OAuth2AuthorizationCodeBearer
from fastapi.security.utils import get_authorization_scheme_param
from observability_utils.tracing import (
add_span_attributes,
get_tracer,
Expand All @@ -32,14 +32,17 @@
from pydantic import ValidationError
from pydantic.json_schema import SkipJsonSchema
from starlette.responses import JSONResponse
from starlette.status import HTTP_401_UNAUTHORIZED
from super_state_machine.errors import TransitionError

from blueapi import __version__
from blueapi.config import ApplicationConfig, OIDCConfig, Tag
from blueapi.config import ApplicationConfig, OIDCConfig, ServiceAccount, Tag
from blueapi.service import interface
from blueapi.service.authentication import TiledAuth
from blueapi.worker import TrackableTask, WorkerState
from blueapi.worker.event import TaskStatusEnum

from .authorization import OpaClient, OpaUserClient
from .model import (
DeviceModel,
DeviceResponse,
Expand Down Expand Up @@ -92,13 +95,34 @@ def teardown_runner():
def lifespan(config: ApplicationConfig):
@asynccontextmanager
async def inner(app: FastAPI):
if not config.env.metadata:
raise ValueError("Instrument name is required in metadata")
setup_runner(config)
yield
try:
if config.env.metadata and config.opa:
app.state.authz = OpaClient(config.env.metadata.instrument, config.opa)
if isinstance(config.tiled.authentication, ServiceAccount) and config.oidc:
await validate_tiled_config(
config.tiled.authentication, config.oidc, app.state.authz
)
yield
finally:
if app.state.authz:
await app.state.authz.close()
teardown_runner()

return inner


async def validate_tiled_config(
tiled: ServiceAccount, oidc: OIDCConfig, opa: OpaClient
):
LOGGER.info("Validating tiled configuration")
tiled.token_url = oidc.token_endpoint
auth = TiledAuth(tiled)
await opa.require_tiled_service_account(auth.get_access_token())


open_router = APIRouter()
secure_router = APIRouter(deprecated=True)
secure_router_v1 = APIRouter(prefix="/api/v1")
Expand Down Expand Up @@ -140,15 +164,27 @@ def get_app(config: ApplicationConfig):
return app


def bearer_token(req: Request) -> str | None:
auth = req.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(auth)
if scheme.casefold() != "bearer":
return None
return param.strip()


def decode_access_token(config: OIDCConfig):
jwkclient = jwt.PyJWKClient(config.jwks_uri)
oauth_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl=config.authorization_endpoint,
tokenUrl=config.token_endpoint,
refreshUrl=config.token_endpoint,
)

def inner(request: Request, access_token: str = Depends(oauth_scheme)):
def inner(
request: Request, access_token: Annotated[str | None, Depends(bearer_token)]
):
if not access_token:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)

signing_key = jwkclient.get_signing_key_from_jwt(access_token)
decoded: dict[str, Any] = jwt.decode(
access_token,
Expand All @@ -166,6 +202,22 @@ def inner(request: Request, access_token: str = Depends(oauth_scheme)):
TRACER = get_tracer("interface")


async def opa(
request: Request, token: Annotated[str, Depends(bearer_token)]
) -> OpaUserClient | None:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
if client := cast(OpaClient, getattr(request.app.state, "authz", None)):
return client.for_token(token)
return None


async def submit_permission(
task_request: Annotated[TaskRequest, Body()],
opa: Annotated[OpaUserClient, Depends(opa)],
):
if opa:
await opa.check_submit_plan(task_request)


async def on_key_error_404(_: Request, __: Exception):
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
Expand Down Expand Up @@ -292,6 +344,7 @@ def submit_task(
response: Response,
task_request: Annotated[TaskRequest, Body(..., examples=[example_task_request])],
runner: Annotated[WorkerDispatcher, Depends(_runner)],
_: Annotated[None, Depends(submit_permission)],
) -> TaskResponse:
"""Submit a task to the worker."""
try:
Expand Down
6 changes: 6 additions & 0 deletions tests/unit_tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ def test_config_yaml_parsed(temp_yaml_config_file):
}
],
},
"opa": {
"root": "http://opa.example.com/",
"submit_plan_check": "v1/submit_plan",
"tiled_service_account_check": "v1/tiled_service_account",
},
},
{
"stomp": {
Expand Down Expand Up @@ -392,6 +397,7 @@ def test_config_yaml_parsed(temp_yaml_config_file):
}
],
},
"opa": None,
},
],
indirect=True,
Expand Down
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading