Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus
operations. There are two self-contained examples, each in its own directory:

| | `callerpattern/` | `ondemandpattern/` |
|---|---|--------------------------------------------------------------|
| **Pattern** | Signal an existing workflow | Create and run workflows on demand, and send signals to them |
| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |

Each directory is fully self-contained for clarity. The
`GreetingWorkflow`, `GreetingWorkflowImpl`, `GreetingActivity` and `GreetingActivityImpl` classes are **identical** between the two — only the
Nexus service interface and its implementation differ. This highlights that the same workflow can be
exposed through Nexus in different ways depending on whether the caller needs lifecycle control.

See each directory's README for running instructions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
## Entity pattern

The handler worker starts a `GreetingWorkflow` for a user ID.
`NexusGreetingServiceImpl` holds that ID and routes every Nexus operation to it.
The caller's input does not have that workflow ID as the caller doesn't know it - but the caller sends in the User ID,
and `NexusGreetingServiceImpl` knows how to get the desired workflow ID from that User ID (see the getWorkflowId call).

HandlerWorker is using the same getWorkflowId call to generate a workflow ID from a user ID when it launches the workflow.

The caller workflow:
1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`)
2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity)
3. Confirms the change via a second query (`getLanguage`)
4. Approves the workflow (`approve` — backed by a `@SignalMethod`)

### Running

Start a Temporal server:

```bash
temporal server start-dev
```

Create the namespaces and Nexus endpoint:

```bash
temporal operator namespace create --namespace nexus-messaging-handler-namespace
temporal operator namespace create --namespace nexus-messaging-caller-namespace

temporal operator nexus endpoint create \
--name nexus-messaging-nexus-endpoint \
--target-namespace nexus-messaging-handler-namespace \
--target-task-queue nexus-messaging-handler-task-queue
```

In one terminal, start the handler worker:

```bash
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.handler.HandlerWorker
```

In a second terminal, start the caller worker:

```bash
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.caller.CallerWorker
```

In a third terminal, start the caller workflow:

```bash
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexus_messaging.callerpattern.caller.CallerStarter
```

Expected output:

```
Supported languages: [CHINESE, ENGLISH]
Language changed: ENGLISH -> ARABIC
Workflow approved
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.temporal.samples.nexus_messaging.callerpattern.caller;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.List;
import java.util.UUID;

public class CallerStarter {

public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All samples should support env config. You can look at other samples to see how to create the client and follow the pattern there https://github.com/temporalio/samples-java/blob/main/core/src/main/java/io/temporal/samples/asyncchild/Starter.java#L22

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you as I didn't know about this - but these seem to be all non-nexus examples? I looked in the other Nexus samples and they aren't using the env files - but they are passing the namespace in on the commandline instead of in code. In code seems clearer to me, but I can switch to commandline arguments if desired.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So a bit of background, the reason why all the other samples and the other Nexus samples have this customization point is so users can easily run the samples against a local server and Temporal Cloud. The other Nexus samples simply predate the environment config approach, for this new samples lets use environment config approach since it is the standard.

WorkflowClient client =
WorkflowClient.newInstance(
service,
WorkflowClientOptions.newBuilder().setNamespace(CallerWorker.NAMESPACE).build());

CallerWorkflow workflow =
client.newWorkflowStub(
CallerWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID())
.setTaskQueue(CallerWorker.TASK_QUEUE)
.build());

// Launch the worker, passing in an identifier which the Nexus service will use
// to find the matching workflow (See NexusGreetingServiceImpl::getWorkflowId)
List<String> log = workflow.run("user-1");
log.forEach(System.out::println);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.temporal.samples.nexus_messaging.callerpattern.caller;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.NexusServiceOptions;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CallerWorker {
private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class);

public static final String NAMESPACE = "nexus-messaging-caller-namespace";
public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue";
static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint";

