Skip to content

Commit 8f3fa8b

Browse files
committed
Progress checkin
1 parent 385adc4 commit 8f3fa8b

File tree

9 files changed

+249
-2
lines changed

9 files changed

+249
-2
lines changed

core/src/main/java/io/temporal/samples/nexus/caller/CallerStarter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public static void main(String[] args) {
1616

1717
WorkflowOptions workflowOptions =
1818
WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build();
19+
1920
EchoCallerWorkflow echoWorkflow =
2021
client.newWorkflowStub(EchoCallerWorkflow.class, workflowOptions);
2122
WorkflowExecution execution = WorkflowClient.start(echoWorkflow::echo, "Nexus Echo 👋");
@@ -24,6 +25,7 @@ public static void main(String[] args) {
2425
execution.getWorkflowId(),
2526
execution.getRunId());
2627
logger.info("Workflow result: {}", echoWorkflow.echo("Nexus Echo 👋"));
28+
2729
HelloCallerWorkflow helloWorkflow =
2830
client.newWorkflowStub(HelloCallerWorkflow.class, workflowOptions);
2931
execution = WorkflowClient.start(helloWorkflow::hello, "Nexus", SampleNexusService.Language.EN);
@@ -33,5 +35,13 @@ public static void main(String[] args) {
3335
execution.getRunId());
3436
logger.info(
3537
"Workflow result: {}", helloWorkflow.hello("Nexus", SampleNexusService.Language.ES));
38+
39+
MessageCallerWorkflow messageWorkflow =
40+
client.newWorkflowStub(MessageCallerWorkflow.class, workflowOptions);
41+
execution = WorkflowClient.start(messageWorkflow::doWork);
42+
logger.info(
43+
"Started doWork workflowId: {} runId: {}", execution.getWorkflowId(), execution.getRunId());
44+
messageWorkflow.doWork();
45+
logger.info("Workflow doWork done", messageWorkflow.doWork());
3646
}
3747
}

core/src/main/java/io/temporal/samples/nexus/caller/CallerWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public static void main(String[] args) {
2525
NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build()))
2626
.build(),
2727
EchoCallerWorkflowImpl.class,
28-
HelloCallerWorkflowImpl.class);
28+
HelloCallerWorkflowImpl.class,
29+
MessageCallerWorkflowImpl.class);
2930

3031
factory.start();
3132
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.temporal.samples.nexus.caller;
2+
3+
import io.temporal.workflow.WorkflowInterface;
4+
import io.temporal.workflow.WorkflowMethod;
5+
6+
@WorkflowInterface
7+
public interface MessageCallerWorkflow {
8+
@WorkflowMethod
9+
String doWork();
10+
11+
// @SignalMethod
12+
// void signalWorkflow(String name, String workflowId);
13+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.temporal.samples.nexus.caller;
2+
3+
import io.temporal.samples.nexus.service.SampleNexusService;
4+
import io.temporal.workflow.NexusOperationHandle;
5+
import io.temporal.workflow.NexusOperationOptions;
6+
import io.temporal.workflow.NexusServiceOptions;
7+
import io.temporal.workflow.Workflow;
8+
import java.time.Duration;
9+
10+
public class MessageCallerWorkflowImpl implements MessageCallerWorkflow {
11+
12+
// private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class);
13+
14+
SampleNexusService sampleNexusService =
15+
Workflow.newNexusServiceStub(
16+
SampleNexusService.class,
17+
NexusServiceOptions.newBuilder()
18+
.setOperationOptions(
19+
NexusOperationOptions.newBuilder()
20+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
21+
.build())
22+
.build());
23+
24+
@Override
25+
public String doWork() {
26+
NexusOperationHandle<SampleNexusService.DoWorkOutput> handle =
27+
Workflow.startNexusOperation(
28+
sampleNexusService::doWork, new SampleNexusService.DoWorkInput());
29+
// Optionally wait for the operation to be started. NexusOperationExecution will contain the
30+
// operation token in case this operation is asynchronous.
31+
handle.getExecution().get();
32+
return handle.getResult().get().getMessage();
33+
}
34+
35+
// @Override
36+
// public void signalWorkflow(String name, String workflowId) {
37+
// NexusOperationHandle<SampleNexusService.SignalWorkflowOutput> handle =
38+
// Workflow.startNexusOperation(
39+
// sampleNexusService::signalWorkflow,
40+
// new SampleNexusService.SignalWorkflowInput(name, workflowId));
41+
// // Optionally wait for the operation to be started. NexusOperationExecution will contain the
42+
// // operation token in case this operation is asynchronous.
43+
// handle.getExecution().get();
44+
// // SampleNexusService.SignalWorkflowOutput output = handle.getResult().get();
45+
// handle.getResult().get();
46+
// logger.info("Signal returned");
47+
// }
48+
}

core/src/main/java/io/temporal/samples/nexus/handler/HandlerWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ public static void main(String[] args) {
1414
WorkerFactory factory = WorkerFactory.newInstance(client);
1515

1616
Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME);
17-
worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class);
17+
worker.registerWorkflowImplementationTypes(
18+
HelloHandlerWorkflowImpl.class, MessageHandlerWorkflowImpl.class);
1819
worker.registerNexusServiceImplementation(new SampleNexusServiceImpl());
1920

