Skip to content

Commit 87e57ae

Browse files
Add custom annotation sample
1 parent 7e00384 commit 87e57ae

File tree

5 files changed

+389
-0
lines changed

5 files changed

+389
-0
lines changed
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.customannotation;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
import io.temporal.activity.ActivityOptions;
24+
import io.temporal.client.WorkflowClient;
25+
import io.temporal.client.WorkflowOptions;
26+
import io.temporal.common.RetryOptions;
27+
import io.temporal.serviceclient.WorkflowServiceStubs;
28+
import io.temporal.worker.Worker;
29+
import io.temporal.worker.WorkerFactory;
30+
import io.temporal.worker.WorkerFactoryOptions;
31+
import io.temporal.workflow.Workflow;
32+
import io.temporal.workflow.WorkflowInterface;
33+
import io.temporal.workflow.WorkflowMethod;
34+
import java.time.Duration;
35+
36+
public class CustomAnnotation {
37+
38+
// Define the task queue name
39+
static final String TASK_QUEUE = "CustomAnnotationTaskQueue";
40+
41+
// Define our workflow unique id
42+
static final String WORKFLOW_ID = "CustomAnnotationWorkflow";
43+
44+
/**
45+
* The Workflow Definition's Interface must contain one method annotated with @WorkflowMethod.
46+
*
47+
* <p>Workflow Definitions should not contain any heavyweight computations, non-deterministic
48+
* code, network calls, database operations, etc. Those things should be handled by the
49+
* Activities.
50+
*
51+
* @see WorkflowInterface
52+
* @see WorkflowMethod
53+
*/
54+
@WorkflowInterface
55+
public interface GreetingWorkflow {
56+
57+
/**
58+
* This is the method that is executed when the Workflow Execution is started. The Workflow
59+
* Execution completes when this method finishes execution.
60+
*/
61+
@WorkflowMethod
62+
String getGreeting(String name);
63+
}
64+
65+
/**
66+
* This is the Activity Definition's Interface. Activities are building blocks of any Temporal
67+
* Workflow and contain any business logic that could perform long running computation, network
68+
* calls, etc.
69+
*
70+
* <p>Annotating Activity Definition methods with @ActivityMethod is optional.
71+
*
72+
* @see ActivityInterface
73+
* @see io.temporal.activity.ActivityMethod
74+
*/
75+
@ActivityInterface
76+
public interface GreetingActivities {
77+
78+
/** Define your activity method which can be called during workflow execution */
79+
String composeGreeting(String greeting, String name);
80+
}
81+
82+
// Define the workflow implementation which implements our getGreeting workflow method.
83+
public static class GreetingWorkflowImpl implements GreetingWorkflow {
84+
85+
/**
86+
* Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that
87+
* are executed outside of the workflow thread on the activity worker, that can be on a
88+
* different host. Temporal is going to dispatch the activity results back to the workflow and
89+
* unblock the stub as soon as activity is completed on the activity worker.
90+
*
91+
* <p>In the {@link ActivityOptions} definition the "setStartToCloseTimeout" option sets the
92+
* maximum time of a single Activity execution attempt. For this example it is set to 10
93+
* seconds.
94+
*
95+
* <p>In the {@link ActivityOptions} definition the "setInitialInterval" option sets the
96+
* interval of the first retry. It is set to 1 second. The "setDoNotRetry" option is a list of
97+
* application failures for which retries should not be performed.
98+
*
99+
* <p>By default the maximum number of retry attempts is set to "unlimited" however you can
100+
* change it by adding the "setMaximumAttempts" option to the retry options.
101+
*/
102+
private final GreetingActivities activities =
103+
Workflow.newActivityStub(
104+
GreetingActivities.class,
105+
ActivityOptions.newBuilder()
106+
.setStartToCloseTimeout(Duration.ofSeconds(10))
107+
.setRetryOptions(
108+
RetryOptions.newBuilder()
109+
.setInitialInterval(Duration.ofSeconds(1))
110+
.setDoNotRetry(IllegalArgumentException.class.getName())
111+
.build())
112+
.build());
113+
114+
@Override
115+
public String getGreeting(String name) {
116+
// This is a blocking call that returns only after activity is completed.
117+
return activities.composeGreeting("Hello", name);
118+
}
119+
}
120+
121+
/**
122+
* Implementation of our workflow activity interface. It overwrites our defined composeGreeting
123+
* activity method.
124+
*/
125+
static class GreetingActivitiesImpl implements GreetingActivities {
126+
private int callCount;
127+
private long lastInvocationTime;
128+
129+
/**
130+
* Our activity implementation simulates a failure 3 times. Given our previously set
131+
* RetryOptions, our workflow is going to retry our activity execution.
132+
*/
133+
@Override
134+
@NextRetryDelay(failureType = "java.lang.IllegalStateException", delaySeconds = 2)
135+
public synchronized String composeGreeting(String greeting, String name) {
136+
if (lastInvocationTime != 0) {
137+
long timeSinceLastInvocation = System.currentTimeMillis() - lastInvocationTime;
138+
System.out.print(timeSinceLastInvocation + " milliseconds since last invocation. ");
139+
}
140+
lastInvocationTime = System.currentTimeMillis();
141+
if (++callCount < 4) {
142+
System.out.println("composeGreeting activity is going to fail");
143+
144+
/*
145+
* We throw IllegalStateException here. It is not in the list of "do not retry" exceptions
146+
* set in our RetryOptions, so a workflow retry is going to be issued
147+
*/
148+
throw new IllegalStateException("not yet");
149+
}
150+
151+
// after 3 unsuccessful retries we finally can complete our activity execution
152+
System.out.println("composeGreeting activity is going to complete");
153+
return greeting + " " + name + "!";
154+
}
155+
}
156+
157+
/**
158+
* With our Workflow and Activities defined, we can now start execution. The main method starts
159+
* the worker and then the workflow.
160+
*/
161+
public static void main(String[] args) {
162+
163+
// Get a Workflow service stub.
164+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
165+
166+
/*
167+
* Get a Workflow service client which can be used to start, Signal, and Query Workflow Executions.
168+
*/
169+
WorkflowClient client = WorkflowClient.newInstance(service);
170+
171+
/*
172+
* Define the workflow factory. It is used to create workflow workers for a specific task queue.
173+
*/
174+
WorkerFactory factory =
175+
WorkerFactory.newInstance(
176+
client,
177+
WorkerFactoryOptions.newBuilder()
178+
.setWorkerInterceptors(new NextRetryDelayActivityAnnotationInterceptor())
179+
.build());
180+
181+
/*
182+
* Define the workflow worker. Workflow workers listen to a defined task queue and process
183+
* workflows and activities.
184+
*/
185+
Worker worker = factory.newWorker(TASK_QUEUE);
186+
187+
/*
188+
* Register our workflow implementation with the worker.
189+
* Workflow implementations must be known to the worker at runtime in
190+
* order to dispatch workflow tasks.
191+
*/
192+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
193+
194+
/*
195+
* Register our Activity Types with the Worker. Since Activities are stateless and thread-safe,
196+
* the Activity Type is a shared instance.
197+
*/
198+
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
199+
200+
/*
201+
* Start all the workers registered for a specific task queue.
202+
* The started workers then start polling for workflows and activities.
203+
*/
204+
factory.start();
205+
206+
// Set our workflow options
207+
WorkflowOptions workflowOptions =
208+
WorkflowOptions.newBuilder().setWorkflowId(WORKFLOW_ID).setTaskQueue(TASK_QUEUE).build();
209+
210+
// Create the workflow client stub. It is used to start our workflow execution.
211+
GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
212+
213+
/*
214+
* Execute our workflow and wait for it to complete. The call to our getGreeting method is
215+
* synchronous.
216+
*
217+
* See {@link io.temporal.samples.hello.HelloSignal} for an example of starting workflow
218+
* without waiting synchronously for its result.
219+
*/
220+
String greeting = workflow.getGreeting("World");
221+
222+
// Display workflow execution results
223+
System.out.println(greeting);
224+
System.exit(0);
225+
}
226+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.temporal.samples.customannotation;
2+
3+
import java.lang.annotation.*;
4+
5+
/**
6+
* NextRetryDelay is an annotation that can be used to specify the next retry delay for a particular
7+
* failure type in a Temporal activity. It is used to provide a custom fixed delay if the activity
8+
* fails with a specific exception type.
9+
*
10+
* <p>For this annotation to work, {@link NextRetryDelayActivityAnnotationInterceptor} must be
11+
* passed as a worker interceptor to the worker factory.
12+
*/
13+
@Documented
14+
@Target(ElementType.METHOD)
15+
@Repeatable(NextRetryDelays.class)
16+
@Retention(RetentionPolicy.RUNTIME)
17+
public @interface NextRetryDelay {
18+
/**
19+
* failureType is the type of failure that this retry delay applies to. It should be the fully
20+
* qualified class name of the exception type or the type of the {@link
21+
* io.temporal.failure.ApplicationFailure}.
22+
*/
23+
String failureType();
24+
25+
/**
26+
* delaySeconds is the fixed delay in seconds that should be applied for the specified failure
27+
* type.
28+
*/
29+
int delaySeconds();
30+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.customannotation;
21+
22+
import io.temporal.activity.ActivityExecutionContext;
23+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
24+
import io.temporal.common.interceptors.WorkerInterceptorBase;
25+
import io.temporal.common.metadata.POJOActivityImplMetadata;
26+
import io.temporal.common.metadata.POJOActivityMethodMetadata;
27+
import io.temporal.failure.ApplicationFailure;
28+
import io.temporal.failure.TemporalFailure;
29+
import java.lang.reflect.Method;
30+
import java.time.Duration;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
/**
36+
* Checks if the activity method has the @NextRetryDelay annotation. If it does, it will throw an
37+
* ApplicationFailure with a delay set to the value of the annotation.
38+
*/
39+
public class NextRetryDelayActivityAnnotationInterceptor extends WorkerInterceptorBase {
40+
41+
@Override
42+
public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
43+
return new ActivityInboundCallsInterceptorAnnotation(next);
44+
}
45+
46+
public static class ActivityInboundCallsInterceptorAnnotation
47+
extends io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase {
48+
private final ActivityInboundCallsInterceptor next;
49+
private Map<String, Integer> delaysPerType = new HashMap<>();
50+
51+
public ActivityInboundCallsInterceptorAnnotation(ActivityInboundCallsInterceptor next) {
52+
super(next);
53+
this.next = next;
54+
}
55+
56+
@Override
57+
public void init(ActivityExecutionContext context) {
58+
List<POJOActivityMethodMetadata> activityMethods =
59+
POJOActivityImplMetadata.newInstance(context.getInstance().getClass())
60+
.getActivityMethods();
61+
// TODO: handle dynamic activity types
62+
POJOActivityMethodMetadata currentActivityMethod =
63+
activityMethods.stream()
64+
.filter(x -> x.getActivityTypeName().equals(context.getInfo().getActivityType()))
65+
.findFirst()
66+
.get();
67+
// Get the implementation method from the interface method
68+
Method implementationMethod;
69+
try {
70+
implementationMethod =
71+
context
72+
.getInstance()
73+
.getClass()
74+
.getMethod(
75+
currentActivityMethod.getMethod().getName(),
76+
currentActivityMethod.getMethod().getParameterTypes());
77+
} catch (NoSuchMethodException e) {
78+
throw new RuntimeException(e);
79+
}
80+
// Get the @NextRetryDelay annotations from the implementation method
81+
NextRetryDelay[] an = implementationMethod.getAnnotationsByType(NextRetryDelay.class);
82+
for (NextRetryDelay a : an) {
83+
delaysPerType.put(a.failureType(), a.delaySeconds());
84+
}
85+
next.init(context);
86+
}
87+
88+
@Override
89+
public ActivityOutput execute(ActivityInput input) {
90+
if (delaysPerType.size() == 0) {
91+
return next.execute(input);
92+
}
93+
try {
94+
return next.execute(input);
95+
} catch (ApplicationFailure ae) {
96+
Integer delay = delaysPerType.get(ae.getType());
97+
if (delay != null) {
98+
// TODO: make sure to pass all the other parameters to the new ApplicationFailure
99+
throw ApplicationFailure.newFailureWithCauseAndDelay(
100+
ae.getMessage(), ae.getType(), ae.getCause(), Duration.ofSeconds(delay));
101+
}
102+
throw ae;
103+
} catch (TemporalFailure tf) {
104+
throw tf;
105+
} catch (Exception e) {
106+
Integer delay = delaysPerType.get(e.getClass().getName());
107+
if (delay != null) {
108+
throw ApplicationFailure.newFailureWithCauseAndDelay(
109+
e.getMessage(), e.getClass().getName(), e, Duration.ofSeconds(delay));
110+
}
111+
throw e;
112+
}
113+
}
114+
}
115+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.temporal.samples.customannotation;
2+
3+
import java.lang.annotation.*;
4+
5+
/** NextRetryDelays is a container annotation for multiple {@link NextRetryDelay} annotations. */
6+
@Documented
7+
@Target(ElementType.METHOD)
8+
@Retention(RetentionPolicy.RUNTIME)
9+
public @interface NextRetryDelays {
10+
NextRetryDelay[] value();
11+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Custom annotation
2+
3+
The sample demonstrates how to create a custom annotation using an interceptor. In this case the annotation allows specifying a fixed next retry delay for a certain failure type.
4+
5+
```bash
6+
./gradlew -q execute -PmainClass=io.temporal.samples.customannotation.CustomAnnotation
7+
```

0 commit comments

Comments
 (0)