diff --git a/.github/workflows/python-example-image.yml b/.github/workflows/python-example-image.yml index 63ddc96a..c280b5a1 100644 --- a/.github/workflows/python-example-image.yml +++ b/.github/workflows/python-example-image.yml @@ -22,7 +22,7 @@ jobs: uses: getsentry/action-build-and-push-images@a53f146fc1ea3cb404f2dcf7378f5b60dd98d3ca with: image_name: 'taskbroker-python-example' - platforms: linux/${{ matrix.platform }} + platforms: linux/amd64 dockerfile_path: './clients/python/Dockerfile.example' dockerfile_context: './clients/python' ghcr: true diff --git a/clients/python/src/examples/tasks.py b/clients/python/src/examples/tasks.py index 8e87c9e3..86eb312a 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -18,7 +18,7 @@ # Create a namespace and register tasks -exampletasks = app.taskregistry.create_namespace("examples") +exampletasks = app.create_namespace("examples") @exampletasks.register(name="examples.simple_task") diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 1ff06dc7..63c6a15b 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -1,13 +1,17 @@ +import datetime import importlib from collections.abc import Iterable from typing import Any from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation +from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE from taskbroker_client.imports import import_string from taskbroker_client.metrics import MetricsBackend -from taskbroker_client.registry import TaskRegistry +from taskbroker_client.registry import TaskNamespace, TaskRegistry +from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter +from taskbroker_client.task import Task from taskbroker_client.types import AtMostOnceStore, ProducerFactory @@ -72,6 +76,39 @@ def set_config(self, config: dict[str, Any]) -> None: if key in self._config: self._config[key] = value + def create_namespace( + self, + name: str, + *, + retry: Retry | None = None, + expires: int | datetime.timedelta | None = None, + processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, + app_feature: str | None = None, + ) -> TaskNamespace: + """ + Create a task namespace. + + Namespaces are mapped onto topics through the configured router allowing + infrastructure to be scaled based on a region's requirements. + + Namespaces can define default behavior for tasks defined within a namespace. + """ + return self._taskregistry.create_namespace( + name=name, + retry=retry, + expires=expires, + processing_deadline_duration=processing_deadline_duration, + app_feature=app_feature, + ) + + def get_task(self, namespace: str, task: str) -> Task[Any, Any]: + """Fetch a task by namespace and name.""" + return self._taskregistry.get(namespace).get(task) + + def get_namespace(self, namespace: str) -> TaskNamespace: + """Fetch a task by namespace and name.""" + return self._taskregistry.get(namespace) + def set_modules(self, modules: Iterable[str]) -> None: """ Set the list of modules containing tasks to be loaded by workers and schedulers. diff --git a/clients/python/src/taskbroker_client/scheduler/runner.py b/clients/python/src/taskbroker_client/scheduler/runner.py index eee4abd2..707e087e 100644 --- a/clients/python/src/taskbroker_client/scheduler/runner.py +++ b/clients/python/src/taskbroker_client/scheduler/runner.py @@ -195,7 +195,7 @@ def add(self, key: str, task_config: ScheduleConfig) -> None: except ValueError: raise ValueError("Invalid task name. Must be in the format namespace:taskname") - task = self._app.taskregistry.get_task(namespace, taskname) + task = self._app.get_task(namespace, taskname) entry = ScheduleEntry(key=key, task=task, schedule=task_config["schedule"]) self._entries.append(entry) self._heap = [] diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 08f1d40e..8230b088 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -101,18 +101,18 @@ def child_process( """ app = import_app(app_module) app.load_modules() - taskregistry = app.taskregistry metrics = app.metrics def _get_known_task(activation: TaskActivation) -> Task[Any, Any] | None: - if not taskregistry.contains(activation.namespace): + try: + namespace = app.get_namespace(activation.namespace) + except KeyError: logger.error( "taskworker.invalid_namespace", extra={"namespace": activation.namespace, "taskname": activation.taskname}, ) return None - namespace = taskregistry.get(activation.namespace) if not namespace.contains(activation.taskname): logger.error( "taskworker.invalid_taskname", @@ -421,7 +421,7 @@ def record_task_execution( }, ) - namespace = taskregistry.get(activation.namespace) + namespace = app.get_namespace(activation.namespace) metrics.incr( "taskworker.cogs.usage", value=int(execution_duration * 1000), diff --git a/clients/python/tests/test_app.py b/clients/python/tests/test_app.py index 8bc9af60..17493a8a 100644 --- a/clients/python/tests/test_app.py +++ b/clients/python/tests/test_app.py @@ -1,7 +1,10 @@ +import pytest from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter +from taskbroker_client.task import Task from .conftest import StubAtMostOnce, producer_factory @@ -47,3 +50,50 @@ def test_should_attempt_at_most_once() -> None: app.at_most_once_store(at_most) assert app.should_attempt_at_most_once(activation) assert not app.should_attempt_at_most_once(activation) + + +def test_create_namespace() -> None: + app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter()) + ns = app.create_namespace("test") + assert ns.name == "test" + assert ns.topic == "honk" + + retry = Retry(times=3) + ns = app.create_namespace( + "test-two", + retry=retry, + expires=60 * 10, + processing_deadline_duration=60, + app_feature="anvils", + ) + assert ns.default_retry == retry + assert ns.default_processing_deadline_duration == 60 + assert ns.default_expires == 60 * 10 + assert ns.name == "test-two" + assert ns.application == "acme" + assert ns.topic == "honk" + assert ns.app_feature == "anvils" + + fetched = app.get_namespace("test-two") + assert fetched == ns + + with pytest.raises(KeyError): + app.get_namespace("invalid") + + +def test_get_task() -> None: + app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter()) + ns = app.create_namespace(name="tests") + + @ns.register(name="test.simpletask") + def simple_task() -> None: + raise NotImplementedError + + task = app.get_task(ns.name, "test.simpletask") + assert isinstance(task, Task) + + with pytest.raises(KeyError): + app.get_task("nope", "test.simpletask") + + with pytest.raises(KeyError): + app.get_task(ns.name, "nope")