Skip to content

Commit 8672fb5

Browse files
committed
Add Application to GetTaskRequest and FetchNextTask Initializations
1 parent 3cb96af commit 8672fb5

5 files changed

Lines changed: 17 additions & 7 deletions

File tree

clients/python/src/taskbroker_client/worker/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,9 @@ def _get_cur_stub(self) -> tuple[str, ConsumerServiceStub]:
250250
self._num_tasks_before_rebalance -= 1
251251
return self._cur_host, self._host_to_stubs[self._cur_host]
252252

253-
def get_task(self, namespace: str | None = None) -> InflightTaskActivation | None:
253+
def get_task(
254+
self, namespace: str | None = None, application: str | None = None
255+
) -> InflightTaskActivation | None:
254256
"""
255257
Fetch a pending task.
256258
@@ -259,7 +261,7 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non
259261
"""
260262
self._emit_health_check()
261263

262-
request = GetTaskRequest(namespace=namespace)
264+
request = GetTaskRequest(namespace=namespace, application=application)
263265
try:
264266
host, stub = self._get_cur_stub()
265267
with self._metrics.timer("taskworker.get_task.rpc", tags={"host": host}):

clients/python/src/taskbroker_client/worker/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(
5151
app_module: str,
5252
broker_hosts: list[str],
5353
max_child_task_count: int | None = None,
54+
application: str | None = None,
5455
namespace: str | None = None,
5556
concurrency: int = 1,
5657
child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
@@ -65,6 +66,7 @@ def __init__(
6566
self.options = kwargs
6667
self._app_module = app_module
6768
self._max_child_task_count = max_child_task_count
69+
self._application = application
6870
self._namespace = namespace
6971
self._concurrency = concurrency
7072
app = import_app(app_module)
@@ -269,7 +271,7 @@ def _send_result(self, result: ProcessingResult, fetch: bool = True) -> bool:
269271
if fetch:
270272
fetch_next = None
271273
if not self._child_tasks.full():
272-
fetch_next = FetchNextTask(namespace=self._namespace)
274+
fetch_next = FetchNextTask(namespace=self._namespace, application=self._application)
273275

274276
next = self._send_update_task(result, fetch_next)
275277
if next:
@@ -377,7 +379,7 @@ def fetch_task(self) -> InflightTaskActivation | None:
377379
# Use the shutdown_event as a sleep mechanism
378380
self._shutdown_event.wait(self._gettask_backoff_seconds)
379381
try:
380-
activation = self.client.get_task(self._namespace)
382+
activation = self.client.get_task(self._namespace, self._application)
381383
except grpc.RpcError as e:
382384
logger.info(
383385
"taskworker.fetch_task.failed",

src/grpc/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ impl ConsumerService for TaskbrokerServer {
6565
&self,
6666
request: Request<SetTaskStatusRequest>,
6767
) -> Result<Response<SetTaskStatusResponse>, Status> {
68+
println!("Set task status...");
69+
6870
let start_time = Instant::now();
6971
let id = request.get_ref().id.clone();
7072

src/kafka/deserialize_activation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub fn new(
3434
let namespace = activation.namespace.clone();
3535
let taskname = activation.taskname.clone();
3636

37+
println!("Task activation ingested - application: {}", application);
38+
3739
metrics::histogram!(
3840
"consumer.message.payload_size_bytes",
3941
"namespace" => namespace.clone(),

uv.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)