diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/README.md b/core/src/main/java/io/temporal/samples/nexus_messaging/README.md new file mode 100644 index 00000000..0ff69765 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/README.md @@ -0,0 +1,16 @@ +This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus +operations. There are two self-contained examples, each in its own directory: + +| | `callerpattern/` | `ondemandpattern/` | +|---|---|--------------------------------------------------------------| +| **Pattern** | Signal an existing workflow | Create and run workflows on demand, and send signals to them | +| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation | +| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation | +| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` | + +Each directory is fully self-contained for clarity. The +`GreetingWorkflow`, `GreetingWorkflowImpl`, `GreetingActivity` and `GreetingActivityImpl` classes are **identical** between the two — only the +Nexus service interface and its implementation differ. This highlights that the same workflow can be +exposed through Nexus in different ways depending on whether the caller needs lifecycle control. + +See each directory's README for running instructions. diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/README.md b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/README.md new file mode 100644 index 00000000..a19fc5b0 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/README.md @@ -0,0 +1,60 @@ +## Entity pattern + +The handler worker starts a `GreetingWorkflow` for a user ID. +`NexusGreetingServiceImpl` holds that ID and routes every Nexus operation to it. +The caller's input does not have that workflow ID as the caller doesn't know it - but the caller sends in the User ID, +and `NexusGreetingServiceImpl` knows how to get the desired workflow ID from that User ID (see the getWorkflowId call). + +HandlerWorker is using the same getWorkflowId call to generate a workflow ID from a user ID when it launches the workflow. + +The caller workflow: +1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`) +2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity) +3. Confirms the change via a second query (`getLanguage`) +4. Approves the workflow (`approve` — backed by a `@SignalMethod`) + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.handler.HandlerWorker +``` + +In a second terminal, start the caller worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.caller.CallerWorker +``` + +In a third terminal, start the caller workflow: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.caller.CallerStarter +``` + +Expected output: + +``` +Supported languages: [CHINESE, ENGLISH] +Language changed: ENGLISH -> ARABIC +Workflow approved +``` diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerStarter.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerStarter.java new file mode 100644 index 00000000..e947eb6c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerStarter.java @@ -0,0 +1,32 @@ +package io.temporal.samples.nexus_messaging.callerpattern.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import java.util.UUID; + +public class CallerStarter { + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, + WorkflowClientOptions.newBuilder().setNamespace(CallerWorker.NAMESPACE).build()); + + CallerWorkflow workflow = + client.newWorkflowStub( + CallerWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID()) + .setTaskQueue(CallerWorker.TASK_QUEUE) + .build()); + + // Launch the worker, passing in an identifier which the Nexus service will use + // to find the matching workflow (See NexusGreetingServiceImpl::getWorkflowId) + List log = workflow.run("user-1"); + log.forEach(System.out::println); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorker.java new file mode 100644 index 00000000..50cda0f4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorker.java @@ -0,0 +1,43 @@ +package io.temporal.samples.nexus_messaging.callerpattern.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerWorker { + private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class); + + public static final String NAMESPACE = "nexus-messaging-caller-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue"; + static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + // The key must match the @Service-annotated interface name. + Collections.singletonMap( + "NexusGreetingService", + NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) + .build(), + CallerWorkflowImpl.class); + + factory.start(); + logger.info("Caller worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflow.java new file mode 100644 index 00000000..1f6fa1f8 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflow.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.callerpattern.caller; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; + +@WorkflowInterface +public interface CallerWorkflow { + @WorkflowMethod + List run(String userId); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflowImpl.java new file mode 100644 index 00000000..31958a26 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorkflowImpl.java @@ -0,0 +1,73 @@ +package io.temporal.samples.nexus_messaging.callerpattern.caller; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; +import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; + +public class CallerWorkflowImpl implements CallerWorkflow { + + private static final Logger logger = Workflow.getLogger(CallerWorkflowImpl.class); + + // The endpoint is configured at the worker level in CallerWorker; only operation options are + // set here. + NexusGreetingService greetingService = + Workflow.newNexusServiceStub( + NexusGreetingService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public List run(String userId) { + + // Messages in the log array are passed back to the caller who will then log them to report what + // is happening. + // The same message is also logged for demo purposes, so that things are visible in the caller + // workflow output. + List log = new ArrayList<>(); + + // Call a Nexus operation backed by a query against the entity workflow. + // The workflow must already be running on the handler, otherwise you will + // get an error saying the workflow has already terminated. + NexusGreetingService.GetLanguagesOutput languagesOutput = + greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false, userId)); + log.add("Supported languages: " + languagesOutput.getLanguages()); + logger.info("Supported languages: {}", languagesOutput.getLanguages()); + + // Following are examples for each of the three messaging types - + // update, query, then signal. + + // Call a Nexus operation backed by an update against the entity workflow. + Language previousLanguage = + greetingService.setLanguage( + new NexusGreetingService.SetLanguageInput(Language.ARABIC, userId)); + + // Call a Nexus operation backed by a query to confirm the language change. + Language currentLanguage = + greetingService.getLanguage(new NexusGreetingService.GetLanguageInput(userId)); + if (currentLanguage != Language.ARABIC) { + throw ApplicationFailure.newFailure( + "Expected language ARABIC, got " + currentLanguage, "AssertionError"); + } + + log.add("Language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name()); + logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC); + + // Call a Nexus operation backed by a signal against the entity workflow. + greetingService.approve(new NexusGreetingService.ApproveInput("caller", userId)); + log.add("Workflow approved"); + logger.info("Workflow approved"); + + return log; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivity.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivity.java new file mode 100644 index 00000000..a7bfcd75 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivity.java @@ -0,0 +1,12 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; + +@ActivityInterface +public interface GreetingActivity { + // Simulates a call to a remote greeting service. Returns null if the language is not supported. + @ActivityMethod + String callGreetingService(Language language); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivityImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivityImpl.java new file mode 100644 index 00000000..2a4b9492 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingActivityImpl.java @@ -0,0 +1,25 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; +import java.util.EnumMap; +import java.util.Map; + +public class GreetingActivityImpl implements GreetingActivity { + + private static final Map GREETINGS = new EnumMap<>(Language.class); + + static { + GREETINGS.put(Language.ARABIC, "مرحبا بالعالم"); + GREETINGS.put(Language.CHINESE, "你好,世界"); + GREETINGS.put(Language.ENGLISH, "Hello, world"); + GREETINGS.put(Language.FRENCH, "Bonjour, monde"); + GREETINGS.put(Language.HINDI, "नमस्ते दुनिया"); + GREETINGS.put(Language.PORTUGUESE, "Olá mundo"); + GREETINGS.put(Language.SPANISH, "Hola mundo"); + } + + @Override + public String callGreetingService(Language language) { + return GREETINGS.get(language); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflow.java new file mode 100644 index 00000000..6b12720f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflow.java @@ -0,0 +1,47 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; +import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService; +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.UpdateValidatorMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. The + * workflow exposes queries, an update, and a signal. These are private implementation details of + * the Nexus service: the caller only interacts via Nexus operations. + */ +@WorkflowInterface +public interface GreetingWorkflow { + + @WorkflowMethod + String run(); + + // Returns the languages currently supported by the workflow. + @QueryMethod + NexusGreetingService.GetLanguagesOutput getLanguages( + NexusGreetingService.GetLanguagesInput input); + + // Returns the currently active language. + @QueryMethod + Language getLanguage(); + + // Approves the workflow, allowing it to complete. + @SignalMethod + void approve(NexusGreetingService.ApproveInput input); + + // Changes the active language synchronously (only supports languages already in the greetings + // map). + @UpdateMethod + Language setLanguage(NexusGreetingService.SetLanguageInput input); + + @UpdateValidatorMethod(updateName = "setLanguage") + void validateSetLanguage(NexusGreetingService.SetLanguageInput input); + + // Changes the active language, calling an activity to fetch a greeting for new languages. + @UpdateMethod + Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflowImpl.java new file mode 100644 index 00000000..a6ab634e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/GreetingWorkflowImpl.java @@ -0,0 +1,103 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; +import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; + +public class GreetingWorkflowImpl implements GreetingWorkflow { + + private boolean approvedForRelease = false; + private final Map greetings = new EnumMap<>(Language.class); + private Language language = Language.ENGLISH; + + private static final Logger logger = Workflow.getLogger(GreetingWorkflowImpl.class); + + private final GreetingActivity greetingActivity = + Workflow.newActivityStub( + GreetingActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); + + public GreetingWorkflowImpl() { + greetings.put(Language.CHINESE, "你好,世界"); + greetings.put(Language.ENGLISH, "Hello, world"); + } + + @Override + public String run() { + // Wait until approved and all in-flight update handlers have finished. + Workflow.await(() -> approvedForRelease && Workflow.isEveryHandlerFinished()); + return greetings.get(language); + } + + @Override + public NexusGreetingService.GetLanguagesOutput getLanguages( + NexusGreetingService.GetLanguagesInput input) { + List result; + if (input.isIncludeUnsupported()) { + result = new ArrayList<>(Arrays.asList(Language.values())); + } else { + result = new ArrayList<>(greetings.keySet()); + } + Collections.sort(result); + return new NexusGreetingService.GetLanguagesOutput(result); + } + + @Override + public Language getLanguage() { + return language; + } + + @Override + public void approve(NexusGreetingService.ApproveInput input) { + logger.info( + "Approval signal received for workflow {}", + NexusGreetingServiceImpl.getWorkflowId(input.getUserId())); + approvedForRelease = true; + } + + @Override + public Language setLanguage(NexusGreetingService.SetLanguageInput input) { + logger.info( + "setLanguage update received for workflow {}", + NexusGreetingServiceImpl.getWorkflowId(input.getUserId())); + Language previous = language; + language = input.getLanguage(); + return previous; + } + + @Override + public void validateSetLanguage(NexusGreetingService.SetLanguageInput input) { + logger.info( + "validateSetLanguage called for workflow {}", + NexusGreetingServiceImpl.getWorkflowId(input.getUserId())); + if (!greetings.containsKey(input.getLanguage())) { + throw new IllegalArgumentException(input.getLanguage().name() + " is not supported"); + } + } + + @Override + public Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input) { + if (!greetings.containsKey(input.getLanguage())) { + String greeting = greetingActivity.callGreetingService(input.getLanguage()); + if (greeting == null) { + throw ApplicationFailure.newFailure( + "Greeting service does not support " + input.getLanguage().name(), + "UnsupportedLanguage"); + } + greetings.put(input.getLanguage(), greeting); + } + Language previous = language; + language = input.getLanguage(); + return previous; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/HandlerWorker.java new file mode 100644 index 00000000..f4d4ce35 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/HandlerWorker.java @@ -0,0 +1,58 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowExecutionAlreadyStarted; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HandlerWorker { + private static final Logger logger = LoggerFactory.getLogger(HandlerWorker.class); + + public static final String NAMESPACE = "nexus-messaging-handler-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-handler-task-queue"; + static final String USER_ID = "user-1"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + // Start the long-running entity workflow that backs the Nexus service, if not already running. + // Create a workflow ID derived from the given user ID. + // This would be for a process that would create a workflow for each UserID, + // if you had a single long running workflow for all users then you could + // remove all the USER_IDs from the inputs and just make everything refer + // to a single workflow ID. + String workflowId = NexusGreetingServiceImpl.getWorkflowId(USER_ID); + + GreetingWorkflow greetingWorkflow = + client.newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(TASK_QUEUE) + .build()); + try { + WorkflowClient.start(greetingWorkflow::run); + logger.info("Started greeting workflow: {}", workflowId); + } catch (WorkflowExecutionAlreadyStarted e) { + logger.info("Greeting workflow already running: {}", workflowId); + } + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + worker.registerActivitiesImplementations(new GreetingActivityImpl()); + worker.registerNexusServiceImplementation(new NexusGreetingServiceImpl()); + + factory.start(); + logger.info("Handler worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/NexusGreetingServiceImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/NexusGreetingServiceImpl.java new file mode 100644 index 00000000..c4c8c31d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/NexusGreetingServiceImpl.java @@ -0,0 +1,79 @@ +package io.temporal.samples.nexus_messaging.callerpattern.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.nexus.Nexus; +import io.temporal.samples.nexus_messaging.callerpattern.service.Language; +import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Nexus operation handler implementation. Each operation receives a userId, which is mapped to a + * workflow ID using {@link #WORKFLOW_ID_PREFIX}. The operations are synchronous because queries and + * updates against a running workflow complete quickly. + */ +@ServiceImpl(service = NexusGreetingService.class) +public class NexusGreetingServiceImpl { + + private static final Logger logger = LoggerFactory.getLogger(NexusGreetingServiceImpl.class); + + static final String WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_"; + + // This example assumes you might have multiple workflows, one for each user. + // If you had a single workflow for all users, then you could remove the + // getWorkflowId method, remove the user ID from each input, and just + // use the single worflow ID in the getWorkflowStub method below. + public static String getWorkflowId(String userId) { + return WORKFLOW_ID_PREFIX + userId; + } + + private GreetingWorkflow getWorkflowStub(String userId) { + return Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub(GreetingWorkflow.class, getWorkflowId(userId)); + } + + @OperationImpl + public OperationHandler< + NexusGreetingService.GetLanguagesInput, NexusGreetingService.GetLanguagesOutput> + getLanguages() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguages was received for user {}", input.getUserId()); + return getWorkflowStub(input.getUserId()).getLanguages(input); + }); + } + + @OperationImpl + public OperationHandler getLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguage was received for user {}", input.getUserId()); + return getWorkflowStub(input.getUserId()).getLanguage(); + }); + } + + // Routes to setLanguageUsingActivity (not setLanguage) so that new languages not already in the + // greetings map can be fetched via an activity. + @OperationImpl + public OperationHandler setLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Update for SetLanguage was received for user {}", input.getUserId()); + return getWorkflowStub(input.getUserId()).setLanguageUsingActivity(input); + }); + } + + @OperationImpl + public OperationHandler + approve() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Signal for Approve was received for user {}", input.getUserId()); + getWorkflowStub(input.getUserId()).approve(input); + return new NexusGreetingService.ApproveOutput(); + }); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/Language.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/Language.java new file mode 100644 index 00000000..9ca70913 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/Language.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.callerpattern.service; + +public enum Language { + ARABIC, + CHINESE, + ENGLISH, + FRENCH, + HINDI, + PORTUGUESE, + SPANISH +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/NexusGreetingService.java b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/NexusGreetingService.java new file mode 100644 index 00000000..f42149c4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/service/NexusGreetingService.java @@ -0,0 +1,132 @@ +package io.temporal.samples.nexus_messaging.callerpattern.service; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import java.util.List; + +/** + * Nexus service definition. Shared between the handler and caller. The caller uses this to create a + * type-safe Nexus client stub; the handler implements the operations. + */ +@Service +public interface NexusGreetingService { + + class GetLanguagesInput { + private final boolean includeUnsupported; + private final String userId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesInput( + @JsonProperty("includeUnsupported") boolean includeUnsupported, + @JsonProperty("userId") String userId) { + this.includeUnsupported = includeUnsupported; + this.userId = userId; + } + + @JsonProperty("includeUnsupported") + public boolean isIncludeUnsupported() { + return includeUnsupported; + } + + @JsonProperty("userId") + public String getUserId() { + return userId; + } + } + + class GetLanguagesOutput { + private final List languages; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesOutput(@JsonProperty("languages") List languages) { + this.languages = languages; + } + + @JsonProperty("languages") + public List getLanguages() { + return languages; + } + } + + class GetLanguageInput { + private final String userId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguageInput(@JsonProperty("userId") String userId) { + this.userId = userId; + } + + @JsonProperty("userId") + public String getUserId() { + return userId; + } + } + + class ApproveInput { + private final String name; + private final String userId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput(@JsonProperty("name") String name, @JsonProperty("userId") String userId) { + this.name = name; + this.userId = userId; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + @JsonProperty("userId") + public String getUserId() { + return userId; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class ApproveOutput { + @JsonCreator + public ApproveOutput() {} + } + + class SetLanguageInput { + private final Language language; + private final String userId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public SetLanguageInput( + @JsonProperty("language") Language language, @JsonProperty("userId") String userId) { + this.language = language; + this.userId = userId; + } + + @JsonProperty("language") + public Language getLanguage() { + return language; + } + + @JsonProperty("userId") + public String getUserId() { + return userId; + } + } + + // Returns the languages supported by the greeting workflow. + @Operation + GetLanguagesOutput getLanguages(GetLanguagesInput input); + + // Returns the currently active language. + @Operation + Language getLanguage(GetLanguageInput input); + + // Changes the active language, returning the previous one. + @Operation + Language setLanguage(SetLanguageInput input); + + // Approves the workflow, allowing it to complete. + @Operation + ApproveOutput approve(ApproveInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/README.md b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/README.md new file mode 100644 index 00000000..fbf3fde7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/README.md @@ -0,0 +1,66 @@ +## On-demand pattern + +No workflow is pre-started. The caller creates and controls workflow instances through Nexus +operations. `NexusRemoteGreetingService` adds a `runFromRemote` operation that starts a new +`GreetingWorkflow`, and every other operation includes a `workflowId` so the handler knows which +instance to target. + +The caller workflow: +1. Starts two remote `GreetingWorkflow` instances via `runFromRemote` (backed by `WorkflowRunOperation`) +2. Queries each for supported languages +3. Changes the language on each (Arabic and Hindi) +4. Confirms the changes via queries +5. Approves both workflows +6. Waits for each to complete and returns their results + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.ondemandpattern.handler.HandlerWorker +``` + +In a second terminal, start the caller worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.ondemandpattern.caller.CallerRemoteWorker +``` + +In a third terminal, start the caller workflow: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.ondemandpattern.caller.CallerRemoteStarter +``` + +Expected output: + +``` +started remote greeting workflow: UserId One +started remote greeting workflow: UserId Two +Supported languages for UserId One: [CHINESE, ENGLISH] +Supported languages for UserId Two: [CHINESE, ENGLISH] +UserId One changed language: ENGLISH -> ARABIC +UserId Two changed language: ENGLISH -> HINDI +Workflows approved +Workflow one result: مرحبا بالعالم +Workflow two result: नमस्ते दुनिया +``` diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteStarter.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteStarter.java new file mode 100644 index 00000000..35be6ff0 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteStarter.java @@ -0,0 +1,30 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.List; +import java.util.UUID; + +public class CallerRemoteStarter { + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, + WorkflowClientOptions.newBuilder().setNamespace(CallerRemoteWorker.NAMESPACE).build()); + + CallerRemoteWorkflow workflow = + client.newWorkflowStub( + CallerRemoteWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("nexus-messaging-remote-caller-" + UUID.randomUUID()) + .setTaskQueue(CallerRemoteWorker.TASK_QUEUE) + .build()); + + List log = workflow.run(); + log.forEach(System.out::println); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorker.java new file mode 100644 index 00000000..832a2f15 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorker.java @@ -0,0 +1,43 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.util.Collections; +import org.slf4j.Logger; + +public class CallerRemoteWorker { + private static final Logger logger = Workflow.getLogger(CallerRemoteWorker.class); + + public static final String NAMESPACE = "nexus-messaging-caller-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-caller-remote-task-queue"; + static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + // The key must match the @Service-annotated interface name. + Collections.singletonMap( + "NexusRemoteGreetingService", + NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) + .build(), + CallerRemoteWorkflowImpl.class); + + factory.start(); + logger.info("Caller remote worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflow.java new file mode 100644 index 00000000..5f3485f2 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflow.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.caller; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; + +@WorkflowInterface +public interface CallerRemoteWorkflow { + @WorkflowMethod + List run(); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflowImpl.java new file mode 100644 index 00000000..6b4fe599 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorkflowImpl.java @@ -0,0 +1,155 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.caller; + +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.NexusRemoteGreetingService; +import io.temporal.workflow.NexusOperationHandle; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; + +public class CallerRemoteWorkflowImpl implements CallerRemoteWorkflow { + + private static final Logger logger = Workflow.getLogger(CallerRemoteWorkflowImpl.class); + + // This is going to create two workflows and send messages to them. + // We need to have an ID to differentiate so that Nexus knows how to name + // a workflow and then how to know the correct destination workflow. + // So here we are just going to define two workflow IDs with different user IDs. + private static final String REMOTE_WORKFLOW_ONE = "UserId One"; + private static final String REMOTE_WORKFLOW_TWO = "UserId Two"; + + NexusRemoteGreetingService greetingRemoteServiceOne = + Workflow.newNexusServiceStub( + NexusRemoteGreetingService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + NexusRemoteGreetingService greetingRemoteServiceTwo = + Workflow.newNexusServiceStub( + NexusRemoteGreetingService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public List run() { + // Messages in the log array are passed back to the caller who will then log them to report what + // is happening. + // The same message is also logged for demo purposes, so that things are visible in the caller + // workflow output. + List log = new ArrayList<>(); + + // Each call is performed twice in this example. This assumes there are two users we want + // to process. The first call starts two workflows, one for each user. + // Subsequent calls perform different actions between the two users. + // There are examples for each of the three messaging types - + // update, query, then signal. + + // This is an Async Nexus operation — starts a workflow on the handler and returns a handle. + // Unlike the sync operations below (getLanguages, setLanguage, etc.), this does not block + // until the workflow completes. It is backed by WorkflowRunOperation on the handler side. + NexusOperationHandle handleOne = + Workflow.startNexusOperation( + greetingRemoteServiceOne::runFromRemote, + new NexusRemoteGreetingService.RunFromRemoteInput(REMOTE_WORKFLOW_ONE)); + // Wait for the operation to be started (workflow is now running on the handler). + handleOne.getExecution().get(); + log.add("started remote greeting workflow: " + REMOTE_WORKFLOW_ONE); + logger.info("started remote greeting workflow {}", REMOTE_WORKFLOW_ONE); + + NexusOperationHandle handleTwo = + Workflow.startNexusOperation( + greetingRemoteServiceOne::runFromRemote, + new NexusRemoteGreetingService.RunFromRemoteInput(REMOTE_WORKFLOW_TWO)); + // Wait for the operation to be started (workflow is now running on the handler). + handleTwo.getExecution().get(); + log.add("started remote greeting workflow: " + REMOTE_WORKFLOW_TWO); + logger.info("started remote greeting workflow {}", REMOTE_WORKFLOW_TWO); + + // Query the remote workflow for supported languages. + NexusRemoteGreetingService.GetLanguagesOutput languagesOutput = + greetingRemoteServiceOne.getLanguages( + new NexusRemoteGreetingService.GetLanguagesInput(false, REMOTE_WORKFLOW_ONE)); + log.add( + "Supported languages for " + REMOTE_WORKFLOW_ONE + ": " + languagesOutput.getLanguages()); + logger.info( + "supported languages are {} for workflow {}", + languagesOutput.getLanguages(), + REMOTE_WORKFLOW_ONE); + + languagesOutput = + greetingRemoteServiceTwo.getLanguages( + new NexusRemoteGreetingService.GetLanguagesInput(false, REMOTE_WORKFLOW_TWO)); + log.add( + "Supported languages for " + REMOTE_WORKFLOW_TWO + ": " + languagesOutput.getLanguages()); + logger.info( + "supported languages are {} for workflow {}", + languagesOutput.getLanguages(), + REMOTE_WORKFLOW_TWO); + + // Update the language on the remote workflow. + Language previousLanguageOne = + greetingRemoteServiceOne.setLanguage( + new NexusRemoteGreetingService.SetLanguageInput(Language.ARABIC, REMOTE_WORKFLOW_ONE)); + + Language previousLanguageTwo = + greetingRemoteServiceTwo.setLanguage( + new NexusRemoteGreetingService.SetLanguageInput(Language.HINDI, REMOTE_WORKFLOW_TWO)); + + // Confirm the change by querying. + Language currentLanguage = + greetingRemoteServiceOne.getLanguage( + new NexusRemoteGreetingService.GetLanguageInput(REMOTE_WORKFLOW_ONE)); + log.add( + REMOTE_WORKFLOW_ONE + + " changed language: " + + previousLanguageOne.name() + + " -> " + + currentLanguage.name()); + logger.info( + "Language changed from {} to {} for workflow {}", + previousLanguageOne, + currentLanguage, + REMOTE_WORKFLOW_ONE); + + currentLanguage = + greetingRemoteServiceTwo.getLanguage( + new NexusRemoteGreetingService.GetLanguageInput(REMOTE_WORKFLOW_TWO)); + log.add( + REMOTE_WORKFLOW_TWO + + " changed language: " + + previousLanguageTwo.name() + + " -> " + + currentLanguage.name()); + logger.info( + "Language changed from {} to {} for workflow {}", + previousLanguageTwo, + currentLanguage, + REMOTE_WORKFLOW_TWO); + + // Approve the remote workflow so it can complete. + greetingRemoteServiceOne.approve( + new NexusRemoteGreetingService.ApproveInput("remote-caller", REMOTE_WORKFLOW_ONE)); + greetingRemoteServiceTwo.approve( + new NexusRemoteGreetingService.ApproveInput("remote-caller", REMOTE_WORKFLOW_TWO)); + log.add("Workflows approved"); + + // Wait for the remote workflow to finish and return its result. + String result = handleOne.getResult().get(); + log.add("Workflow one result: " + result); + + result = handleTwo.getResult().get(); + log.add("Workflow two result: " + result); + return log; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivity.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivity.java new file mode 100644 index 00000000..aa69c643 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivity.java @@ -0,0 +1,12 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; + +@ActivityInterface +public interface GreetingActivity { + // Simulates a call to a remote greeting service. Returns null if the language is not supported. + @ActivityMethod + String callGreetingService(Language language); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivityImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivityImpl.java new file mode 100644 index 00000000..354e8dfa --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingActivityImpl.java @@ -0,0 +1,25 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; +import java.util.EnumMap; +import java.util.Map; + +public class GreetingActivityImpl implements GreetingActivity { + + private static final Map GREETINGS = new EnumMap<>(Language.class); + + static { + GREETINGS.put(Language.ARABIC, "مرحبا بالعالم"); + GREETINGS.put(Language.CHINESE, "你好,世界"); + GREETINGS.put(Language.ENGLISH, "Hello, world"); + GREETINGS.put(Language.FRENCH, "Bonjour, monde"); + GREETINGS.put(Language.HINDI, "नमस्ते दुनिया"); + GREETINGS.put(Language.PORTUGUESE, "Olá mundo"); + GREETINGS.put(Language.SPANISH, "Hola mundo"); + } + + @Override + public String callGreetingService(Language language) { + return GREETINGS.get(language); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflow.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflow.java new file mode 100644 index 00000000..1c37564a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflow.java @@ -0,0 +1,90 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.NexusRemoteGreetingService; +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.UpdateValidatorMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * A long-running "entity" workflow that backs the NexusRemoteGreetingService Nexus operations. The + * workflow exposes queries, an update, and a signal. These are private implementation details of + * the Nexus service: the caller only interacts via Nexus operations. + */ +@WorkflowInterface +public interface GreetingWorkflow { + + class ApproveInput { + private final String name; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty("name") + public String getName() { + return name; + } + } + + class GetLanguagesInput { + private final boolean includeUnsupported; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesInput(@JsonProperty("includeUnsupported") boolean includeUnsupported) { + this.includeUnsupported = includeUnsupported; + } + + @JsonProperty("includeUnsupported") + public boolean isIncludeUnsupported() { + return includeUnsupported; + } + } + + class SetLanguageInput { + private final Language language; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public SetLanguageInput(@JsonProperty("language") Language language) { + this.language = language; + } + + @JsonProperty("language") + public Language getLanguage() { + return language; + } + } + + @WorkflowMethod + String run(); + + // Returns the languages currently supported by the workflow. + @QueryMethod + NexusRemoteGreetingService.GetLanguagesOutput getLanguages(GetLanguagesInput input); + + // Returns the currently active language. + @QueryMethod + Language getLanguage(); + + // Approves the workflow, allowing it to complete. + @SignalMethod + void approve(ApproveInput input); + + // Changes the active language synchronously (only supports languages already in the greetings + // map). + @UpdateMethod + Language setLanguage(SetLanguageInput input); + + @UpdateValidatorMethod(updateName = "setLanguage") + void validateSetLanguage(SetLanguageInput input); + + // Changes the active language, calling an activity to fetch a greeting for new languages. + @UpdateMethod + Language setLanguageUsingActivity(SetLanguageInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflowImpl.java new file mode 100644 index 00000000..499069de --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/GreetingWorkflowImpl.java @@ -0,0 +1,98 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import io.temporal.activity.ActivityOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.NexusRemoteGreetingService; +import io.temporal.workflow.Workflow; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; + +public class GreetingWorkflowImpl implements GreetingWorkflow { + + private static final Logger logger1 = Workflow.getLogger(GreetingWorkflowImpl.class); + private boolean approvedForRelease = false; + private final Map greetings = new EnumMap<>(Language.class); + private Language language = Language.ENGLISH; + + private static final Logger logger = logger1; + + private final GreetingActivity greetingActivity = + Workflow.newActivityStub( + GreetingActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); + + public GreetingWorkflowImpl() { + greetings.put(Language.CHINESE, "你好,世界"); + greetings.put(Language.ENGLISH, "Hello, world"); + } + + @Override + public String run() { + // Wait until approved and all in-flight update handlers have finished. + Workflow.await(() -> approvedForRelease && Workflow.isEveryHandlerFinished()); + return greetings.get(language); + } + + @Override + public NexusRemoteGreetingService.GetLanguagesOutput getLanguages( + GreetingWorkflow.GetLanguagesInput input) { + List result; + if (input.isIncludeUnsupported()) { + result = new ArrayList<>(Arrays.asList(Language.values())); + } else { + result = new ArrayList<>(greetings.keySet()); + } + Collections.sort(result); + return new NexusRemoteGreetingService.GetLanguagesOutput(result); + } + + @Override + public Language getLanguage() { + return language; + } + + @Override + public void approve(ApproveInput input) { + logger.info("Approval signal received"); + approvedForRelease = true; + } + + @Override + public Language setLanguage(GreetingWorkflow.SetLanguageInput input) { + logger.info("setLanguage update received"); + Language previous = language; + language = input.getLanguage(); + return previous; + } + + @Override + public void validateSetLanguage(GreetingWorkflow.SetLanguageInput input) { + logger.info("validateSetLanguage called"); + if (!greetings.containsKey(input.getLanguage())) { + throw new IllegalArgumentException(input.getLanguage().name() + " is not supported"); + } + } + + @Override + public Language setLanguageUsingActivity(GreetingWorkflow.SetLanguageInput input) { + if (!greetings.containsKey(input.getLanguage())) { + String greeting = greetingActivity.callGreetingService(input.getLanguage()); + if (greeting == null) { + throw ApplicationFailure.newFailure( + "Greeting service does not support " + input.getLanguage().name(), + "UnsupportedLanguage"); + } + greetings.put(input.getLanguage(), greeting); + } + Language previous = language; + language = input.getLanguage(); + return previous; + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/HandlerWorker.java new file mode 100644 index 00000000..5cc1f645 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/HandlerWorker.java @@ -0,0 +1,33 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HandlerWorker { + private static final Logger logger = LoggerFactory.getLogger(HandlerWorker.class); + + public static final String NAMESPACE = "nexus-messaging-handler-namespace"; + public static final String TASK_QUEUE = "nexus-messaging-handler-task-queue"; + + public static void main(String[] args) throws InterruptedException { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = + WorkflowClient.newInstance( + service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + worker.registerActivitiesImplementations(new GreetingActivityImpl()); + worker.registerNexusServiceImplementation(new NexusRemoteGreetingServiceImpl()); + + factory.start(); + logger.info("Handler worker started, ctrl+c to exit"); + Thread.currentThread().join(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/NexusRemoteGreetingServiceImpl.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/NexusRemoteGreetingServiceImpl.java new file mode 100644 index 00000000..c5dc4439 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/NexusRemoteGreetingServiceImpl.java @@ -0,0 +1,96 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowHandle; +import io.temporal.nexus.WorkflowRunOperation; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.Language; +import io.temporal.samples.nexus_messaging.ondemandpattern.service.NexusRemoteGreetingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Nexus operation handler for the on-demand pattern. Each operation receives the target workflow ID + * in its input, and {@code runFromRemote} starts a brand-new GreetingWorkflow. + */ +@ServiceImpl(service = NexusRemoteGreetingService.class) +public class NexusRemoteGreetingServiceImpl { + + private static final Logger logger = + LoggerFactory.getLogger(NexusRemoteGreetingServiceImpl.class); + + private GreetingWorkflow getWorkflowStub(String workflowId) { + return Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub(GreetingWorkflow.class, workflowId); + } + + // Starts a new GreetingWorkflow with the caller-specified workflow ID. This is an async + // Nexus operation backed by WorkflowRunOperation. + @OperationImpl + public OperationHandler runFromRemote() { + return WorkflowRunOperation.fromWorkflowHandle( + (ctx, details, input) -> { + logger.info("RunFromRemote was received for workflow {}", input.getWorkflowId()); + return WorkflowHandle.fromWorkflowMethod( + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(input.getWorkflowId()) + .setTaskQueue(HandlerWorker.TASK_QUEUE) + .build()) + ::run); + }); + } + + @OperationImpl + public OperationHandler< + NexusRemoteGreetingService.GetLanguagesInput, + NexusRemoteGreetingService.GetLanguagesOutput> + getLanguages() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguages was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()) + .getLanguages(new GreetingWorkflow.GetLanguagesInput(input.isIncludeUnsupported())); + }); + } + + @OperationImpl + public OperationHandler getLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Query for GetLanguage was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()).getLanguage(); + }); + } + + // Uses setLanguageUsingActivity so that new languages are fetched via an activity. + @OperationImpl + public OperationHandler setLanguage() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Update for SetLanguage was received for workflow {}", input.getWorkflowId()); + return getWorkflowStub(input.getWorkflowId()) + .setLanguageUsingActivity(new GreetingWorkflow.SetLanguageInput(input.getLanguage())); + }); + } + + @OperationImpl + public OperationHandler< + NexusRemoteGreetingService.ApproveInput, NexusRemoteGreetingService.ApproveOutput> + approve() { + return OperationHandler.sync( + (ctx, details, input) -> { + logger.info("Signal for Approve was received for workflow {}", input.getWorkflowId()); + getWorkflowStub(input.getWorkflowId()) + .approve(new GreetingWorkflow.ApproveInput(input.getName())); + return new NexusRemoteGreetingService.ApproveOutput(); + }); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/Language.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/Language.java new file mode 100644 index 00000000..bb23915f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/Language.java @@ -0,0 +1,11 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.service; + +public enum Language { + ARABIC, + CHINESE, + ENGLISH, + FRENCH, + HINDI, + PORTUGUESE, + SPANISH +} diff --git a/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/NexusRemoteGreetingService.java b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/NexusRemoteGreetingService.java new file mode 100644 index 00000000..42f9ecf6 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/service/NexusRemoteGreetingService.java @@ -0,0 +1,155 @@ +package io.temporal.samples.nexus_messaging.ondemandpattern.service; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import java.util.List; + +/** + * Nexus service definition for the on-demand pattern. Every operation includes a {@code workflowId} + * so the caller controls which workflow instance is targeted. This also exposes a {@code + * runFromRemote} operation that starts a new GreetingWorkflow. + */ +@Service +public interface NexusRemoteGreetingService { + + class RunFromRemoteInput { + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public RunFromRemoteInput(@JsonProperty("workflowId") String workflowId) { + this.workflowId = workflowId; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class GetLanguagesInput { + private final boolean includeUnsupported; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesInput( + @JsonProperty("includeUnsupported") boolean includeUnsupported, + @JsonProperty("workflowId") String workflowId) { + this.includeUnsupported = includeUnsupported; + this.workflowId = workflowId; + } + + @JsonProperty("includeUnsupported") + public boolean isIncludeUnsupported() { + return includeUnsupported; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class GetLanguageInput { + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguageInput(@JsonProperty("workflowId") String workflowId) { + this.workflowId = workflowId; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class SetLanguageInput { + private final Language language; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public SetLanguageInput( + @JsonProperty("language") Language language, + @JsonProperty("workflowId") String workflowId) { + this.language = language; + this.workflowId = workflowId; + } + + @JsonProperty("language") + public Language getLanguage() { + return language; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class ApproveInput { + private final String name; + private final String workflowId; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ApproveInput( + @JsonProperty("name") String name, @JsonProperty("workflowId") String workflowId) { + this.name = name; + this.workflowId = workflowId; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + @JsonProperty("workflowId") + public String getWorkflowId() { + return workflowId; + } + } + + class GetLanguagesOutput { + private final List languages; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetLanguagesOutput(@JsonProperty("languages") List languages) { + this.languages = languages; + } + + @JsonProperty("languages") + public List getLanguages() { + return languages; + } + } + + @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) + class ApproveOutput { + @JsonCreator + public ApproveOutput() {} + } + + // Starts a new GreetingWorkflow with the given workflow ID. This is an asynchronous Nexus + // operation: the caller receives a handle and can wait for the workflow to complete. + @Operation + String runFromRemote(RunFromRemoteInput input); + + // Returns the languages supported by the specified workflow. + @Operation + GetLanguagesOutput getLanguages(GetLanguagesInput input); + + // Returns the currently active language of the specified workflow. + @Operation + Language getLanguage(GetLanguageInput input); + + // Changes the active language on the specified workflow, returning the previous one. + @Operation + Language setLanguage(SetLanguageInput input); + + // Approves the specified workflow, allowing it to complete. + @Operation + ApproveOutput approve(ApproveInput input); +}