public static void main(String[] args) throws InterruptedException {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client =
WorkflowClient.newInstance(
service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());

WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(
WorkflowImplementationOptions.newBuilder()
.setNexusServiceOptions(
// The key must match the @Service-annotated interface name.
Collections.singletonMap(
"NexusGreetingService",
NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build()))
.build(),
CallerWorkflowImpl.class);

factory.start();
logger.info("Caller worker started, ctrl+c to exit");
Thread.currentThread().join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.temporal.samples.nexus_messaging.callerpattern.caller;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.List;

@WorkflowInterface
public interface CallerWorkflow {
@WorkflowMethod
List<String> run(String userId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.temporal.samples.nexus_messaging.callerpattern.caller;

import io.temporal.failure.ApplicationFailure;
import io.temporal.samples.nexus_messaging.callerpattern.service.Language;
import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService;
import io.temporal.workflow.NexusOperationOptions;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;

public class CallerWorkflowImpl implements CallerWorkflow {

private static final Logger logger = Workflow.getLogger(CallerWorkflowImpl.class);

// The endpoint is configured at the worker level in CallerWorker; only operation options are
// set here.
NexusGreetingService greetingService =
Workflow.newNexusServiceStub(
NexusGreetingService.class,
NexusServiceOptions.newBuilder()
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build())
.build());

@Override
public List<String> run(String userId) {

// Messages in the log array are passed back to the caller who will then log them to report what
// is happening.
// The same message is also logged for demo purposes, so that things are visible in the caller
// workflow output.
List<String> log = new ArrayList<>();

// Call a Nexus operation backed by a query against the entity workflow.
// The workflow must already be running on the handler, otherwise you will
// get an error saying the workflow has already terminated.
NexusGreetingService.GetLanguagesOutput languagesOutput =
greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false, userId));
log.add("Supported languages: " + languagesOutput.getLanguages());
logger.info("Supported languages: {}", languagesOutput.getLanguages());

// Following are examples for each of the three messaging types -
// update, query, then signal.

// Call a Nexus operation backed by an update against the entity workflow.
Language previousLanguage =
greetingService.setLanguage(
new NexusGreetingService.SetLanguageInput(Language.ARABIC, userId));

// Call a Nexus operation backed by a query to confirm the language change.
Language currentLanguage =
greetingService.getLanguage(new NexusGreetingService.GetLanguageInput(userId));
if (currentLanguage != Language.ARABIC) {
throw ApplicationFailure.newFailure(
"Expected language ARABIC, got " + currentLanguage, "AssertionError");
}

log.add("Language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name());
logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC);

// Call a Nexus operation backed by a signal against the entity workflow.
greetingService.approve(new NexusGreetingService.ApproveInput("caller", userId));
log.add("Workflow approved");
logger.info("Workflow approved");

return log;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.temporal.samples.nexus_messaging.callerpattern.handler;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.samples.nexus_messaging.callerpattern.service.Language;

@ActivityInterface
public interface GreetingActivity {
// Simulates a call to a remote greeting service. Returns null if the language is not supported.
@ActivityMethod
String callGreetingService(Language language);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.temporal.samples.nexus_messaging.callerpattern.handler;

import io.temporal.samples.nexus_messaging.callerpattern.service.Language;
import java.util.EnumMap;
import java.util.Map;

public class GreetingActivityImpl implements GreetingActivity {

private static final Map<Language, String> GREETINGS = new EnumMap<>(Language.class);

static {
GREETINGS.put(Language.ARABIC, "مرحبا بالعالم");
GREETINGS.put(Language.CHINESE, "你好,世界");
GREETINGS.put(Language.ENGLISH, "Hello, world");
GREETINGS.put(Language.FRENCH, "Bonjour, monde");
GREETINGS.put(Language.HINDI, "नमस्ते दुनिया");
GREETINGS.put(Language.PORTUGUESE, "Olá mundo");
GREETINGS.put(Language.SPANISH, "Hola mundo");
}

@Override
public String callGreetingService(Language language) {
return GREETINGS.get(language);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.temporal.samples.nexus_messaging.callerpattern.handler;

import io.temporal.samples.nexus_messaging.callerpattern.service.Language;
import io.temporal.samples.nexus_messaging.callerpattern.service.NexusGreetingService;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.UpdateMethod;
import io.temporal.workflow.UpdateValidatorMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

/**
* A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. The
* workflow exposes queries, an update, and a signal. These are private implementation details of
* the Nexus service: the caller only interacts via Nexus operations.
*/
@WorkflowInterface
public interface GreetingWorkflow {

@WorkflowMethod
String run();

// Returns the languages currently supported by the workflow.
@QueryMethod
NexusGreetingService.GetLanguagesOutput getLanguages(
NexusGreetingService.GetLanguagesInput input);

// Returns the currently active language.
@QueryMethod
Language getLanguage();

// Approves the workflow, allowing it to complete.
@SignalMethod
void approve(NexusGreetingService.ApproveInput input);

// Changes the active language synchronously (only supports languages already in the greetings
// map).
@UpdateMethod
Language setLanguage(NexusGreetingService.SetLanguageInput input);

@UpdateValidatorMethod(updateName = "setLanguage")
void validateSetLanguage(NexusGreetingService.SetLanguageInput input);

// Changes the active language, calling an activity to fetch a greeting for new languages.
@UpdateMethod
Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input);
}
Loading
Loading