Skip to content

Commit 2a51eab

Browse files
committed
[WIP] Async package delivery sample
Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent 902591b commit 2a51eab

7 files changed

Lines changed: 266 additions & 0 deletions

File tree

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
public class Packet {
4+
private int id;
5+
private String content;
6+
7+
public Packet() {}
8+
9+
public Packet(int id, String content) {
10+
this.id = id;
11+
this.content = content;
12+
}
13+
14+
public int getId() {
15+
return id;
16+
}
17+
18+
public String getContent() {
19+
return content;
20+
}
21+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.ActivityOptions;
4+
import io.temporal.workflow.Async;
5+
import io.temporal.workflow.CompletablePromise;
6+
import io.temporal.workflow.Promise;
7+
import io.temporal.workflow.Workflow;
8+
import java.time.Duration;
9+
import org.slf4j.Logger;
10+
11+
public class PacketDelivery {
12+
private Packet packet;
13+
private boolean deliveryConfirmation = false;
14+
private CompletablePromise delivered = Workflow.newPromise();
15+
private String deliveryConfirmationCode = "";
16+
17+
private Logger logger = Workflow.getLogger(this.getClass().getName());
18+
19+
private final PacketDeliveryActivities activities =
20+
Workflow.newActivityStub(
21+
PacketDeliveryActivities.class,
22+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());
23+
24+
public PacketDelivery(Packet packet) {
25+
this.packet = packet;
26+
processDeliveryAsync();
27+
}
28+
29+
public Promise<Void> getDelivered() {
30+
System.out.println("*************** CHECKING GET DELIVERED!!! : " + delivered);
31+
return delivered;
32+
}
33+
34+
public void processDeliveryAsync() {
35+
delivered.completeFrom(Async.procedure(this::processDelivery));
36+
System.out.println("*************** DONE WITH PROCESS DELIVERY ASYNC");
37+
}
38+
39+
public void processDelivery() {
40+
while (!deliveryConfirmationCode.equals("Confirmed")) {
41+
// Step 1 perform delivery
42+
logger.info(
43+
"** Performing delivery for packet: " + packet.getId() + " - " + packet.getContent());
44+
activities.performDelivery(packet);
45+
// Step 2 wait for delivery confirmation
46+
logger.info(
47+
"** Delivery for packet: "
48+
+ packet.getId()
49+
+ " - "
50+
+ packet.getContent()
51+
+ " awaiting delivery confirmation");
52+
Workflow.await(() -> deliveryConfirmation);
53+
logger.info(
54+
"** Delivery for packet: "
55+
+ packet.getId()
56+
+ " - "
57+
+ packet.getContent()
58+
+ " received confirmation");
59+
// Step 3 complete delivery processing
60+
logger.info(
61+
"** Completing delivery for packet: " + packet.getId() + " - " + packet.getContent());
62+
deliveryConfirmationCode = activities.completeDelivery(packet);
63+
// Reset deliveryConfirmation
64+
deliveryConfirmation = false;
65+
}
66+
}
67+
68+
public void confirmDelivery() {
69+
this.deliveryConfirmation = true;
70+
}
71+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.ActivityInterface;
4+
import java.util.List;
5+
6+
@ActivityInterface
7+
public interface PacketDeliveryActivities {
8+
List<Packet> generatePackets();
9+
10+
void performDelivery(Packet packet);
11+
12+
String completeDelivery(Packet packet);
13+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
public class PacketDeliveryActivitiesImpl implements PacketDeliveryActivities {
7+
@Override
8+
public List<Packet> generatePackets() {
9+
List<Packet> result = new ArrayList<>();
10+
result.add(new Packet(1, "books"));
11+
result.add(new Packet(2, "jewelry"));
12+
result.add(new Packet(3, "furniture"));
13+
result.add(new Packet(4, "food"));
14+
result.add(new Packet(5, "electronics"));
15+
return result;
16+
}
17+
18+
@Override
19+
public void performDelivery(Packet packet) {
20+
System.out.println(
21+
"** Activity - Performing delivery for packet: "
22+
+ packet.getId()
23+
+ " with content: "
24+
+ packet.getContent());
25+
sleep(2);
26+
}
27+
28+
@Override
29+
public String completeDelivery(Packet packet) {
30+
System.out.println(
31+
"** Activity - Completing delivery for package: "
32+
+ packet.getId()
33+
+ " with content: "
34+
+ packet.getContent());
35+
sleep(1);
36+
// for sample we just confirm
37+
return "Confirmed";
38+
}
39+
40+
private void sleep(int seconds) {
41+
try {
42+
Thread.sleep(seconds * 1000L);
43+
} catch (Exception e) {
44+
System.out.println(e.getMessage());
45+
}
46+
}
47+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.workflow.SignalMethod;
4+
import io.temporal.workflow.WorkflowInterface;
5+
import io.temporal.workflow.WorkflowMethod;
6+
7+
@WorkflowInterface
8+
public interface PacketDeliveryWorkflow {
9+
@WorkflowMethod
10+
String execute();
11+
12+
@SignalMethod
13+
void confirmDelivery(int deliveryId);
14+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.activity.ActivityOptions;
4+
import io.temporal.workflow.Promise;
5+
import io.temporal.workflow.Workflow;
6+
import java.time.Duration;
7+
import java.util.ArrayList;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow {
13+
14+
private final Map<Integer, PacketDelivery> packetDeliveries = new HashMap<>();
15+
// private Logger logger = Workflow.getLogger(this.getClass().getName());
16+
17+
private final PacketDeliveryActivities activities =
18+
Workflow.newActivityStub(
19+
PacketDeliveryActivities.class,
20+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(3)).build());
21+
22+
@Override
23+
public String execute() {
24+
List<Promise<Void>> packetsDelivered = new ArrayList<>();
25+
// Step 1 - upload initial packets to deliver
26+
List<Packet> initialPackets = activities.generatePackets();
27+
// Step 2 - set up delivery processing
28+
for (Packet packet : initialPackets) {
29+
PacketDelivery delivery = new PacketDelivery(packet);
30+
packetDeliveries.put(packet.getId(), delivery);
31+
packetsDelivered.add(delivery.getDelivered());
32+
}
33+
34+
Promise.allOf(packetsDelivered).get();
35+
return "completed";
36+
}
37+
38+
@Override
39+
public void confirmDelivery(int deliveryId) {
40+
if (packetDeliveries.containsKey(deliveryId)) {
41+
packetDeliveries.get(deliveryId).confirmDelivery();
42+
}
43+
}
44+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.temporal.samples.packetdelivery;
2+
3+
import io.temporal.client.WorkflowClient;
4+
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.client.WorkflowStub;
6+
import io.temporal.serviceclient.WorkflowServiceStubs;
7+
import io.temporal.worker.Worker;
8+
import io.temporal.worker.WorkerFactory;
9+
10+
public class Starter {
11+
public static void main(String[] args) {
12+
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
13+
WorkflowClient client = WorkflowClient.newInstance(service);
14+
WorkerFactory factory = WorkerFactory.newInstance(client);
15+
Worker worker = factory.newWorker("packet-delivery-taskqueue");
16+
17+
worker.registerWorkflowImplementationTypes(PacketDeliveryWorkflowImpl.class);
18+
worker.registerActivitiesImplementations(new PacketDeliveryActivitiesImpl());
19+
20+
factory.start();
21+
22+
PacketDeliveryWorkflow workflow =
23+
client.newWorkflowStub(
24+
PacketDeliveryWorkflow.class,
25+
WorkflowOptions.newBuilder()
26+
.setWorkflowId("packet-delivery-workflow")
27+
.setTaskQueue("packet-delivery-taskqueue")
28+
.build());
29+
30+
WorkflowClient.start(workflow::execute);
31+
32+
// start completing package deliveries (send confirmations)
33+
sleep(3);
34+
workflow.confirmDelivery(3); // furniture
35+
sleep(1);
36+
workflow.confirmDelivery(5); // electronics
37+
sleep(1);
38+
workflow.confirmDelivery(1); // books
39+
sleep(1);
40+
workflow.confirmDelivery(2); // jewelry
41+
sleep(1);
42+
workflow.confirmDelivery(4); // food
43+
44+
// wait for workflow to complete
45+
String result = WorkflowStub.fromTyped(workflow).getResult(String.class);
46+
System.out.println("** Workflow Result: " + result);
47+
}
48+
49+
private static void sleep(int seconds) {
50+
try {
51+
Thread.sleep(seconds * 1000L);
52+
} catch (Exception e) {
53+
System.out.println(e.getMessage());
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)