2021
factory.start();
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.temporal.samples.nexus.handler;
2+
3+
import io.temporal.samples.nexus.service.SampleNexusService;
4+
import io.temporal.workflow.QueryMethod;
5+
import io.temporal.workflow.SignalMethod;
6+
import io.temporal.workflow.UpdateMethod;
7+
import io.temporal.workflow.UpdateValidatorMethod;
8+
import io.temporal.workflow.WorkflowInterface;
9+
import io.temporal.workflow.WorkflowMethod;
10+
11+
@WorkflowInterface
12+
public interface MessageHandlerWorkflow {
13+
14+
@WorkflowMethod
15+
SampleNexusService.DoWorkOutput doWork(SampleNexusService.DoWorkInput input);
16+
17+
@QueryMethod
18+
String queryWorkflow(String name);
19+
20+
@SignalMethod
21+
void signalWorkflow(String name);
22+
23+
@UpdateMethod
24+
int updateWorkflow(String name);
25+
26+
@UpdateValidatorMethod(updateName = "updateWorkflow")
27+
void setLanguageValidator(String name);
28+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.temporal.samples.nexus.handler;
2+
3+
import io.temporal.samples.nexus.caller.CallerStarter;
4+
import io.temporal.samples.nexus.service.SampleNexusService;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public class MessageHandlerWorkflowImpl implements MessageHandlerWorkflow {
9+
private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class);
10+
boolean keepRunning = true;
11+
static final int SLEEP_TIME = 3;
12+
13+
@Override
14+
public SampleNexusService.DoWorkOutput doWork(SampleNexusService.DoWorkInput input) {
15+
logger.info("DoWork Called!");
16+
// This will run for 60 seconds or until it gets signalled.
17+
// int limit = 0;
18+
// while (keepRunning && limit < 6) {
19+
// try {
20+
// limit += SLEEP_TIME;
21+
// TimeUnit.SECONDS.sleep(SLEEP_TIME);
22+
// } catch (InterruptedException e) {
23+
// logger.info("doWork had an exception - ", e);
24+
// return new SampleNexusService.DoWorkOutput("DoWork had an exception - " +
25+
// e.getMessage());
26+
// }
27+
// }
28+
29+
String logMessage;
30+
if (keepRunning) {
31+
logMessage = "doWork hit the end and stopped.";
32+
} else {
33+
logMessage = "doWork was told to stop, and did.";
34+
}
35+
36+
logger.info(logMessage);
37+
return new SampleNexusService.DoWorkOutput(logMessage);
38+
}
39+
40+
@Override
41+
public String queryWorkflow(String name) {
42+
logger.info("Query {} was received", name);
43+
return "Query received";
44+
}
45+
46+
@Override
47+
public void signalWorkflow(String name) {
48+
logger.info("Signal was received");
49+
}
50+
51+
@Override
52+
public int updateWorkflow(String name) {
53+
logger.info("Update {} was received", name);
54+
return 10;
55+
}
56+
57+
@Override
58+
public void setLanguageValidator(String name) {
59+
if (name.equals("invalid")) {
60+
logger.info("Update {} was rejected", name);
61+
throw new IllegalArgumentException("Invalid update name!");
62+
}
63+
logger.info("Update {} was validated", name);
64+
}
65+
}

