Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-example-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
uses: getsentry/action-build-and-push-images@a53f146fc1ea3cb404f2dcf7378f5b60dd98d3ca
with:
image_name: 'taskbroker-python-example'
platforms: linux/${{ matrix.platform }}
platforms: linux/amd64
dockerfile_path: './clients/python/Dockerfile.example'
dockerfile_context: './clients/python'
ghcr: true
Expand Down
2 changes: 1 addition & 1 deletion clients/python/src/examples/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


# Create a namespace and register tasks
exampletasks = app.taskregistry.create_namespace("examples")
exampletasks = app.create_namespace("examples")


@exampletasks.register(name="examples.simple_task")
Expand Down
39 changes: 38 additions & 1 deletion clients/python/src/taskbroker_client/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import datetime
import importlib
from collections.abc import Iterable
from typing import Any

from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE
from taskbroker_client.imports import import_string
from taskbroker_client.metrics import MetricsBackend
from taskbroker_client.registry import TaskRegistry
from taskbroker_client.registry import TaskNamespace, TaskRegistry
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import Task
from taskbroker_client.types import AtMostOnceStore, ProducerFactory


Expand Down Expand Up @@ -72,6 +76,39 @@ def set_config(self, config: dict[str, Any]) -> None:
if key in self._config:
self._config[key] = value

def create_namespace(
self,
name: str,
*,
retry: Retry | None = None,
expires: int | datetime.timedelta | None = None,
processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
app_feature: str | None = None,
) -> TaskNamespace:
"""
Create a task namespace.

Namespaces are mapped onto topics through the configured router allowing
infrastructure to be scaled based on a region's requirements.

Namespaces can define default behavior for tasks defined within a namespace.
"""
return self._taskregistry.create_namespace(
name=name,
retry=retry,
expires=expires,
processing_deadline_duration=processing_deadline_duration,
app_feature=app_feature,
)

def get_task(self, namespace: str, task: str) -> Task[Any, Any]:
"""Fetch a task by namespace and name."""
return self._taskregistry.get(namespace).get(task)

def get_namespace(self, namespace: str) -> TaskNamespace:
"""Fetch a task by namespace and name."""
return self._taskregistry.get(namespace)

def set_modules(self, modules: Iterable[str]) -> None:
"""
Set the list of modules containing tasks to be loaded by workers and schedulers.
Expand Down
2 changes: 1 addition & 1 deletion clients/python/src/taskbroker_client/scheduler/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def add(self, key: str, task_config: ScheduleConfig) -> None:
except ValueError:
raise ValueError("Invalid task name. Must be in the format namespace:taskname")

task = self._app.taskregistry.get_task(namespace, taskname)
task = self._app.get_task(namespace, taskname)
entry = ScheduleEntry(key=key, task=task, schedule=task_config["schedule"])
self._entries.append(entry)
self._heap = []
Expand Down
8 changes: 4 additions & 4 deletions clients/python/src/taskbroker_client/worker/workerchild.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ def child_process(
"""
app = import_app(app_module)
app.load_modules()
taskregistry = app.taskregistry
metrics = app.metrics

def _get_known_task(activation: TaskActivation) -> Task[Any, Any] | None:
if not taskregistry.contains(activation.namespace):
try:
namespace = app.get_namespace(activation.namespace)
except KeyError:
logger.error(
"taskworker.invalid_namespace",
extra={"namespace": activation.namespace, "taskname": activation.taskname},
)
return None

namespace = taskregistry.get(activation.namespace)
if not namespace.contains(activation.taskname):
logger.error(
"taskworker.invalid_taskname",
Expand Down Expand Up @@ -421,7 +421,7 @@ def record_task_execution(
},
)

namespace = taskregistry.get(activation.namespace)
namespace = app.get_namespace(activation.namespace)
metrics.incr(
"taskworker.cogs.usage",
value=int(execution_duration * 1000),
Expand Down
50 changes: 50 additions & 0 deletions clients/python/tests/test_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import pytest
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation

from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import Task

from .conftest import StubAtMostOnce, producer_factory

Expand Down Expand Up @@ -47,3 +50,50 @@ def test_should_attempt_at_most_once() -> None:
app.at_most_once_store(at_most)
assert app.should_attempt_at_most_once(activation)
assert not app.should_attempt_at_most_once(activation)


def test_create_namespace() -> None:
app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter())
ns = app.create_namespace("test")
assert ns.name == "test"
assert ns.topic == "honk"

retry = Retry(times=3)
ns = app.create_namespace(
"test-two",
retry=retry,
expires=60 * 10,
processing_deadline_duration=60,
app_feature="anvils",
)
assert ns.default_retry == retry
assert ns.default_processing_deadline_duration == 60
assert ns.default_expires == 60 * 10
assert ns.name == "test-two"
assert ns.application == "acme"
assert ns.topic == "honk"
assert ns.app_feature == "anvils"

fetched = app.get_namespace("test-two")
assert fetched == ns

with pytest.raises(KeyError):
app.get_namespace("invalid")


def test_get_task() -> None:
app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter())
ns = app.create_namespace(name="tests")

@ns.register(name="test.simpletask")
def simple_task() -> None:
raise NotImplementedError

task = app.get_task(ns.name, "test.simpletask")
assert isinstance(task, Task)

with pytest.raises(KeyError):
app.get_task("nope", "test.simpletask")

with pytest.raises(KeyError):
app.get_task(ns.name, "nope")
Loading