diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml
index f177c15..40f8359 100644
--- a/.github/workflows/pre-commit.yaml
+++ b/.github/workflows/pre-commit.yaml
@@ -10,6 +10,6 @@ jobs:
   lint:
     runs-on: ubuntu-latest
    steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-      - uses: pre-commit/action@v2.0.0
\ No newline at end of file
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
+      - uses: pre-commit/action@v3.0.1
diff --git a/docs/source/kbatch-proxy.md b/docs/source/kbatch-proxy.md
index 634356b..e7cd10a 100644
--- a/docs/source/kbatch-proxy.md
+++ b/docs/source/kbatch-proxy.md
@@ -66,6 +66,63 @@ kbatch-proxy:
     - user
 ```
 
+## Namespace customization
+
+```{versionadded} 0.5
+```
+
+The `kbatch_namespace_manifests_file` setting can point to a YAML file containing any number of Kubernetes manifests to create within a given kbatch namespace.
+This allows creation of ResourceQuotas, LimitRanges, NetworkPolicies, and other per-namespace resources you might want in every user namespace.
+The manifests are created using Kubernetes [server-side apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/).
+
+For example, the following manifests:
+
+```yaml
+apiVersion: v1
+kind: ResourceQuota
+metadata:
+  name: mem-cpu-demo
+spec:
+  hard:
+    pods: "10"
+    requests.cpu: "5"
+    requests.memory: 5Gi
+    limits.cpu: "20"
+    limits.memory: 20Gi
+---
+# it's a good idea to include default resource requests/limits
+# if you have a ResourceQuota, otherwise pods can't be created without them
+apiVersion: v1
+kind: LimitRange
+metadata:
+  name: default-resources
+spec:
+  limits:
+    # default for resources.limits
+    - default:
+        cpu: "1"
+        memory: "1Gi"
+      # default for resources.requests
+      defaultRequest:
+        cpu: "1"
+        memory: "1Gi"
+      type: Container
+---
+apiVersion: networking.k8s.io/v1
+kind: NetworkPolicy
+metadata:
+  name: default-deny
+spec:
+  podSelector: {}
+  policyTypes:
+    - Egress
+    - Ingress
+```
+
+would limit _each_ kbatch namespace to 10 pods,
+cap the total resource requests and limits across the namespace,
+set a default resource request/limit for containers that don't specify one,
+and deny _all_ network traffic into and out of the namespace,
+so jobs would not be able to access the network.
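+
+kbatch-proxy reads its settings from environment variables (via pydantic),
+so one way to enable this is to set the matching variable on the
+kbatch-proxy container.
+A sketch (the mount path is hypothetical; you are responsible for making
+the manifests file available inside the container):
+
+```yaml
+# excerpt from a hypothetical kbatch-proxy container spec
+env:
+  - name: KBATCH_NAMESPACE_MANIFESTS_FILE
+    value: /etc/kbatch/namespace-manifests.yaml
+```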
 
-[jhub-service]: https://z2jh.jupyter.org/en/latest/administrator/services.html
\ No newline at end of file
+[jhub-service]: https://z2jh.jupyter.org/en/latest/administrator/services.html
diff --git a/integration-tests/conftest.py b/integration-tests/conftest.py
index c6fb02a..65b56fa 100644
--- a/integration-tests/conftest.py
+++ b/integration-tests/conftest.py
@@ -1,4 +1,5 @@
 import os
+from pathlib import Path
 
 import pytest
 from fastapi.testclient import TestClient
@@ -7,6 +8,8 @@
 # make sure this happens before kbatch_proxy.main is imported
 os.environ["kbatch_init_logging"] = "0"
 
+integration_tests = Path(__file__).parent.resolve()
+
 
 @pytest.fixture(scope="session", autouse=True)
 def check_cluster():
@@ -27,8 +30,25 @@ def check_cluster():
 
 
 @pytest.fixture(autouse=True)
-def mock_hub_auth(mocker):
-    def side_effect(token):
+def kbatch_proxy_settings(mocker):
+    """Apply kbatch-proxy settings and mock Hub auth for tests"""
+    # make sure this happens before kbatch_proxy.main is imported
+    mocker.patch.dict(
+        os.environ,
+        {
+            "kbatch_init_logging": "0",
+            "JUPYTERHUB_SERVICE_NAME": "kbatch",
+        },
+    )
+    import kbatch_proxy.main
+
+    kbatch_proxy.main.settings = settings = kbatch_proxy.main.Settings(
+        kbatch_namespace_manifests_file=str(
+            integration_tests / "data" / "namespace-manifests.yaml"
+        ),
+    )
+
+    def mock_auth(token):
         if token == "abc":
             return {
                 "name": "testuser",
@@ -44,19 +64,11 @@ def side_effect(token):
         else:
             return None
 
-    # env patch must be before module patch to avoid logging setup
-    mocker.patch.dict(
-        os.environ,
-        {
-            "kbatch_init_logging": "0",
-            "JUPYTERHUB_SERVICE_NAME": "kbatch",
-        },
-    )
-    mocker.patch("kbatch_proxy.main.auth.user_for_token", side_effect=side_effect)
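+    # `auth` is now a cached_property on the Settings instance,
+    # so patch the attribute on that instance rather than a module global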
+    mocker.patch.object(settings.auth, "user_for_token", mock_auth)
 
 
 @pytest.fixture
-def client(mock_hub_auth, mocker):
-    # import kbatch_proxy.main must be after mock_hub_auth
+def client(kbatch_proxy_settings, mocker):
+    # import kbatch_proxy.main must be after kbatch_proxy_settings
     from kbatch_proxy.main import app
diff --git a/integration-tests/data/namespace-manifests.yaml b/integration-tests/data/namespace-manifests.yaml
new file mode 100644
index 0000000..519ba95
--- /dev/null
+++ b/integration-tests/data/namespace-manifests.yaml
@@ -0,0 +1,44 @@
+apiVersion: v1
+kind: ResourceQuota
+metadata:
+  name: mem-cpu-demo
+spec:
+  hard:
+    requests.cpu: "1"
+    requests.memory: 1Gi
+    limits.cpu: "2"
+    limits.memory: 2Gi
+---
+apiVersion: v1
+kind: LimitRange
+metadata:
+  name: default-resources
+spec:
+  limits:
+    # `default` is the default limit
+    - default:
+        cpu: 500m
+        memory: 500M
+      # `defaultRequest` is the default request
+      defaultRequest:
+        cpu: 500m
+        memory: 100M
+      # min/max bounds
+      max:
+        cpu: "1"
+        memory: 1Gi
+      min:
+        cpu: 100m
+        memory: 64M
+      type: Container
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: ns-config-map
+data:
+  key: value
+---
+
+# trailing --- to test empty-element handling
+---
diff --git a/integration-tests/test_lifecycle.py b/integration-tests/test_lifecycle.py
index 6e4c9d6..82bade6 100644
--- a/integration-tests/test_lifecycle.py
+++ b/integration-tests/test_lifecycle.py
@@ -24,6 +24,8 @@ def _wrapped_invoke(*args, **kwargs):
         # always write stderr on error so it shows up in captured test output
         if result.exit_code:
             sys.stderr.write(result.stderr)
+        # re-raise real errors (Exception, not BaseException,
+        # so a normal SystemExit is not re-raised)
+        if result.exception and isinstance(result.exception, Exception):
+            raise result.exception
         return result
 
     runner.invoke = _wrapped_invoke
diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py
index 0bb1579..7558b93 100644
--- a/kbatch-proxy/kbatch_proxy/main.py
+++ b/kbatch-proxy/kbatch_proxy/main.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import os
-from functools import partial
+from functools import cached_property, partial
 from typing import Dict, List, Optional, Tuple, Union
 
 import jupyterhub.services.auth
@@ -17,8 +17,10 @@
     V1CronJob,
     V1Job,
     V1JobTemplateSpec,
+    V1ObjectMeta,
     V1Secret,
 )
+from kubernetes.utils import create_from_dict  # type: ignore
 from pydantic import BaseModel
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
@@ -49,12 +51,22 @@ class Settings(BaseSettings):
     # A path to a YAML file defining the profiles
     kbatch_profile_file: Optional[str] = None
 
+    # A path to a YAML file defining manifests to create
+    # via server-side apply when creating a namespace.
+    # These are reapplied on _every_ job submission.
+    kbatch_namespace_manifests_file: Optional[str] = None
+
     # Jobs are cleaned up by Kubernetes after this many seconds.
     kbatch_job_ttl_seconds_after_finished: Optional[int] = 3600
 
     # Additional environment variables to set in the job environment
-    kbatch_job_extra_env: Optional[Dict[str, str]] = None
+    kbatch_job_extra_env: Dict[str, str] = {}
+
+    # Labels applied to kbatch-created resources
+    kbatch_labels: Dict[str, str] = {
+        "app.kubernetes.io/managed-by": "kbatch",
+    }
 
-    # Whether to automatically create new namespaces for a users
+    # Whether to automatically create new namespaces for all users
     kbatch_create_user_namespace: bool = True
 
     model_config = SettingsConfigDict(
@@ -62,6 +74,71 @@ class Settings(BaseSettings):
         env_file_encoding="utf-8",
     )
 
+    # derived fields
+    # these are cached properties, loaded only once
+
+    @cached_property
+    def auth(self) -> jupyterhub.services.auth.HubAuth:
+        return jupyterhub.services.auth.HubAuth(
+            api_token=self.jupyterhub_api_token,
+            cache_max_age=60,
+        )
+
+    @cached_property
+    def job_template(self) -> Optional[dict]:
+        """Load kbatch_job_template_file"""
+        if self.kbatch_job_template_file:
+            logger.info("loading job template from %s", self.kbatch_job_template_file)
+            with open(self.kbatch_job_template_file) as f:
+                job_template = yaml.safe_load(f)
+
+            # parse with Kubernetes to normalize keys with job_data
+            job_template = utils.parse(job_template, model=V1Job).to_dict()
+            utils.remove_nulls(job_template)
+        else:
+            job_template = None
+        return job_template
+
+    @cached_property
+    def profile_data(self) -> dict:
+        """Load kbatch_profile_file"""
+        if self.kbatch_profile_file:
+            # TODO: we need some kind of validation on the keys / values here. Catch typos...
+            logger.info("loading profiles from %s", self.kbatch_profile_file)
+            with open(self.kbatch_profile_file) as f:
+                profile_data = yaml.safe_load(f)
+        else:
+            profile_data = {}
+        return profile_data
+
+    @cached_property
+    def namespace_manifests(self) -> List[dict]:
+        """Load kbatch_namespace_manifests_file"""
+        if not self.kbatch_namespace_manifests_file:
+            return []
+
+        # TODO: we need some kind of validation on the keys / values here. Catch typos...
+        logger.info(
+            "loading namespace manifests from %s", self.kbatch_namespace_manifests_file
+        )
+        with open(self.kbatch_namespace_manifests_file) as f:
+            namespace_manifests = [
+                manifest
+                for manifest in yaml.safe_load_all(f)
+                if manifest  # exclude None, e.g. from a trailing `---`
+            ]
+        for manifest in namespace_manifests:
+            logger.info(
+                "Loaded manifest %s/%s: %s",
+                manifest["apiVersion"],
+                manifest["kind"],
+                manifest["metadata"]["name"],
+            )
+            # apply common labels
+            labels = manifest["metadata"].setdefault("labels", {})
+            labels.update(self.kbatch_labels)
+        return namespace_manifests
+
 
 class User(BaseModel):
     name: str
@@ -80,6 +157,7 @@ class UserOut(BaseModel):
 
 settings = Settings()
 
+
 if settings.kbatch_init_logging:
     import rich.logging
 
@@ -91,27 +169,6 @@ class UserOut(BaseModel):
     )
     logger.addHandler(handler)
 
-if settings.kbatch_job_template_file:
-    logger.info("loading job template from %s", settings.kbatch_job_template_file)
-    with open(settings.kbatch_job_template_file) as f:
-        job_template = yaml.safe_load(f)
-
-    # parse with Kubernetes to normalize keys with job_data
-    job_template = utils.parse(job_template, model=V1Job).to_dict()
-    utils.remove_nulls(job_template)
-
-else:
-    job_template = None
-
-
-if settings.kbatch_profile_file:
-    # TODO: we need some kind of validation on the keys / values here. Catch typos...
-    logger.info("loading profiles from %s", settings.kbatch_profile_file)
-    with open(settings.kbatch_profile_file) as f:
-        profile_data = yaml.safe_load(f)
-else:
-    profile_data = {}
-
 app = FastAPI()
 router = APIRouter(prefix=settings.kbatch_prefix)
 
@@ -121,13 +178,9 @@ class UserOut(BaseModel):
 
 # JupyterHub configuration
 # TODO: make auth pluggable
-auth = jupyterhub.services.auth.HubAuth(
-    api_token=settings.jupyterhub_api_token,
-    cache_max_age=60,
-)
-
 
 async def get_current_user(request: Request) -> User:
+    auth = settings.auth
     if not auth.access_scopes:
         raise RuntimeError(
             "JupyterHub OAuth scopes for access to kbatch not defined. "
@@ -299,7 +352,7 @@ async def pod_logs(
 
 @router.get("/profiles/")
 async def profiles():
-    return profile_data
+    return settings.profile_data
 
 
 @router.get("/")
@@ -323,9 +376,48 @@ async def app_root():
     return {"message": "kbatch"}
 
 
-# -------
+# -----
 # utils
 
+# record which namespaces have had the manifests applied
+_manifests_applied: set[str] = set()
+
+
+def apply_namespace_manifests(
+    api: kubernetes.client.ApiClient, namespace: str, manifests: List[dict]
+) -> None:
+    """
+    Apply arbitrary manifests when creating namespaces
+
+    For example, allows creating ResourceQuotas, NetworkPolicies, etc.
+    """
+    if namespace in _manifests_applied:
+        logger.debug("Already applied manifests to %s", namespace)
+        return
+    logger.info(
+        "Applying %i manifest%s to %s",
+        len(manifests),
+        "s" if len(manifests) != 1 else "",
+        namespace,
+    )
+    for manifest in manifests:
+        logger.info(
+            "Applying %s/%s %s/%s",
+            manifest["apiVersion"],
+            manifest["kind"],
+            namespace,
+            manifest["metadata"]["name"],
+        )
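+        # `apply=True` requests server-side apply, so reapplying the
+        # same manifests on every job submission is idempotent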
+        create_from_dict(api, manifest, namespace=namespace, apply=True)
+
+    # can we record this in an annotation on the namespace, instead?
+    # A generation number perhaps?
+    # in a typical deployment, HOSTNAME=kbatch-proxy-{rs.hash}-{pod.hash}
+    # rs.hash should uniquely identify a deployment generation
+    # for now, apply once per kbatch-proxy process lifetime
+    _manifests_applied.add(namespace)
+
 
 def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str):
     """
@@ -338,7 +430,10 @@ def ensure_namespace(api: kubernetes.client.CoreV1Api, namespace: str):
     try:
         api.create_namespace(
             body=kubernetes.client.V1Namespace(
-                metadata=kubernetes.client.V1ObjectMeta(name=namespace)
+                metadata=V1ObjectMeta(
+                    name=namespace,
+                    labels=settings.kbatch_labels,
+                )
             )
         )
     except kubernetes.client.ApiException as e:
@@ -375,6 +470,7 @@ def _create_job(
     job_data = data["job"]
 
     # does it handle cronjob job specs appropriately?
+    job_template = settings.job_template
     if job_template:
         job_data = utils.merge_json_objects(job_data, job_template)
 
@@ -402,7 +498,7 @@ def _create_job(
         job_to_patch,
         config_map=config_map,
         annotations={},
-        labels={},
+        labels=settings.kbatch_labels,
         username=user.name,
         ttl_seconds_after_finished=settings.kbatch_job_ttl_seconds_after_finished,
         extra_env=settings.kbatch_job_extra_env,
@@ -427,6 +523,11 @@ def _create_job(
     if created:
         logger.info("Created namespace %s", user.namespace)
 
+    if settings.namespace_manifests:
+        apply_namespace_manifests(
+            api.api_client, user.namespace, settings.namespace_manifests
+        )
+
     logger.info("Submitting Secret")
     env_secret = api.create_namespaced_secret(namespace=user.namespace, body=env_secret)
     patch.add_env_secret_name(job_to_patch, env_secret)
diff --git a/kbatch-proxy/tests/test_app.py b/kbatch-proxy/tests/test_app.py
index 378594f..7ef5419 100644
--- a/kbatch-proxy/tests/test_app.py
+++ b/kbatch-proxy/tests/test_app.py
@@ -30,7 +30,9 @@ def side_effect(token):
         else:
             return None
 
-    mocker.patch("kbatch_proxy.main.auth.user_for_token", side_effect=side_effect)
+    mocker.patch(
+        "kbatch_proxy.main.settings.auth.user_for_token", side_effect=side_effect
+    )
     mocker.patch.dict(os.environ, {"JUPYTERHUB_SERVICE_NAME": "kbatch"})
diff --git a/kbatch/kbatch/_backend.py b/kbatch/kbatch/_backend.py
index 0f64297..17b8e23 100644
--- a/kbatch/kbatch/_backend.py
+++ b/kbatch/kbatch/_backend.py
@@ -81,22 +81,16 @@ def _make_job_spec(
         name="job",
         env=env_vars,
         # volume_mounts=[file_volume_mount],
-        resources=V1ResourceRequirements(),  # TODO: this is important. validate it!
         working_dir="/code",
     )
 
     resources = profile.get("resources", {})
-    limits = resources.get("limits", {})
-    requests = resources.get("requests", {})
-
-    container.resources.requests = {}
-    container.resources.limits = {}
-
-    if requests:
-        container.resources.requests.update(requests)
-    if limits:
-        container.resources.limits.update(limits)
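+    # leave resources unset unless the profile provides them, so that
+    # cluster-side defaults (e.g. a LimitRange) can fill them in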
+    if resources:
+        container.resources = V1ResourceRequirements()
+        for key in ("limits", "requests"):
+            if key in resources:
+                setattr(container.resources, key, dict(resources[key]))
 
     pod_metadata = V1ObjectMeta(
         name=f"{name}-pod",
diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py
index d6faf7e..830519f 100644
--- a/kbatch/kbatch/_core.py
+++ b/kbatch/kbatch/_core.py
@@ -314,7 +314,7 @@ def job_status(job):
     succeeded = status["succeeded"] or 0
     failed = status["failed"] or 0
     ready = status["ready"] or 0
-    active = status["active"] or 0
+    active = status["active"]
     if failed:
         return "[red]failed[/red]"
     elif ready:
@@ -325,8 +325,11 @@ def job_status(job):
         # succeeded last because in multi-pod cases
         # only report success when they _all_ succeed
         return "[green]done[/green]"
+    elif active is None:
+        # failure to create pods may leave active as None
+        return "pending"
     else:
-        raise ValueError()
+        raise ValueError(f"Unrecognized status: {status!r}")
 
 
 def pod_status(row):