Skip to content

Store worker attributes needed by server to propagate nexus tasks to worker#9231

Merged
rkannan82 merged 18 commits intomainfrom
kannan/activity-cancel/persist-worker-key
Apr 11, 2026
Merged

Store worker attributes needed by server to propagate nexus tasks to worker#9231
rkannan82 merged 18 commits intomainfrom
kannan/activity-cancel/persist-worker-key

Conversation

@rkannan82
Copy link
Copy Markdown
Contributor

@rkannan82 rkannan82 commented Feb 5, 2026

What changed?

As part of RecordActivityTaskStarted flow, store worker_control_task_queue for an activity in the mutable state (ActivityInfo).

Main changes:

  • executions.proto: Added the new worker_control_task_queue field.
  • mutable_state_impl.go: Update mutable state.
  • matching/forwarder.go: Propagate worker_control_task_queue when polls get forwarded. Otherwise, RecordActivityTaskStarted request will not have it set when invoked from a forwarded poll.

Why?

To support activity cancellation without activity heartbeat.

Overall flow:

How did you test it?

  • built
  • run locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

@rkannan82 rkannan82 requested review from a team as code owners February 5, 2026 17:49
@rkannan82 rkannan82 changed the title Store worker_instance_key in ActivityInfo as part of RecordActivityTaskStarted call Store worker_instance_key for an activity in mutable state Feb 5, 2026
@rkannan82 rkannan82 requested a review from yycptt February 5, 2026 18:05
@rkannan82 rkannan82 changed the title Store worker_instance_key for an activity in mutable state Store worker attributes (in mutable state) needed by server to propagate nexus tasks to worker Feb 11, 2026
@rkannan82 rkannan82 changed the title Store worker attributes (in mutable state) needed by server to propagate nexus tasks to worker Store worker attributes needed by server to propagate nexus tasks to worker Feb 11, 2026
@rkannan82 rkannan82 requested a review from yycptt February 11, 2026 19:41
@rkannan82 rkannan82 force-pushed the kannan/activity-cancel/persist-worker-key branch 2 times, most recently from f1e1ee5 to e795849 Compare February 11, 2026 21:07
@rkannan82 rkannan82 changed the title Store worker attributes needed by server to propagate nexus tasks to worker [DO_NOT_MERGE] Store worker attributes needed by server to propagate nexus tasks to worker Feb 11, 2026
@rkannan82 rkannan82 force-pushed the kannan/activity-cancel/persist-worker-key branch from 43ace13 to 2ecbdfd Compare February 11, 2026 21:22
Copy link
Copy Markdown
Member

@yycptt yycptt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall. Please land after removing the unnecessary field.

@rkannan82 rkannan82 force-pushed the kannan/activity-cancel/persist-worker-key branch 2 times, most recently from 04db6f1 to b72cf9f Compare February 16, 2026 02:48
Add workerControlTaskQueue parameter to AddActivityTaskStartedEvent
and persist it in ActivityInfo when an activity starts. This enables
routing activity cancellation requests to the correct worker's control
queue via Nexus.

Changes:
- Add worker_control_task_queue field to ActivityInfo proto
- Update MutableState interface and implementation
- Pass workerControlTaskQueue from poll request for regular activities
- Pass from RespondWorkflowTaskCompleted request for eager activities
- Update all test call sites
@rkannan82 rkannan82 force-pushed the kannan/activity-cancel/persist-worker-key branch from b72cf9f to e319a6c Compare February 18, 2026 03:34
@rkannan82 rkannan82 force-pushed the kannan/activity-cancel/persist-worker-key branch from e319a6c to 5390759 Compare February 18, 2026 03:44
rkannan82 added a commit to temporalio/api that referenced this pull request Apr 6, 2026
…711)

<!-- Describe what has changed in this PR -->
**What changed?**

Added worker_control_task_queue to poll requests:
- PollActivityTaskQueueRequest
- PollWorkflowTaskQueueRequest
Note: worker_instance_key was already added to these requests in #686.

Added worker_instance_key and worker_control_task_queue to: 
- RespondWorkflowTaskCompletedRequest: This API is used to eagerly fetch
activity.

<!-- Tell your future self why have you made these changes -->
**Why?**
To enable server to send control tasks to worker. Each worker provides a
worker_control_task_queue (a dedicated per-worker Nexus task queue) so
the server can send control tasks directly to it.

Example flow:
- User cancels a workflow.
- Server sends activity cancellation tasks to all workers that could be
processing activities belonging to that workflow.
- Worker will receive the cancellation message even when activity
heartbeat is not enabled.

