Skip to content

Commit 3be525a

Browse files
committed
adding cancelation and failure
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 50f0981 commit 3be525a

File tree

7 files changed

+226
-45
lines changed

7 files changed

+226
-45
lines changed
Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
11
package io.temporal.samples.packetdelivery;
22

33
import io.temporal.activity.ActivityOptions;
4-
import io.temporal.workflow.Async;
5-
import io.temporal.workflow.CompletablePromise;
6-
import io.temporal.workflow.Promise;
7-
import io.temporal.workflow.Workflow;
4+
import io.temporal.failure.ActivityFailure;
5+
import io.temporal.failure.CanceledFailure;
6+
import io.temporal.workflow.*;
87
import java.time.Duration;
98
import org.slf4j.Logger;
109

1110
public class PacketDelivery {
1211
private Packet packet;
1312
private boolean deliveryConfirmation = false;
1413
private CompletablePromise delivered = Workflow.newPromise();
15-
private String deliveryConfirmationCode = "";
14+
private CancellationScope cancellationScope;
1615

1716
private Logger logger = Workflow.getLogger(this.getClass().getName());
1817

1918
private final PacketDeliveryActivities activities =
19+
Workflow.newActivityStub(
20+
PacketDeliveryActivities.class,
21+
ActivityOptions.newBuilder()
22+
.setStartToCloseTimeout(Duration.ofSeconds(5))
23+
.setHeartbeatTimeout(Duration.ofSeconds(2))
24+
.build());
25+
26+
private final PacketDeliveryActivities compensationActivities =
2027
Workflow.newActivityStub(
2128
PacketDeliveryActivities.class,
2229
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());
@@ -35,35 +42,73 @@ public void processDeliveryAsync() {
3542
}
3643

3744
public void processDelivery() {
38-
while (!deliveryConfirmationCode.equals("Confirmed")) {
39-
// Step 1 perform delivery
40-
logger.info(
41-
"** Performing delivery for packet: " + packet.getId() + " - " + packet.getContent());
42-
activities.performDelivery(packet);
43-
// Step 2 wait for delivery confirmation
44-
logger.info(
45-
"** Delivery for packet: "
46-
+ packet.getId()
47-
+ " - "
48-
+ packet.getContent()
49-
+ " awaiting delivery confirmation");
50-
Workflow.await(() -> deliveryConfirmation);
51-
logger.info(
52-
"** Delivery for packet: "
53-
+ packet.getId()
54-
+ " - "
55-
+ packet.getContent()
56-
+ " received confirmation");
57-
// Step 3 complete delivery processing
58-
logger.info(
59-
"** Completing delivery for packet: " + packet.getId() + " - " + packet.getContent());
60-
deliveryConfirmationCode = activities.completeDelivery(packet);
61-
// Reset deliveryConfirmation
62-
deliveryConfirmation = false;
45+
cancellationScope =
46+
Workflow.newCancellationScope(
47+
() -> {
48+
String deliveryConfirmationResult = "";
49+
while (!deliveryConfirmationResult.equals(PacketUtils.COMPLETION_SUCCESS)) {
50+
// Step 1 perform delivery
51+
logger.info(
52+
"** Performing delivery for packet: "
53+
+ packet.getId()
54+
+ " - "
55+
+ packet.getContent());
56+
activities.performDelivery(packet);
57+
// Step 2 wait for delivery confirmation
58+
logger.info(
59+
"** Delivery for packet: "
60+
+ packet.getId()
61+
+ " - "
62+
+ packet.getContent()
63+
+ " awaiting delivery confirmation");
64+
Workflow.await(() -> deliveryConfirmation);
65+
logger.info(
66+
"** Delivery for packet: "
67+
+ packet.getId()
68+
+ " - "
69+
+ packet.getContent()
70+
+ " received confirmation");
71+
// Step 3 complete delivery processing
72+
logger.info(
73+
"** Completing delivery for packet: "
74+
+ packet.getId()
75+
+ " - "
76+
+ packet.getContent());
77+
deliveryConfirmationResult = activities.completeDelivery(packet);
78+
// Reset deliveryConfirmation
79+
deliveryConfirmation = false;
80+
}
81+
});
82+
83+
try {
84+
cancellationScope.run();
85+
} catch (Exception e) {
86+
System.out.println("*************** E1: " + e.getClass().getName());
87+
if (e instanceof ActivityFailure) {
88+
ActivityFailure activityFailure = (ActivityFailure) e;
89+
if (activityFailure.getCause() instanceof CanceledFailure) {
90+
// Run compensation activity and complete
91+
System.out.println("*************** E11: " + e.getClass().getName());
92+
compensationActivities.compensateDelivery(packet);
93+
}
94+
}
95+
// Just for show for example that cancel could come in while we are waiting on approval signal
96+
// too
97+
else if (e instanceof CanceledFailure) {
98+
// Run compensation activity and complete
99+
compensationActivities.compensateDelivery(packet);
100+
}
101+
return;
63102
}
64103
}
65104

66105
public void confirmDelivery() {
67106
this.deliveryConfirmation = true;
68107
}
108+
109+
public void cancelDelivery(String reason) {
110+
if (cancellationScope != null) {
111+
cancellationScope.cancel(reason);
112+
}
113+
}
69114
}

core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryActivities.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ public interface PacketDeliveryActivities {
1010
void performDelivery(Packet packet);
1111

1212
String completeDelivery(Packet packet);
13+
14+
String compensateDelivery(Packet packet);
1315
}
Lines changed: 126 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,139 @@
11
package io.temporal.samples.packetdelivery;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
3+
import io.temporal.activity.Activity;
4+
import io.temporal.activity.ActivityExecutionContext;
5+
import io.temporal.client.ActivityCompletionException;
6+
import io.temporal.client.WorkflowClient;
7+
import java.util.*;
58

69
public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities {
10+
private List<Packet> packets =
11+
Arrays.asList(
12+
new Packet(1, "books"),
13+
new Packet(2, "jewelry"),
14+
new Packet(3, "furniture"),
15+
new Packet(4, "food"),
16+
new Packet(5, "electronics"));
17+
private WorkflowClient client;
18+
19+
public PacketDeliveryActivitiesImpl(WorkflowClient client) {
20+
this.client = client;
21+
}
22+
723
@Override
824
public List<Packet> generatePackets() {
9-
List<Packet> result = new ArrayList<>();
10-
result.add(new Packet(1, "books"));
11-
result.add(new Packet(2, "jewelry"));
12-
result.add(new Packet(3, "furniture"));
13-
result.add(new Packet(4, "food"));
14-
result.add(new Packet(5, "electronics"));
15-
return result;
25+
return packets;
1626
}
1727

1828
@Override
1929
public void performDelivery(Packet packet) {
30+
ActivityExecutionContext context = Activity.getExecutionContext();
2031
System.out.println(
2132
"** Activity - Performing delivery for packet: "
2233
+ packet.getId()
2334
+ " with content: "
2435
+ packet.getContent());
25-
sleep(2);
36+
for (int i = 0; i < 4; i++) {
37+
try {
38+
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
39+
context.heartbeat(i);
40+
} catch (ActivityCompletionException e) {
41+
System.out.println(
42+
"** Activity - Canceling delivery activity for packet: "
43+
+ packet.getId()
44+
+ " with content: "
45+
+ packet.getContent());
46+
throw e;
47+
}
48+
}
2649
}
2750

2851
@Override
2952
public String completeDelivery(Packet packet) {
53+
ActivityExecutionContext context = Activity.getExecutionContext();
3054
System.out.println(
3155
"** Activity - Completing delivery for package: "
3256
+ packet.getId()
3357
+ " with content: "
3458
+ packet.getContent());
59+
for (int i = 0; i < 4; i++) {
60+
try {
61+
// Perform the heartbeat. Used to notify the workflow that activity execution is alive
62+
context.heartbeat(i);
63+
} catch (ActivityCompletionException e) {
64+
System.out.println(
65+
"** Activity - Canceling complete delivery activity for packet: "
66+
+ packet.getId()
67+
+ " with content: "
68+
+ packet.getContent());
69+
throw e;
70+
}
71+
}
72+
// For sample we just confirm
73+
return randomCompletionDeliveryResult(packet);
74+
}
75+
76+
@Override
77+
public String compensateDelivery(Packet packet) {
78+
System.out.println(
79+
"** Activity - Compensating delivery for package: "
80+
+ packet.getId()
81+
+ " with content: "
82+
+ packet.getContent());
3583
sleep(1);
36-
// for sample we just confirm
37-
return "Confirmed";
84+
return PacketUtils.COMPENSATION_COMPLETED;
85+
}
86+
87+
/**
88+
* For this sample activity completion result can drive if 1. Delivery confirmation is completed,
89+
* in which case we complete delivery 2. Delivery confirmation is failed, in which case we run the
90+
* delivery again 3. Delivery confirmation is cancelled, in which case we want to cancel delivery
91+
* and perform "cleanup activity" Note that any delivery can cancel itself OR another delivery, so
92+
* for example Furniure delivery can cancel the Food delivery. For sample we have some specific
93+
* rules Which delivery can cancel which
94+
*/
95+
private String randomCompletionDeliveryResult(Packet packet) {
96+
Random random = new Random();
97+
double randomValue = random.nextDouble();
98+
if (randomValue < 0.10) { // 10% chance for delivery completion to be canceled
99+
int toCancelDelivery = determineCancelRules(packet);
100+
System.out.println(
101+
"** Activity - Delivery completion result for package: "
102+
+ packet.getId()
103+
+ " with content: "
104+
+ packet.getContent()
105+
+ ": "
106+
+ "Cancelling delivery: "
107+
+ toCancelDelivery);
108+
109+
// send cancellation signal for packet to be canceled
110+
PacketDeliveryWorkflow packetWorkflow =
111+
client.newWorkflowStub(
112+
PacketDeliveryWorkflow.class,
113+
Activity.getExecutionContext().getInfo().getWorkflowId());
114+
packetWorkflow.cancelDelivery(toCancelDelivery, "canceled from delivery " + packet.getId());
115+
116+
return PacketUtils.COMPLETION_CANCELLED;
117+
}
118+
if (randomValue < 0.20) { // 20% chance for delivery completion to fail
119+
System.out.println(
120+
"** Activity - Delivery completion result for package: "
121+
+ packet.getId()
122+
+ " with content: "
123+
+ packet.getContent()
124+
+ ": "
125+
+ "Failed");
126+
return PacketUtils.COMPLETION_FAILURE;
127+
}
128+
129+
System.out.println(
130+
"** Activity - Delivery completion result for package: "
131+
+ packet.getId()
132+
+ " with content: "
133+
+ packet.getContent()
134+
+ ": "
135+
+ "Successful");
136+
return PacketUtils.COMPLETION_SUCCESS;
38137
}
39138

40139
private void sleep(int seconds) {
@@ -44,4 +143,19 @@ private void sleep(int seconds) {
44143
System.out.println(e.getMessage());
45144
}
46145
}
146+
147+
/**
148+
* Sample rules for canceling different deliveries We just rotate the list 1-5 (packet ids) by
149+
* packet id and return first result
150+
*/
151+
private int determineCancelRules(Packet packet) {
152+
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
153+
Collections.rotate(list, packet.getId());
154+
System.out.println(
155+
"** Activity - Package delivery : "
156+
+ packet.getId()
157+
+ " canceling package delivery: "
158+
+ list.get(0));
159+
return list.get(0);
160+
}
47161
}

core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ public interface PacketDeliveryWorkflow {
1111

1212
@SignalMethod
1313
void confirmDelivery(int deliveryId);
14+
15+
@SignalMethod
16+
void cancelDelivery(int deliveryId, String reason);
1417
}

core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow {
1313

1414
private final Map<Integer, PacketDelivery> packetDeliveries = new HashMap<>();
15-
// private Logger logger = Workflow.getLogger(this.getClass().getName());
1615

1716
private final PacketDeliveryActivities activities =
1817
Workflow.newActivityStub(
1918
PacketDeliveryActivities.class,
20-
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());
19+
ActivityOptions.newBuilder()
20+
.setStartToCloseTimeout(Duration.ofSeconds(5))
21+
.setHeartbeatTimeout(Duration.ofSeconds(2))
22+
.build());
2123

2224
@Override
2325
public String execute() {
@@ -41,4 +43,11 @@ public void confirmDelivery(int deliveryId) {
4143
packetDeliveries.get(deliveryId).confirmDelivery();
4244
}
4345
}
46+
47+
@Override
48+
public void cancelDelivery(int deliveryId, String reason) {
49+
if (packetDeliveries.containsKey(deliveryId)) {
50+
packetDeliveries.get(deliveryId).cancelDelivery(reason);
51+
}
52+
}
4453
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
public class PacketUtils {
4+
public static String COMPLETION_SUCCESS = "Delivery Completion Successful";
5+
public static String COMPLETION_FAILURE = "Delivery Completion Failed";
6+
public static String COMPLETION_CANCELLED = "Delivery Completion Cancelled";
7+
public static String COMPENSATION_COMPLETED = "Delivery Compensation Completed";
8+
}

core/src/main/java/io/temporal/samples/packetdelivery/Starter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public static void main(String[] args) {
1515
Worker worker = factory.newWorker("packet-delivery-taskqueue");
1616

1717
worker.registerWorkflowImplementationTypes(PacketDeliveryWorkflowImpl.class);
18-
worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl());
18+
worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl(client));
1919

2020
factory.start();
2121

0 commit comments

Comments
 (0)