core/src/main/java/io/temporal/samples/nexus/handler/SampleNexusServiceImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,29 @@ public OperationHandler<SampleNexusService.HelloInput, SampleNexusService.HelloO
6464
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
6565
::hello);
6666
}
67+
68+
@OperationImpl
69+
public OperationHandler<SampleNexusService.DoWorkInput, SampleNexusService.DoWorkOutput>
70+
doWork() {
71+
// Use the WorkflowRunOperation.fromWorkflowMethod constructor, which is the easiest
72+
// way to expose a workflow as an operation. To expose a workflow with a different input
73+
// parameters then the operation or from an untyped stub, use the
74+
// WorkflowRunOperation.fromWorkflowHandler constructor and the appropriate constructor method
75+
// on WorkflowHandle.
76+
return WorkflowRunOperation.fromWorkflowMethod(
77+
(ctx, details, input) ->
78+
Nexus.getOperationContext()
79+
.getWorkflowClient()
80+
.newWorkflowStub(
81+
MessageHandlerWorkflow.class,
82+
// Workflow IDs should typically be business meaningful IDs and are used to
83+
// dedupe workflow starts. For this example, we're using the request ID
84+
// allocated by Temporal when the caller workflow schedules
85+
// the operation, this ID is guaranteed to be stable across retries of this
86+
// operation.
87+
//
88+
// Task queue defaults to the task queue this operation is handled on.
89+
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
90+
::doWork);
91+
}
6792
}

core/src/main/java/io/temporal/samples/nexus/service/SampleNexusService.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.samples.nexus.service;
22

3+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
34
import com.fasterxml.jackson.annotation.JsonCreator;
45
import com.fasterxml.jackson.annotation.JsonProperty;
56
import io.nexusrpc.Operation;
@@ -79,9 +80,64 @@ public String getMessage() {
7980
}
8081
}
8182

83+
class SignalWorkflowInput {
84+
private final String workflowId;
85+
private final String signalName;
86+
87+
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
88+
public SignalWorkflowInput(
89+
@JsonProperty("signalName") String signalName,
90+
@JsonProperty("workflowId") String workflowId) {
91+
this.workflowId = workflowId;
92+
this.signalName = signalName;
93+
}
94+
95+
@JsonProperty("workflowId")
96+
public String getWorkflowId() {
97+
return workflowId;
98+
}
99+
100+
@JsonProperty("signalName")
101+
public String getSignalName() {
102+
return signalName;
103+
}
104+
}
105+
106+
class SignalWorkflowOutput {
107+
@JsonCreator
108+
public SignalWorkflowOutput() {}
109+
}
110+
;
111+
112+
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
113+
class DoWorkInput {
114+
@JsonCreator
115+
public DoWorkInput() {}
116+
}
117+
118+
class DoWorkOutput {
119+
private final String message;
120+
121+
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
122+
public DoWorkOutput(@JsonProperty("message") String message) {
123+
this.message = message;
124+
}
125+
126+
@JsonProperty("message")
127+
public String getMessage() {
128+
return message;
129+
}
130+
}
131+
82132
@Operation
83133
HelloOutput hello(HelloInput input);
84134

85135
@Operation
86136
EchoOutput echo(EchoInput input);
137+
138+
@Operation
139+
DoWorkOutput doWork(DoWorkInput input);
140+
141+
@Operation
142+
SignalWorkflowOutput signalWorkflow(SignalWorkflowInput input);
87143
}

0 commit comments

Comments
 (0)