Skip to content

Commit 30f0d36

Browse files
committed
Adding persistence test module
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent e78f79e commit 30f0d36

16 files changed

Lines changed: 302 additions & 28 deletions

File tree

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
1919
import io.serverlessworkflow.impl.WorkflowInstance;
20-
import java.util.Collection;
2120
import java.util.Optional;
2221
import java.util.stream.Stream;
2322

@@ -29,16 +28,6 @@ protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
2928
this.store = store;
3029
}
3130

32-
@Override
33-
public Stream<WorkflowInstance> scan(
34-
WorkflowDefinition definition, Collection<String> instanceIds) {
35-
PersistenceInstanceTransaction transaction = store.begin();
36-
return instanceIds.stream()
37-
.map(id -> read(transaction, definition, id))
38-
.flatMap(Optional::stream)
39-
.onClose(() -> transaction.commit());
40-
}
41-
4231
@Override
4332
public Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId) {
4433
PersistenceInstanceTransaction transaction = store.begin();
@@ -57,10 +46,10 @@ private Optional<WorkflowInstance> read(
5746
}
5847

5948
@Override
60-
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
49+
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String applicationId) {
6150
PersistenceInstanceTransaction transaction = store.begin();
6251
return transaction
63-
.scanAll(definition)
52+
.scanAll(applicationId, definition)
6453
.onClose(() -> transaction.commit())
6554
.map(v -> new WorkflowPersistenceInstance(definition, v));
6655
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
1919
import io.serverlessworkflow.impl.WorkflowInstance;
20-
import java.util.Collection;
2120
import java.util.Optional;
2221
import java.util.stream.Stream;
2322

2423
public interface PersistenceInstanceReader {
2524

26-
Stream<WorkflowInstance> scanAll(WorkflowDefinition definition);
25+
default Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
26+
return scanAll(definition, definition.application().id());
27+
}
2728

28-
Stream<WorkflowInstance> scan(WorkflowDefinition definition, Collection<String> instanceIds);
29+
Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String applicationId);
2930

3031
Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId);
3132
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public interface PersistenceInstanceTransaction {
4040

4141
void clearStatus(WorkflowContextData workflowContext);
4242

43-
Stream<PersistenceWorkflowInfo> scanAll(WorkflowDefinition definition);
43+
Stream<PersistenceWorkflowInfo> scanAll(String applicationId, WorkflowDefinition definition);
4444

4545
Optional<PersistenceWorkflowInfo> readWorkflowInfo(
4646
WorkflowDefinition definition, String instanceId);

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance {
3131
public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
3232
super(definition, info.id(), info.input());
3333
this.info = info;
34+
this.startedAt = info.startedAt();
3435
}
3536

3637
@Override
3738
public CompletableFuture<WorkflowModel> start() {
3839
return startExecution(
3940
() -> {
40-
startedAt = info.startedAt();
4141
if (info.status() == WorkflowStatus.SUSPENDED) {
4242
internalSuspend();
4343
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@
3232
import java.util.stream.Collectors;
3333
import java.util.stream.Stream;
3434

35-
public abstract class BigMapInstanceTransaction<V, T, S> implements PersistenceInstanceTransaction {
35+
public abstract class BigMapInstanceTransaction<V, T, S, A>
36+
implements PersistenceInstanceTransaction {
3637

3738
@Override
3839
public void writeInstanceData(WorkflowContextData workflowContext) {
40+
String key = key(workflowContext);
3941
instanceData(workflowContext.definition())
40-
.put(key(workflowContext), marshallInstance(workflowContext.instanceData()));
42+
.put(key, marshallInstance(workflowContext.instanceData()));
43+
applicationData()
44+
.put(key, marshallApplicationId(workflowContext.definition().application().id()));
4145
}
4246

4347
@Override
@@ -53,28 +57,36 @@ public void writeCompletedTask(WorkflowContextData workflowContext, TaskContextD
5357
tasks(key(workflowContext))
5458
.put(
5559
taskContext.position().jsonPointer(),
56-
marshallTaskRetried(workflowContext, (TaskContext) taskContext));
60+
marshallTaskCompleted(workflowContext, (TaskContext) taskContext));
5761
}
5862

5963
@Override
60-
public Stream<PersistenceWorkflowInfo> scanAll(WorkflowDefinition definition) {
64+
public Stream<PersistenceWorkflowInfo> scanAll(
65+
String applicationId, WorkflowDefinition definition) {
6166
Map<String, V> instances = instanceData(definition);
67+
Map<String, A> applicationData = applicationData();
6268
Map<String, S> status = status(definition);
6369
return instances.entrySet().stream()
70+
.filter(e -> testAppl(applicationData, e.getKey(), applicationId))
6471
.map(
6572
e ->
6673
readPersistenceInfo(
6774
e.getKey(), e.getValue(), tasks(e.getKey()), status.get(e.getKey())));
6875
}
6976

77+
private boolean testAppl(Map<String, A> applicationData, String key, String applicationId) {
78+
A item = applicationData.get(key);
79+
return item == null || unmarshallApplicationId(item).equals(applicationId);
80+
}
81+
7082
@Override
7183
public Optional<PersistenceWorkflowInfo> readWorkflowInfo(
7284
WorkflowDefinition definition, String key) {
7385
Map<String, V> instances = instanceData(definition);
7486
return instances.containsKey(key)
75-
? Optional.empty()
76-
: Optional.of(
77-
readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key)));
87+
? Optional.of(
88+
readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key)))
89+
: Optional.empty();
7890
}
7991

8092
@Override
@@ -117,6 +129,8 @@ private String key(WorkflowContextData workflowContext) {
117129
return workflowContext.instanceData().id();
118130
}
119131

132+
protected abstract Map<String, A> applicationData();
133+
120134
protected abstract Map<String, V> instanceData(WorkflowDefinitionData definition);
121135

122136
protected abstract Map<String, S> status(WorkflowDefinitionData workflowContext);
@@ -131,6 +145,8 @@ protected abstract T marshallTaskCompleted(
131145
protected abstract T marshallTaskRetried(
132146
WorkflowContextData workflowContext, TaskContext taskContext);
133147

148+
protected abstract A marshallApplicationId(String id);
149+
134150
protected abstract S marshallStatus(WorkflowStatus status);
135151

136152
protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData);
@@ -139,5 +155,7 @@ protected abstract T marshallTaskRetried(
139155

140156
protected abstract WorkflowStatus unmarshallStatus(S statusData);
141157

158+
protected abstract String unmarshallApplicationId(A a);
159+
142160
protected abstract void removeTasks(String key);
143161
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.serverlessworkflow.impl.WorkflowStatus;
2323
import io.serverlessworkflow.impl.executors.AbstractTaskExecutor;
2424
import io.serverlessworkflow.impl.executors.TransitionInfo;
25+
import io.serverlessworkflow.impl.marshaller.MarshallingUtils;
2526
import io.serverlessworkflow.impl.marshaller.TaskStatus;
2627
import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory;
2728
import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer;
@@ -34,7 +35,7 @@
3435
import java.io.ByteArrayOutputStream;
3536

3637
public abstract class BytesMapInstanceTransaction
37-
extends BigMapInstanceTransaction<byte[], byte[], byte[]> {
38+
extends BigMapInstanceTransaction<byte[], byte[], byte[], byte[]> {
3839

3940
private static final byte VERSION_0 = 0;
4041
private static final byte VERSION_1 = 1;
@@ -92,6 +93,14 @@ protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) {
9293
writer.writeObject(model);
9394
}
9495

96+
protected byte[] marshallApplicationId(String id) {
97+
return MarshallingUtils.writeString(factory, id);
98+
}
99+
100+
protected String unmarshallApplicationId(byte[] value) {
101+
return MarshallingUtils.readString(factory, value);
102+
}
103+
95104
@Override
96105
protected byte[] marshallTaskRetried(
97106
WorkflowContextData workflowContext, TaskContext taskContext) {

impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public void write(WorkflowOutputBuffer buffer, JacksonModel object) {
3838
@Override
3939
public JacksonModel read(WorkflowInputBuffer buffer) {
4040
try {
41-
return JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class);
41+
JacksonModel model = JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class);
42+
return model == null ? JacksonModel.NULL : model;
4243
} catch (IOException e) {
4344
throw new UncheckedIOException(e);
4445
}

impl/persistence/mvstore/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,9 @@
1616
<groupId>io.serverlessworkflow</groupId>
1717
<artifactId>serverlessworkflow-persistence-big-map</artifactId>
1818
</dependency>
19+
<dependency>
20+
<groupId>io.serverlessworkflow</groupId>
21+
<artifactId>serverlessworkflow-persistence-tests</artifactId>
22+
</dependency>
1923
</dependencies>
2024
</project>

impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void close() {
4141
}
4242

4343
@Override
44-
public BigMapInstanceTransaction<byte[], byte[], byte[]> begin() {
44+
public BigMapInstanceTransaction<byte[], byte[], byte[], byte[]> begin() {
4545
return new MVStoreTransaction(mvStore.begin(), factory);
4646
}
4747
}

impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,9 @@ public void commit() {
8282
public void rollback() {
8383
transaction.rollback();
8484
}
85+
86+
@Override
87+
protected Map<String, byte[]> applicationData() {
88+
return transaction.openMap("APPLICATION");
89+
}
8590
}

0 commit comments

Comments
 (0)