From d66016626ee4f08a59540cd702de3d92575d1ea1 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 9 Apr 2026 20:10:24 -0700 Subject: [PATCH] Expose Nexus Endpoint on Nexus Info --- .../nexus/InternalNexusOperationContext.java | 12 +++++++++++- .../io/temporal/internal/nexus/NexusInfoImpl.java | 9 ++++++++- .../internal/nexus/NexusTaskHandlerImpl.java | 3 ++- .../nexus/TemporalInterceptorMiddleware.java | 4 +++- .../java/io/temporal/nexus/NexusOperationInfo.java | 6 ++++++ .../workflow/nexus/NexusOperationInfoTest.java | 5 +++-- .../temporal/internal/testservice/StateMachines.java | 2 ++ 7 files changed, 35 insertions(+), 6 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java index cd3c30c84..d7306ea96 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java @@ -10,15 +10,21 @@ public class InternalNexusOperationContext { private final String namespace; private final String taskQueue; + private final String endpoint; private final Scope metricScope; private final WorkflowClient client; NexusOperationOutboundCallsInterceptor outboundCalls; Link startWorkflowResponseLink; public InternalNexusOperationContext( - String namespace, String taskQueue, Scope metricScope, WorkflowClient client) { + String namespace, + String taskQueue, + String endpoint, + Scope metricScope, + WorkflowClient client) { this.namespace = namespace; this.taskQueue = taskQueue; + this.endpoint = endpoint; this.metricScope = metricScope; this.client = client; } @@ -39,6 +45,10 @@ public String getNamespace() { return namespace; } + public String getEndpoint() { + return endpoint; + } + public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) { this.outboundCalls = outboundCalls; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInfoImpl.java index ceb48756d..279628b04 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInfoImpl.java @@ -5,10 +5,12 @@ class NexusInfoImpl implements NexusOperationInfo { private final String namespace; private final String taskQueue; + private final String endpoint; - NexusInfoImpl(String namespace, String taskQueue) { + NexusInfoImpl(String namespace, String taskQueue, String endpoint) { this.namespace = namespace; this.taskQueue = taskQueue; + this.endpoint = endpoint; } @Override @@ -20,4 +22,9 @@ public String getNamespace() { public String getTaskQueue() { return taskQueue; } + + @Override + public String getEndpoint() { + return endpoint; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index c88a09d22..7f10ba8c6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -116,7 +116,8 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } CurrentNexusOperationContext.set( - new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client)); + new InternalNexusOperationContext( + namespace, taskQueue, request.getEndpoint(), metricsScope, client)); switch (request.getVariantCase()) { case START_OPERATION: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java index 9bfb76613..be76b8a91 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java @@ -29,7 +29,9 @@ public OperationHandler intercept( temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient(), new NexusInfoImpl( - temporalNexusContext.getNamespace(), temporalNexusContext.getTaskQueue()))); + temporalNexusContext.getNamespace(), + temporalNexusContext.getTaskQueue(), + temporalNexusContext.getEndpoint()))); return new OperationInterceptorConverter(inboundCallsInterceptor); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationInfo.java b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationInfo.java index 66738c29d..a3ee0e9a7 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationInfo.java @@ -14,4 +14,10 @@ public interface NexusOperationInfo { * @return Nexus Task Queue of the worker that is executing the Nexus Operation */ String getTaskQueue(); + + /** + * @return Endpoint that the Nexus request was addressed to before being forwarded to this worker. + * Supported from server v1.30.0. + */ + String getEndpoint(); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationInfoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationInfoTest.java index fc32af79b..692290349 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationInfoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusOperationInfoTest.java @@ -25,8 +25,9 @@ public class NexusOperationInfoTest { public void testOperationHeaders() { TestWorkflows.TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String expectedEndpoint = testWorkflowRule.getNexusEndpoint().getSpec().getName(); Assert.assertEquals( - "UnitTest:" + testWorkflowRule.getTaskQueue(), + "UnitTest:" + testWorkflowRule.getTaskQueue() + ":" + expectedEndpoint, workflowStub.execute(testWorkflowRule.getTaskQueue())); } @@ -47,7 +48,7 @@ public OperationHandler operation() { return OperationHandler.sync( (context, details, input) -> { NexusOperationInfo info = Nexus.getOperationContext().getInfo(); - return info.getNamespace() + ":" + info.getTaskQueue(); + return info.getNamespace() + ":" + info.getTaskQueue() + ":" + info.getEndpoint(); }); } } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 0cc7c52ec..c62d6f84a 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -740,6 +740,7 @@ private static void scheduleNexusOperation( .setTaskToken(taskToken.toBytes()) .setRequest( io.temporal.api.nexus.v1.Request.newBuilder() + .setEndpoint(attr.getEndpoint()) .setScheduledTime(ctx.currentTime()) .setCapabilities( io.temporal.api.nexus.v1.Request.Capabilities.newBuilder() @@ -998,6 +999,7 @@ private static void requestCancelNexusOperation( .setTaskToken(taskToken.toBytes()) .setRequest( io.temporal.api.nexus.v1.Request.newBuilder() + .setEndpoint(data.scheduledEvent.getEndpoint()) .putAllHeader(data.scheduledEvent.getNexusHeaderMap()) .setCancelOperation( CancelOperationRequest.newBuilder()