<!-- Are there any breaking changes on binary or code level? -->
Breaking changes: None
[Server PR](temporalio/temporal#9231)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
temporal-cicd bot pushed a commit to temporalio/api-go that referenced this pull request Apr 6, 2026
…(#711)

<!-- Describe what has changed in this PR -->
**What changed?**

Added worker_control_task_queue to poll requests:
- PollActivityTaskQueueRequest
- PollWorkflowTaskQueueRequest
Note: worker_instance_key was already added to these requests in #686.

Added worker_instance_key and worker_control_task_queue to:
- RespondWorkflowTaskCompletedRequest: This API is used to eagerly fetch
activity.

<!-- Tell your future self why have you made these changes -->
**Why?**
To enable server to send control tasks to worker. Each worker provides a
worker_control_task_queue (a dedicated per-worker Nexus task queue) so
the server can send control tasks directly to it.

Example flow:
- User cancels a workflow.
- Server sends activity cancellation tasks to all workers that could be
processing activities belonging to that workflow.
- Worker will receive the cancellation message even when activity
heartbeat is not enabled.

<!-- Are there any breaking changes on binary or code level? -->
Breaking changes: None
[Server PR](temporalio/temporal#9231)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Resolves merge conflicts in executions.proto, mutable_state_impl_test.go,
and go.sum. Updates api-go replace directive to 5423d0dd678a which includes
the worker control task queue attributes from temporalio/api#711.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rkannan82 rkannan82 requested review from a team as code owners April 6, 2026 23:25
Match the comment style from temporalio/api#711.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rkannan82 rkannan82 changed the title [DO_NOT_MERGE] Store worker attributes needed by server to propagate nexus tasks to worker Store worker attributes needed by server to propagate nexus tasks to worker Apr 7, 2026
spkane31 pushed a commit to temporalio/api that referenced this pull request Apr 9, 2026
…711)

<!-- Describe what has changed in this PR -->
**What changed?**

Added worker_control_task_queue to poll requests:
- PollActivityTaskQueueRequest
- PollWorkflowTaskQueueRequest
Note: worker_instance_key was already added to these requests in #686.

Added worker_instance_key and worker_control_task_queue to: 
- RespondWorkflowTaskCompletedRequest: This API is used to eagerly fetch
activity.

<!-- Tell your future self why have you made these changes -->
**Why?**
To enable server to send control tasks to worker. Each worker provides a
worker_control_task_queue (a dedicated per-worker Nexus task queue) so
the server can send control tasks directly to it.

Example flow:
- User cancels a workflow.
- Server sends activity cancellation tasks to all workers that could be
processing activities belonging to that workflow.
- Worker will receive the cancellation message even when activity
heartbeat is not enabled.

<!-- Are there any breaking changes on binary or code level? -->
Breaking changes: None
[Server PR](temporalio/temporal#9231)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
rkannan82 and others added 2 commits April 9, 2026 15:00
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rkannan82 rkannan82 enabled auto-merge (squash) April 9, 2026 22:09
@rkannan82 rkannan82 disabled auto-merge April 9, 2026 22:10
Main already has the direct dependency at the same version.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rkannan82 rkannan82 enabled auto-merge (squash) April 9, 2026 22:21
@rkannan82 rkannan82 requested a review from ShahabT April 9, 2026 22:54
@rkannan82 rkannan82 merged commit 4997c93 into main Apr 11, 2026
46 checks passed
@rkannan82 rkannan82 deleted the kannan/activity-cancel/persist-worker-key branch April 11, 2026 00:51
rkannan82 added a commit that referenced this pull request Apr 11, 2026
…Nexus (#9232)

## What changed?

New outbound task type (`WorkerCommandsTask`) that carries worker
commands to be dispatched to workers via Nexus. Uses the generic
`WorkerCommand` proto (not cancel-activity-specific), so this task type
can carry any future command types.

Suggested review order: proto changes → `worker_commands_task.go` →
`task_generator.go` → `workflow_task_completed_handler.go`

Key pieces:
- **Proto**: `TASK_TYPE_WORKER_COMMANDS` enum, `WorkerCommandsTask` in
`OutboundTaskInfo` with `repeated WorkerCommand`.
- **Task definition**: `worker_commands_task.go` — implements outbound
`Task` and `HasDestination` interfaces.
- **Task creation** (`workflow_task_completed_handler.go`,
`task_generator.go`): When `RequestCancelActivityTask` is processed for
a started activity whose worker has a control queue, collects a
`CancelActivityCommand` with the activity's task token. Commands are
batched by destination control queue and flushed as one
`WorkerCommandsTask` per queue at the end of WFT processing.
- **Serialization**: `task_serializers.go` for persistence
round-tripping.

Dispatch is a no-op here — handled in #9233. Gated by dynamic config
`EnableCancelActivityWorkerCommand` (default: off).

## Why?

To support proactive activity cancellation without waiting for
heartbeat. This is the task creation leg of the flow.

1. [#9231] Store `worker_control_task_queue` in `ActivityInfo` at
activity start.
2. **[This PR]** On `RequestCancelActivityTask`, batch commands by
control queue into `WorkerCommandsTask` outbound tasks.
3. [#9233] Dispatch each task as a Nexus `ExecuteCommands` operation to
the worker, with a 3-attempt retry cap.
4. [SDK] Worker receives the cancel command and cancels the running
activity.

Gated by dynamic config `EnableCancelActivityWorkerCommand` (default:
off).


## How did you test it?

**Unit tests** cover task generation, command batching (including
multi-queue batching), task serialization round-tripping, and the
feature-flag-off path.

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants