From 11d18b13e431f429b41d240e1a316a3ca034610a Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 23 Jan 2026 21:04:06 +0100 Subject: [PATCH 1/3] Refactoring persistence layer Signed-off-by: fjtirado --- .../AbstractPersistenceInstanceWriter.java} | 88 ++++------- .../DefaultPersistenceInstanceReader.java | 67 +++++++++ .../DefaultPersistenceInstanceWriter.java} | 7 +- .../PersistenceInstanceHandlers.java | 24 ++- .../persistence}/PersistenceInstanceInfo.java | 2 +- .../PersistenceInstanceReader.java | 12 +- .../PersistenceInstanceStore.java} | 6 +- .../PersistenceInstanceTransaction.java | 48 ++++++ .../PersistenceInstanceWriter.java | 5 +- .../WorkflowPersistenceListener.java | 6 - .../bigmap/BigMapInstanceReader.java | 141 ------------------ .../bigmap/BigMapInstanceTransaction.java | 109 +++++++++++++- .../bigmap/BytesMapInstanceReader.java | 86 ----------- ....java => BytesMapInstanceTransaction.java} | 63 +++++++- .../BytesMapPersistenceInstanceHandlers.java | 72 --------- .../mvstore/MVStorePersistenceStore.java | 17 ++- .../mvstore/MVStoreTransaction.java | 11 +- .../impl/test/DBGenerator.java | 6 +- .../impl/test/MvStorePersistenceTest.java | 17 +-- 19 files changed, 364 insertions(+), 423 deletions(-) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java => api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java} (51%) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java => api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java} (77%) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap => api/src/main/java/io/serverlessworkflow/impl/persistence}/PersistenceInstanceInfo.java (93%) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java => api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java} (78%) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java rename impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/{BytesMapInstanceWriter.java => BytesMapInstanceTransaction.java} (60%) delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java similarity index 51% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java index 8d305264d..0af2c32f1 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -13,41 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowInstanceData; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; import java.util.function.Consumer; -public abstract class BigMapInstanceWriter implements PersistenceInstanceWriter { +public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { - private BigMapInstanceStore store; + private final PersistenceInstanceStore store; - protected BigMapInstanceWriter(BigMapInstanceStore store) { + protected AbstractPersistenceInstanceWriter(PersistenceInstanceStore store) { this.store = store; } - private void doTransaction(Consumer> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - operations.accept(transaction); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - @Override public void started(WorkflowContextData workflowContext) { - doTransaction( - t -> - t.instanceData(workflowContext.definition()) - .put(key(workflowContext), marshallInstance(workflowContext.instanceData()))); + doTransaction(t -> t.writeInstanceData(key(workflowContext), workflowContext)); } @Override @@ -65,61 +48,52 @@ public void aborted(WorkflowContextData workflowContext) { removeProcessInstance(workflowContext); } + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction( + t -> { + K key = key(workflowContext); + t.removeInstanceData(key, workflowContext); + t.removeStatus(key, workflowContext); + t.removeTasks(key); + }); + } + @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { + // not recording + } @Override public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskRetried(workflowContext, (TaskContext) taskContext))); + doTransaction(t -> t.writeRetryTask(key(workflowContext), workflowContext, taskContext)); } @Override public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskCompleted(workflowContext, (TaskContext) taskContext))); + doTransaction(t -> t.writeCompletedTask(key(workflowContext), workflowContext, taskContext)); } @Override public void suspended(WorkflowContextData workflowContext) { doTransaction( - t -> - t.status(workflowContext.definition()) - .put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED))); + t -> t.writeStatus(key(workflowContext), WorkflowStatus.SUSPENDED, workflowContext)); } @Override public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.status(workflowContext.definition()).remove(key(workflowContext))); + doTransaction(t -> t.removeStatus(key(workflowContext), workflowContext)); } - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction( - t -> { - K key = key(workflowContext); - t.instanceData(workflowContext.definition()).remove(key); - t.status(workflowContext.definition()).remove(key); - t.cleanupTasks(key); - }); + private void doTransaction(Consumer> operations) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } } protected abstract K key(WorkflowContextData workflowContext); - - protected abstract V marshallInstance(WorkflowInstanceData instance); - - protected abstract T marshallTaskCompleted( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract T marshallTaskRetried( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract S marshallStatus(WorkflowStatus status); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java new file mode 100644 index 000000000..b21092f4e --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; + +public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { + + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public Stream scan( + WorkflowDefinition definition, Collection instanceIds) { + PersistenceInstanceTransaction transaction = store.begin(); + return instanceIds.stream() + .map(id -> read(transaction, definition, id)) + .flatMap(Optional::stream) + .onClose(() -> transaction.commit()); + } + + @Override + public Optional find(WorkflowDefinition definition, String instanceId) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + return read(transaction, definition, instanceId); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } + + private Optional read( + PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { + return t.readWorkflowInfo(definition, instanceId) + .map(i -> new WorkflowPersistenceInstance(definition, i)); + } + + @Override + public Stream scanAll(WorkflowDefinition definition) { + PersistenceInstanceTransaction transaction = store.begin(); + return transaction + .scanAll(definition) + .onClose(() -> transaction.commit()) + .map(v -> new WorkflowPersistenceInstance(definition, v)); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java similarity index 77% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 25ca9e4f9..07d4c3497 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -13,14 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowContextData; -public abstract class BigMapIdInstanceWriter - extends BigMapInstanceWriter { +public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { - protected BigMapIdInstanceWriter(BigMapInstanceStore store) { + public DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { super(store); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 4d470af1f..529630f3d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -17,15 +17,26 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; -public abstract class PersistenceInstanceHandlers implements AutoCloseable { +public class PersistenceInstanceHandlers implements AutoCloseable { - protected final PersistenceInstanceWriter writer; - protected final PersistenceInstanceReader reader; + public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new PersistenceInstanceHandlers<>( + new DefaultPersistenceInstanceWriter(store), + new DefaultPersistenceInstanceReader(store), + store); + } + + private final PersistenceInstanceWriter writer; + private final PersistenceInstanceReader reader; + private final PersistenceInstanceStore store; - protected PersistenceInstanceHandlers( - PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { + public PersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + PersistenceInstanceStore store) { this.writer = writer; this.reader = reader; + this.store = store; } public PersistenceInstanceWriter writer() { @@ -38,7 +49,6 @@ public PersistenceInstanceReader reader() { @Override public void close() { - safeClose(writer); - safeClose(reader); + safeClose(store); } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java similarity index 93% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java index b45829868..a4a3a8350 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowModel; import java.time.Instant; diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 5678e8941..3ee1ef21f 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -18,16 +18,14 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import java.util.Collection; -import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; -public interface PersistenceInstanceReader extends AutoCloseable { - Map readAll(WorkflowDefinition definition); +public interface PersistenceInstanceReader { - Map read(WorkflowDefinition definition, Collection instanceIds); + Stream scanAll(WorkflowDefinition definition); - Optional read(WorkflowDefinition definition, String instanceId); + Stream scan(WorkflowDefinition definition, Collection instanceIds); - @Override - default void close() {} + Optional find(WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java similarity index 78% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java index aa1d998e0..4a1638419 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; -public interface BigMapInstanceStore extends AutoCloseable { - BigMapInstanceTransaction begin(); +public interface PersistenceInstanceStore extends AutoCloseable { + PersistenceInstanceTransaction begin(); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java new file mode 100644 index 000000000..87d242af9 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.Optional; +import java.util.stream.Stream; + +public interface PersistenceInstanceTransaction { + + void commit(); + + void rollback(); + + void writeInstanceData(K key, WorkflowContextData workflowContext); + + void writeRetryTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeCompletedTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeStatus(K key, WorkflowStatus suspended, WorkflowContextData workflowContext); + + void removeInstanceData(K key, WorkflowContextData workflowContext); + + void removeStatus(K key, WorkflowContextData workflowContext); + + void removeTasks(K instanceId); + + Stream scanAll(WorkflowDefinition definition); + + Optional readWorkflowInfo(WorkflowDefinition definition, K key); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index f6b07548f..55f79faff 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -18,7 +18,7 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; -public interface PersistenceInstanceWriter extends AutoCloseable { +public interface PersistenceInstanceWriter { void started(WorkflowContextData workflowContext); @@ -37,7 +37,4 @@ public interface PersistenceInstanceWriter extends AutoCloseable { void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); - - @Override - default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index 958036fc3..781b8c12a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -15,8 +15,6 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; @@ -80,8 +78,4 @@ public void onTaskCompleted(TaskCompletedEvent ev) { public void onTaskRetried(TaskRetriedEvent ev) { persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); } - - public void close() { - safeClose(persistenceWriter); - } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java deleted file mode 100644 index e12de9b89..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; -import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -public abstract class BigMapInstanceReader implements PersistenceInstanceReader { - - private final BigMapInstanceStore store; - - protected BigMapInstanceReader(BigMapInstanceStore store) { - this.store = store; - } - - private Result doTransaction( - Function, Result> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - Result result = operations.apply(transaction); - transaction.commit(); - return result; - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - @Override - public Map readAll(WorkflowDefinition definition) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instances.entrySet().stream() - .map( - e -> - restore( - definition, - e.getKey(), - e.getValue(), - t.tasks(e.getKey()), - status.get(e.getKey()))) - .collect(Collectors.toMap(WorkflowInstance::id, i -> i)); - }); - } - - @Override - public Map read( - WorkflowDefinition definition, Collection instanceIds) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instanceIds.stream() - .map(id -> read(instances, status, t.tasks(id), definition, id)) - .flatMap(Optional::stream) - .collect(Collectors.toMap(WorkflowInstance::id, id -> id)); - }); - } - - @Override - public Optional read(WorkflowDefinition definition, String instanceId) { - return doTransaction( - t -> - read( - t.instanceData(definition), - t.status(definition), - t.tasks(instanceId), - definition, - instanceId)); - } - - private Optional read( - Map instances, - Map status, - Map tasks, - WorkflowDefinition definition, - String instanceId) { - return instances.containsKey(instanceId) - ? Optional.empty() - : Optional.of( - restore( - definition, instanceId, instances.get(instanceId), tasks, status.get(instanceId))); - } - - public void close() {} - - protected WorkflowInstance restore( - WorkflowDefinition definition, - String instanceId, - V instanceData, - Map tasksData, - S status) { - return new WorkflowPersistenceInstance( - definition, readPersistenceInfo(instanceId, instanceData, tasksData, status)); - } - - protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); - - protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); - - protected abstract WorkflowStatus unmarshallStatus(S statusData); - - protected PersistenceWorkflowInfo readPersistenceInfo( - String instanceId, V instanceData, Map tasksData, S status) { - PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); - return new PersistenceWorkflowInfo( - instanceId, - instanceInfo.startedAt(), - instanceInfo.input(), - status == null ? null : unmarshallStatus(status), - tasksData.entrySet().stream() - .collect( - Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 72b89ed17..350a3f29b 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -15,20 +15,115 @@ */ package io.serverlessworkflow.impl.persistence.bigmap; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; -public interface BigMapInstanceTransaction { +public abstract class BigMapInstanceTransaction + implements PersistenceInstanceTransaction { - Map instanceData(WorkflowDefinitionData definition); + @Override + public void writeInstanceData(K key, WorkflowContextData workflowContext) { + instanceData(workflowContext.definition()) + .put(key, marshallInstance(workflowContext.instanceData())); + } - Map status(WorkflowDefinitionData workflowContext); + @Override + public void writeRetryTask( + K key, WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + } - Map tasks(K instanceId); + @Override + public void writeCompletedTask( + K key, WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + } - void cleanupTasks(K instanceId); + @Override + public Stream scanAll(WorkflowDefinition definition) { + Map instances = instanceData(definition); + Map status = status(definition); + return instances.entrySet().stream() + .map( + e -> + readPersistenceInfo( + e.getKey(), e.getValue(), tasks(e.getKey()), status.get(e.getKey()))); + } - void commit(); + @Override + public Optional readWorkflowInfo(WorkflowDefinition definition, K key) { + Map instances = instanceData(definition); + return instances.containsKey(key) + ? Optional.empty() + : Optional.of( + readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key))); + } - void rollback(); + @Override + public void writeStatus(K key, WorkflowStatus status, WorkflowContextData workflowContext) { + status(workflowContext.definition()).put(key, marshallStatus(status)); + } + + public void removeInstanceData(K key, WorkflowContextData workflowContext) { + instanceData(workflowContext.definition()).remove(key); + } + + public void removeStatus(K key, WorkflowContextData workflowContext) { + status(workflowContext.definition()).remove(key); + } + + protected PersistenceWorkflowInfo readPersistenceInfo( + K instanceId, V instanceData, Map tasksData, S status) { + PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); + return new PersistenceWorkflowInfo( + instanceId.toString(), + instanceInfo.startedAt(), + instanceInfo.input(), + status == null ? null : unmarshallStatus(status), + tasksData.entrySet().stream() + .collect( + Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); + } + + protected abstract Map instanceData(WorkflowDefinitionData definition); + + protected abstract Map status(WorkflowDefinitionData workflowContext); + + protected abstract Map tasks(K instanceId); + + protected abstract V marshallInstance(WorkflowInstanceData instance); + + protected abstract T marshallTaskCompleted( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract T marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract S marshallStatus(WorkflowStatus status); + + protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); + + protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); + + protected abstract WorkflowStatus unmarshallStatus(S statusData); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java deleted file mode 100644 index cd92e2df1..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; -import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; -import java.io.ByteArrayInputStream; - -public class BytesMapInstanceReader extends BigMapInstanceReader { - - private final WorkflowBufferFactory factory; - - public BytesMapInstanceReader( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); - this.factory = factory; - } - - @Override - protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { - byte version = buffer.readByte(); - switch (version) { - case MarshallingUtils.VERSION_0: - default: - return readVersion0(buffer); - case MarshallingUtils.VERSION_1: - return readVersion1(buffer); - } - } - } - - private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { - TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); - switch (taskStatus) { - case COMPLETED: - default: - return readVersion0(buffer); - case RETRIED: - return new RetriedTaskInfo(buffer.readShort()); - } - } - - private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { - return new CompletedTaskInfo( - buffer.readInstant(), - (WorkflowModel) buffer.readObject(), - (WorkflowModel) buffer.readObject(), - buffer.readBoolean(), - buffer.readBoolean() ? buffer.readString() : null); - } - - @Override - protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { - buffer.readByte(); // version byte not used at the moment - return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); - } - } - - @Override - protected WorkflowStatus unmarshallStatus(byte[] statusData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { - buffer.readByte(); // version byte not used at the moment - return buffer.readEnum(WorkflowStatus.class); - } - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java similarity index 60% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java rename to impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 279bc0937..f54b2064c 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -23,16 +23,21 @@ import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; +import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -public class BytesMapInstanceWriter extends BigMapIdInstanceWriter { +public abstract class BytesMapInstanceTransaction + extends BigMapInstanceTransaction { private final WorkflowBufferFactory factory; - public BytesMapInstanceWriter( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); + protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { this.factory = factory; } @@ -95,4 +100,54 @@ protected byte[] marshallTaskRetried( } return bytes.toByteArray(); } + + @Override + protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { + byte version = buffer.readByte(); + switch (version) { + case MarshallingUtils.VERSION_0: + default: + return readVersion0(buffer); + case MarshallingUtils.VERSION_1: + return readVersion1(buffer); + } + } + } + + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return readVersion0(buffer); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + + private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { + return new CompletedTaskInfo( + buffer.readInstant(), + (WorkflowModel) buffer.readObject(), + (WorkflowModel) buffer.readObject(), + buffer.readBoolean(), + buffer.readBoolean() ? buffer.readString() : null); + } + + @Override + protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { + buffer.readByte(); // version byte not used at the moment + return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); + } + } + + @Override + protected WorkflowStatus unmarshallStatus(byte[] statusData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { + buffer.readByte(); // version byte not used at the moment + return buffer.readEnum(WorkflowStatus.class); + } + } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java deleted file mode 100644 index e77678886..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - -import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; - -public class BytesMapPersistenceInstanceHandlers extends PersistenceInstanceHandlers - implements AutoCloseable { - - private final BigMapInstanceStore store; - - protected BytesMapPersistenceInstanceHandlers( - PersistenceInstanceWriter writer, - PersistenceInstanceReader reader, - BigMapInstanceStore store) { - super(writer, reader); - this.store = store; - } - - public static class Builder { - private final BigMapInstanceStore store; - private WorkflowBufferFactory factory; - - private Builder(BigMapInstanceStore store) { - this.store = store; - } - - public Builder withFactory(WorkflowBufferFactory factory) { - this.factory = factory; - return this; - } - - public PersistenceInstanceHandlers build() { - if (factory == null) { - factory = DefaultBufferFactory.factory(); - } - return new BytesMapPersistenceInstanceHandlers( - new BytesMapInstanceWriter(store, factory), - new BytesMapInstanceReader(store, factory), - store); - } - } - - public static Builder builder(BigMapInstanceStore store) { - return new Builder(store); - } - - @Override - public void close() { - super.close(); - safeClose(store); - } -} diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index 0f206f9bf..cfd1a475c 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -15,26 +15,33 @@ */ package io.serverlessworkflow.impl.persistence.mvstore; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceStore; +import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.TransactionStore; -public class MVStorePersistenceStore - implements BigMapInstanceStore { +public class MVStorePersistenceStore implements PersistenceInstanceStore { private final TransactionStore mvStore; + private WorkflowBufferFactory factory; public MVStorePersistenceStore(String dbName) { + this(dbName, DefaultBufferFactory.factory()); + } + + public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { this.mvStore = new TransactionStore(MVStore.open(dbName)); + this.factory = factory; } @Override - public void close() { + public void close() throws Exception { mvStore.close(); } @Override public BigMapInstanceTransaction begin() { - return new MVStoreTransaction(mvStore.begin()); + return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java index 66b09499e..d52655923 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -18,19 +18,20 @@ import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.bigmap.BytesMapInstanceTransaction; import java.util.Map; import org.h2.mvstore.tx.Transaction; import org.h2.mvstore.tx.TransactionMap; -public class MVStoreTransaction - implements BigMapInstanceTransaction { +public class MVStoreTransaction extends BytesMapInstanceTransaction { protected static final String ID_SEPARATOR = "-"; private final Transaction transaction; - public MVStoreTransaction(Transaction transaction) { + public MVStoreTransaction(Transaction transaction, WorkflowBufferFactory factory) { + super(factory); this.transaction = transaction; } @@ -55,7 +56,7 @@ public Map status(WorkflowDefinitionData workflowContext) { } @Override - public void cleanupTasks(String instanceId) { + public void removeTasks(String instanceId) { transaction.removeMap(taskMap(instanceId)); } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index f1a6adba0..7f33418b2 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -22,7 +22,6 @@ import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; @@ -43,9 +42,8 @@ public static void main(String[] args) throws IOException { private static void runInstance(String dbName, boolean suspend) throws IOException { LOG.info("---> Generating db samples at {}", dbName); Files.deleteIfExists(Path.of(dbName)); - try (PersistenceInstanceHandlers factories = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers factories = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder().withListener(new TraceExecutionListener()), diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index d814a25f5..ee57ee3c9 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -24,7 +24,6 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; @@ -38,18 +37,17 @@ public class MvStorePersistenceTest { @Test void testSimpleRun() throws IOException { final String dbName = "db-samples/simple.db"; - try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers handlers = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); definition.instance(Map.of()).start().join(); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); } finally { Files.delete(Path.of(dbName)); } @@ -92,9 +90,8 @@ void testRestoreSuspendedInstanceV1() throws IOException { private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); - try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers handlers = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder() @@ -105,7 +102,7 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); - Collection instances = handlers.reader().readAll(definition).values(); + Collection instances = handlers.reader().scanAll(definition).toList(); assertThat(instances).hasSize(1); instances.forEach(WorkflowInstance::start); assertThat(instances) From e78f79e30677f9dbef5485c56c19ce5328f2c3b4 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 26 Jan 2026 23:16:17 +0100 Subject: [PATCH 2/3] After review modifications Signed-off-by: fjtirado --- .../impl/WorkflowApplication.java | 18 +++- .../impl/WorkflowDefinition.java | 5 + .../impl/WorkflowDefinitionData.java | 2 + .../impl/WorkflowDefinitionId.java | 4 + .../impl/marshaller/MarshallingUtils.java | 98 ++++++++++++++++++ .../impl/marshaller}/TaskStatus.java | 4 +- .../AbstractPersistenceInstanceWriter.java | 99 ------------------- .../DefaultPersistenceInstanceHandlers.java | 43 ++++++++ .../DefaultPersistenceInstanceReader.java | 12 +-- .../DefaultPersistenceInstanceWriter.java | 70 ++++++++++++- .../PersistenceInstanceHandlers.java | 21 +--- .../persistence/PersistenceInstanceStore.java | 7 +- .../PersistenceInstanceTransaction.java | 19 ++-- .../bigmap/BigMapInstanceTransaction.java | 64 +++++++----- .../bigmap/BytesMapInstanceTransaction.java | 29 +++--- .../persistence/bigmap/MarshallingUtils.java | 24 ----- .../mvstore/MVStorePersistenceStore.java | 6 +- .../impl/test/DBGenerator.java | 5 +- .../impl/test/MvStorePersistenceTest.java | 44 ++++++--- 19 files changed, 348 insertions(+), 226 deletions(-) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap => api/src/main/java/io/serverlessworkflow/impl/marshaller}/TaskStatus.java (90%) delete mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 550cddb2f..5d7593af7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -58,6 +58,7 @@ public class WorkflowApplication implements AutoCloseable { + private final String id; private final TaskExecutorFactory taskFactory; private final ExpressionFactory exprFactory; private final ResourceLoaderFactory resourceLoaderFactory; @@ -108,6 +109,7 @@ private WorkflowApplication(Builder builder) { this.templateResolver = builder.templateResolver; this.functionReader = builder.functionReader; this.defaultCatalogURI = builder.defaultCatalogURI; + this.id = builder.id; } public TaskExecutorFactory taskFactory() { @@ -165,6 +167,7 @@ public SchemaValidator getValidator(SchemaInline inline) { }; } + private String id; private TaskExecutorFactory taskFactory; private Collection exprFactories = new HashSet<>(); private Collection listeners = @@ -198,6 +201,11 @@ private Builder() { .forEach(a -> additionalObjects.put(a.name(), a)); } + public Builder withId(String id) { + this.id = id; + return this; + } + public Builder withListener(WorkflowExecutionListener listener) { listeners.add(listener); return this; @@ -304,13 +312,14 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) { } public WorkflowApplication build() { + if (modelFactory == null) { modelFactory = loadFirst(WorkflowModelFactory.class) .orElseThrow( () -> new IllegalStateException( - "WorkflowModelFactory instance has to be set in WorkflowApplication or present in the classpath")); + "WorkflowModelFactory instance has to be set in WorkflowApplication or pr^eesent in the classpath")); } if (contextFactory == null) { contextFactory = modelFactory; @@ -360,6 +369,9 @@ public WorkflowApplication build() { if (defaultCatalogURI == null) { defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog"); } + if (id == null) { + id = idFactory.get(); + } return new WorkflowApplication(this); } } @@ -453,6 +465,10 @@ public URI defaultCatalogURI() { return defaultCatalogURI; } + public String id() { + return id; + } + public Optional additionalObject( String name, WorkflowContext workflowContext, TaskContext taskContext) { return Optional.ofNullable(additionalObjects.get(name)) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 6fe8856ad..503cc3829 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -165,6 +165,11 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta executors.put(position.jsonPointer(), taskExecutor); } + @Override + public WorkflowDefinitionId id() { + return definitionId; + } + @Override public void close() { safeClose(resourceLoader); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java index 8a0d5c0af..9466c497f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java @@ -22,4 +22,6 @@ public interface WorkflowDefinitionData { Workflow workflow(); WorkflowApplication application(); + + WorkflowDefinitionId id(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java index d69872cfa..b0f37ef3a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java @@ -32,4 +32,8 @@ public static WorkflowDefinitionId of(Workflow workflow) { public static WorkflowDefinitionId fromName(String name) { return new WorkflowDefinitionId(DEFAULT_NAMESPACE, name, DEFAULT_VERSION); } + + public String toString(String separator) { + return namespace + separator + name + separator + version; + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java new file mode 100644 index 000000000..e82333951 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.time.Instant; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class MarshallingUtils { + + private MarshallingUtils() {} + + public static byte[] writeInstant(WorkflowBufferFactory factory, Instant instant) { + return writeValue(factory, instant, (b, v) -> b.writeInstant(v)); + } + + public static > byte[] writeEnum( + WorkflowBufferFactory factory, T enumInstance) { + return writeValue(factory, enumInstance, (b, v) -> b.writeEnum(v)); + } + + public static byte[] writeModel(WorkflowBufferFactory factory, WorkflowModel model) { + return writeValue(factory, model, (b, v) -> b.writeObject(v)); + } + + public static byte[] writeShort(WorkflowBufferFactory factory, short value) { + return writeValue(factory, value, (b, v) -> b.writeShort(v)); + } + + public static byte[] writeBoolean(WorkflowBufferFactory factory, boolean value) { + return writeValue(factory, value, (b, v) -> b.writeBoolean(v)); + } + + public static byte[] writeString(WorkflowBufferFactory factory, String value) { + return writeValue(factory, value, (b, v) -> b.writeString(v)); + } + + public static String readString(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readString); + } + + public static boolean readBoolean(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readBoolean); + } + + public static short readShort(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readShort); + } + + public static WorkflowModel readModel(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, b -> (WorkflowModel) b.readObject()); + } + + public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readInstant); + } + + public static > T readEnum( + WorkflowBufferFactory factory, byte[] value, Class enumClass) { + return readValue(factory, value, b -> b.readEnum(enumClass)); + } + + private static byte[] writeValue( + WorkflowBufferFactory factory, T value, BiConsumer valueConsumer) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer buffer = factory.output(bytesOut)) { + valueConsumer.accept(buffer, value); + } + return bytesOut.toByteArray(); + } + + private static T readValue( + WorkflowBufferFactory factory, byte[] value, Function valueConsumer) { + if (value == null) { + return null; + } + ByteArrayInputStream bytesIn = new ByteArrayInputStream(value); + try (WorkflowInputBuffer buffer = factory.input(bytesIn)) { + return valueConsumer.apply(buffer); + } + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java similarity index 90% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java index 5db1a57cd..74bdbfc3d 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.marshaller; -enum TaskStatus { +public enum TaskStatus { COMPLETED, RETRIED } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java deleted file mode 100644 index 0af2c32f1..000000000 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence; - -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowStatus; -import java.util.function.Consumer; - -public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { - - private final PersistenceInstanceStore store; - - protected AbstractPersistenceInstanceWriter(PersistenceInstanceStore store) { - this.store = store; - } - - @Override - public void started(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeInstanceData(key(workflowContext), workflowContext)); - } - - @Override - public void completed(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - @Override - public void failed(WorkflowContextData workflowContext, Throwable ex) { - removeProcessInstance(workflowContext); - } - - @Override - public void aborted(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction( - t -> { - K key = key(workflowContext); - t.removeInstanceData(key, workflowContext); - t.removeStatus(key, workflowContext); - t.removeTasks(key); - }); - } - - @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { - // not recording - } - - @Override - public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeRetryTask(key(workflowContext), workflowContext, taskContext)); - } - - @Override - public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeCompletedTask(key(workflowContext), workflowContext, taskContext)); - } - - @Override - public void suspended(WorkflowContextData workflowContext) { - doTransaction( - t -> t.writeStatus(key(workflowContext), WorkflowStatus.SUSPENDED, workflowContext)); - } - - @Override - public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.removeStatus(key(workflowContext), workflowContext)); - } - - private void doTransaction(Consumer> operations) { - PersistenceInstanceTransaction transaction = store.begin(); - try { - operations.accept(transaction); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - protected abstract K key(WorkflowContextData workflowContext); -} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java new file mode 100644 index 000000000..8231a0078 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { + + private final PersistenceInstanceStore store; + + public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new DefaultPersistenceInstanceHandlers( + new DefaultPersistenceInstanceWriter(store), + new DefaultPersistenceInstanceReader(store), + store); + } + + private DefaultPersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + PersistenceInstanceStore store) { + super(writer, reader); + this.store = store; + } + + @Override + public void close() { + safeClose(store); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index b21092f4e..25095ab11 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -23,16 +23,16 @@ public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { - private final PersistenceInstanceStore store; + private final PersistenceInstanceStore store; - protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { + protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { this.store = store; } @Override public Stream scan( WorkflowDefinition definition, Collection instanceIds) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); return instanceIds.stream() .map(id -> read(transaction, definition, id)) .flatMap(Optional::stream) @@ -41,7 +41,7 @@ public Stream scan( @Override public Optional find(WorkflowDefinition definition, String instanceId) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); try { return read(transaction, definition, instanceId); } catch (Exception ex) { @@ -51,14 +51,14 @@ public Optional find(WorkflowDefinition definition, String ins } private Optional read( - PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { + PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { return t.readWorkflowInfo(definition, instanceId) .map(i -> new WorkflowPersistenceInstance(definition, i)); } @Override public Stream scanAll(WorkflowDefinition definition) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); return transaction .scanAll(definition) .onClose(() -> transaction.commit()) diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 07d4c3497..4258db856 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -15,16 +15,76 @@ */ package io.serverlessworkflow.impl.persistence; +import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.function.Consumer; -public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { +public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { - public DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { - super(store); + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public void started(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeInstanceData(workflowContext)); + } + + @Override + public void completed(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + @Override + public void failed(WorkflowContextData workflowContext, Throwable ex) { + removeProcessInstance(workflowContext); + } + + @Override + public void aborted(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction(t -> t.removeProcessInstance(workflowContext)); } @Override - protected String key(WorkflowContextData workflowContext) { - return workflowContext.instanceData().id(); + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { + // not recording + } + + @Override + public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeRetryTask(workflowContext, taskContext)); + } + + @Override + public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext)); + } + + @Override + public void suspended(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED)); + } + + @Override + public void resumed(WorkflowContextData workflowContext) { + doTransaction(t -> t.clearStatus(workflowContext)); + } + + private void doTransaction(Consumer operations) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 529630f3d..84dd96c48 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -15,28 +15,15 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - -public class PersistenceInstanceHandlers implements AutoCloseable { - - public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { - return new PersistenceInstanceHandlers<>( - new DefaultPersistenceInstanceWriter(store), - new DefaultPersistenceInstanceReader(store), - store); - } +public class PersistenceInstanceHandlers implements AutoCloseable { private final PersistenceInstanceWriter writer; private final PersistenceInstanceReader reader; - private final PersistenceInstanceStore store; public PersistenceInstanceHandlers( - PersistenceInstanceWriter writer, - PersistenceInstanceReader reader, - PersistenceInstanceStore store) { + PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { this.writer = writer; this.reader = reader; - this.store = store; } public PersistenceInstanceWriter writer() { @@ -48,7 +35,5 @@ public PersistenceInstanceReader reader() { } @Override - public void close() { - safeClose(store); - } + public void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java index 4a1638419..99c5096e0 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java @@ -15,6 +15,9 @@ */ package io.serverlessworkflow.impl.persistence; -public interface PersistenceInstanceStore extends AutoCloseable { - PersistenceInstanceTransaction begin(); +public interface PersistenceInstanceStore extends AutoCloseable { + PersistenceInstanceTransaction begin(); + + @Override + default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java index 87d242af9..016a628c8 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -22,27 +22,26 @@ import java.util.Optional; import java.util.stream.Stream; -public interface PersistenceInstanceTransaction { +public interface PersistenceInstanceTransaction { void commit(); void rollback(); - void writeInstanceData(K key, WorkflowContextData workflowContext); + void writeInstanceData(WorkflowContextData workflowContext); - void writeRetryTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); - void writeCompletedTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext); - void writeStatus(K key, WorkflowStatus suspended, WorkflowContextData workflowContext); + void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended); - void removeInstanceData(K key, WorkflowContextData workflowContext); + void removeProcessInstance(WorkflowContextData workflowContext); - void removeStatus(K key, WorkflowContextData workflowContext); - - void removeTasks(K instanceId); + void clearStatus(WorkflowContextData workflowContext); Stream scanAll(WorkflowDefinition definition); - Optional readWorkflowInfo(WorkflowDefinition definition, K key); + Optional readWorkflowInfo( + WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 350a3f29b..0e26a7dc3 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -32,28 +32,25 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public abstract class BigMapInstanceTransaction - implements PersistenceInstanceTransaction { +public abstract class BigMapInstanceTransaction implements PersistenceInstanceTransaction { @Override - public void writeInstanceData(K key, WorkflowContextData workflowContext) { + public void writeInstanceData(WorkflowContextData workflowContext) { instanceData(workflowContext.definition()) - .put(key, marshallInstance(workflowContext.instanceData())); + .put(key(workflowContext), marshallInstance(workflowContext.instanceData())); } @Override - public void writeRetryTask( - K key, WorkflowContextData workflowContext, TaskContextData taskContext) { - tasks(key) + public void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) .put( taskContext.position().jsonPointer(), marshallTaskRetried(workflowContext, (TaskContext) taskContext)); } @Override - public void writeCompletedTask( - K key, WorkflowContextData workflowContext, TaskContextData taskContext) { - tasks(key) + public void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) .put( taskContext.position().jsonPointer(), marshallTaskRetried(workflowContext, (TaskContext) taskContext)); @@ -61,8 +58,8 @@ public void writeCompletedTask( @Override public Stream scanAll(WorkflowDefinition definition) { - Map instances = instanceData(definition); - Map status = status(definition); + Map instances = instanceData(definition); + Map status = status(definition); return instances.entrySet().stream() .map( e -> @@ -71,8 +68,9 @@ public Stream scanAll(WorkflowDefinition definition) { } @Override - public Optional readWorkflowInfo(WorkflowDefinition definition, K key) { - Map instances = instanceData(definition); + public Optional readWorkflowInfo( + WorkflowDefinition definition, String key) { + Map instances = instanceData(definition); return instances.containsKey(key) ? Optional.empty() : Optional.of( @@ -80,23 +78,33 @@ public Optional readWorkflowInfo(WorkflowDefinition def } @Override - public void writeStatus(K key, WorkflowStatus status, WorkflowContextData workflowContext) { - status(workflowContext.definition()).put(key, marshallStatus(status)); + public void writeStatus(WorkflowContextData workflowContext, WorkflowStatus status) { + status(workflowContext.definition()).put(key(workflowContext), marshallStatus(status)); } - public void removeInstanceData(K key, WorkflowContextData workflowContext) { - instanceData(workflowContext.definition()).remove(key); + @Override + public void removeProcessInstance(WorkflowContextData workflowContext) { + String key = key(workflowContext); + WorkflowDefinitionData definition = workflowContext.definition(); + instanceData(definition).remove(key); + clearStatus(definition, key); + removeTasks(key); + } + + @Override + public void clearStatus(WorkflowContextData workflowContext) { + clearStatus(workflowContext.definition(), key(workflowContext)); } - public void removeStatus(K key, WorkflowContextData workflowContext) { - status(workflowContext.definition()).remove(key); + private void clearStatus(WorkflowDefinitionData definition, String key) { + status(definition).remove(key); } protected PersistenceWorkflowInfo readPersistenceInfo( - K instanceId, V instanceData, Map tasksData, S status) { + String instanceId, V instanceData, Map tasksData, S status) { PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); return new PersistenceWorkflowInfo( - instanceId.toString(), + instanceId, instanceInfo.startedAt(), instanceInfo.input(), status == null ? null : unmarshallStatus(status), @@ -105,11 +113,15 @@ protected PersistenceWorkflowInfo readPersistenceInfo( Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); } - protected abstract Map instanceData(WorkflowDefinitionData definition); + private String key(WorkflowContextData workflowContext) { + return workflowContext.instanceData().id(); + } + + protected abstract Map instanceData(WorkflowDefinitionData definition); - protected abstract Map status(WorkflowDefinitionData workflowContext); + protected abstract Map status(WorkflowDefinitionData workflowContext); - protected abstract Map tasks(K instanceId); + protected abstract Map tasks(String instanceId); protected abstract V marshallInstance(WorkflowInstanceData instance); @@ -126,4 +138,6 @@ protected abstract T marshallTaskRetried( protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); protected abstract WorkflowStatus unmarshallStatus(S statusData); + + protected abstract void removeTasks(String key); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index f54b2064c..464bfa5cb 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -21,7 +21,8 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; -import io.serverlessworkflow.impl.executors.TaskExecutor; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.marshaller.TaskStatus; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; @@ -33,7 +34,10 @@ import java.io.ByteArrayOutputStream; public abstract class BytesMapInstanceTransaction - extends BigMapInstanceTransaction { + extends BigMapInstanceTransaction { + + private static final byte VERSION_0 = 0; + private static final byte VERSION_1 = 1; private final WorkflowBufferFactory factory; @@ -45,22 +49,21 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeByte(VERSION_1); writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); writeModel(writer, contextData.context()); - boolean isEndNode = taskContext.transition().isEndNode(); - writer.writeBoolean(isEndNode); - TaskExecutor next = taskContext.transition().next(); + TransitionInfo transition = taskContext.transition(); + writer.writeBoolean(transition.isEndNode()); + AbstractTaskExecutor next = (AbstractTaskExecutor) transition.next(); if (next == null) { writer.writeBoolean(false); } else { writer.writeBoolean(true); - writer.writeString(((AbstractTaskExecutor) next).position().jsonPointer()); + writer.writeString(next.position().jsonPointer()); } } - return bytes.toByteArray(); } @@ -68,7 +71,7 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont protected byte[] marshallStatus(WorkflowStatus status) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(VERSION_0); writer.writeEnum(status); } return bytes.toByteArray(); @@ -78,7 +81,7 @@ protected byte[] marshallStatus(WorkflowStatus status) { protected byte[] marshallInstance(WorkflowInstanceData instance) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(VERSION_0); writer.writeInstant(instance.startedAt()); writeModel(writer, instance.input()); } @@ -94,7 +97,7 @@ protected byte[] marshallTaskRetried( WorkflowContextData workflowContext, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeByte(VERSION_1); writer.writeEnum(TaskStatus.RETRIED); writer.writeShort(taskContext.retryAttempt()); } @@ -106,10 +109,10 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { byte version = buffer.readByte(); switch (version) { - case MarshallingUtils.VERSION_0: + case VERSION_0: default: return readVersion0(buffer); - case MarshallingUtils.VERSION_1: + case VERSION_1: return readVersion1(buffer); } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java deleted file mode 100644 index 6c38d9016..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -class MarshallingUtils { - - private MarshallingUtils() {} - - public static final byte VERSION_0 = 0; - public static final byte VERSION_1 = 1; -} diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index cfd1a475c..acfb9949a 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -22,7 +22,7 @@ import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.TransactionStore; -public class MVStorePersistenceStore implements PersistenceInstanceStore { +public class MVStorePersistenceStore implements PersistenceInstanceStore { private final TransactionStore mvStore; private WorkflowBufferFactory factory; @@ -36,12 +36,12 @@ public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { } @Override - public void close() throws Exception { + public void close() { mvStore.close(); } @Override - public BigMapInstanceTransaction begin() { + public BigMapInstanceTransaction begin() { return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 7f33418b2..9369edc85 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; @@ -42,8 +43,8 @@ public static void main(String[] args) throws IOException { private static void runInstance(String dbName, boolean suspend) throws IOException { LOG.info("---> Generating db samples at {}", dbName); Files.deleteIfExists(Path.of(dbName)); - try (PersistenceInstanceHandlers factories = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers factories = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder().withListener(new TraceExecutionListener()), diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index ee57ee3c9..c1011a3be 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; @@ -30,6 +31,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; public class MvStorePersistenceTest { @@ -37,22 +39,29 @@ public class MvStorePersistenceTest { @Test void testSimpleRun() throws IOException { final String dbName = "db-samples/simple.db"; - try (PersistenceInstanceHandlers handlers = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers handlers = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); - assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); + assertNoInstance(handlers, definition); definition.instance(Map.of()).start().join(); - assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); + assertNoInstance(handlers, definition); } finally { Files.delete(Path.of(dbName)); } } + private void assertNoInstance( + PersistenceInstanceHandlers handlers, WorkflowDefinition definition) { + try (Stream stream = handlers.reader().scanAll(definition)) { + assertThat(stream.count()).isEqualTo(0); + } + } + @Test void testWaitingInstance() throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); @@ -90,8 +99,8 @@ void testRestoreSuspendedInstanceV1() throws IOException { private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); - try (PersistenceInstanceHandlers handlers = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers handlers = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder() @@ -102,16 +111,19 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); - Collection instances = handlers.reader().scanAll(definition).toList(); - assertThat(instances).hasSize(1); - instances.forEach(WorkflowInstance::start); - assertThat(instances) - .singleElement() - .satisfies( - instance -> { - assertThat(instance.status()).isEqualTo(expectedStatus); - assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); - }); + + try (Stream stream = handlers.reader().scanAll(definition)) { + Collection instances = stream.toList(); + assertThat(instances).hasSize(1); + instances.forEach(WorkflowInstance::start); + assertThat(instances) + .singleElement() + .satisfies( + instance -> { + assertThat(instance.status()).isEqualTo(expectedStatus); + assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); + }); + } } } } From a1f520d4e1be40a4c96bd3250cf8bb89018a29d2 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Thu, 29 Jan 2026 18:41:30 +0100 Subject: [PATCH 3/3] Adding persistence test module Signed-off-by: fjtirado --- .../DefaultPersistenceInstanceReader.java | 23 +-- .../DefaultPersistenceInstanceWriter.java | 19 +- .../PersistenceInstanceReader.java | 7 +- .../PersistenceInstanceTransaction.java | 7 +- .../WorkflowPersistenceInstance.java | 2 +- .../bigmap/BigMapInstanceTransaction.java | 32 +++- .../bigmap/BytesMapInstanceTransaction.java | 11 +- .../jackson/JacksonModelMarshaller.java | 3 +- impl/persistence/mvstore/pom.xml | 4 + .../mvstore/MVStorePersistenceStore.java | 2 +- .../mvstore/MVStoreTransaction.java | 9 +- .../mvstore/MVStorePersistenceStoreTest.java | 38 ++++ impl/persistence/pom.xml | 1 + impl/persistence/tests/pom.xml | 58 +++++++ .../test/AbstractPersistenceTest.java | 163 ++++++++++++++++++ .../src/main/resources/simple-expression.yaml | 11 ++ impl/pom.xml | 6 + 17 files changed, 352 insertions(+), 44 deletions(-) create mode 100644 impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java create mode 100644 impl/persistence/tests/pom.xml create mode 100644 impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java create mode 100644 impl/persistence/tests/src/main/resources/simple-expression.yaml diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index 25095ab11..19efa14af 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -17,7 +17,6 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; -import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; @@ -29,23 +28,15 @@ protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { this.store = store; } - @Override - public Stream scan( - WorkflowDefinition definition, Collection instanceIds) { - PersistenceInstanceTransaction transaction = store.begin(); - return instanceIds.stream() - .map(id -> read(transaction, definition, id)) - .flatMap(Optional::stream) - .onClose(() -> transaction.commit()); - } - @Override public Optional find(WorkflowDefinition definition, String instanceId) { PersistenceInstanceTransaction transaction = store.begin(); try { - return read(transaction, definition, instanceId); + Optional instance = read(transaction, definition, instanceId); + transaction.commit(definition); + return instance; } catch (Exception ex) { - transaction.rollback(); + transaction.rollback(definition); throw ex; } } @@ -57,11 +48,11 @@ private Optional read( } @Override - public Stream scanAll(WorkflowDefinition definition) { + public Stream scanAll(WorkflowDefinition definition, String applicationId) { PersistenceInstanceTransaction transaction = store.begin(); return transaction - .scanAll(definition) - .onClose(() -> transaction.commit()) + .scanAll(applicationId, definition) + .onClose(() -> transaction.commit(definition)) .map(v -> new WorkflowPersistenceInstance(definition, v)); } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 4258db856..e67a1cb4d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -30,7 +30,7 @@ protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { @Override public void started(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeInstanceData(workflowContext)); + doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); } @Override @@ -49,7 +49,7 @@ public void aborted(WorkflowContextData workflowContext) { } protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction(t -> t.removeProcessInstance(workflowContext)); + doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext); } @Override @@ -59,31 +59,32 @@ public void taskStarted(WorkflowContextData workflowContext, TaskContextData tas @Override public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeRetryTask(workflowContext, taskContext)); + doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); } @Override public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext)); + doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); } @Override public void suspended(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED)); + doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); } @Override public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.clearStatus(workflowContext)); + doTransaction(t -> t.clearStatus(workflowContext), workflowContext); } - private void doTransaction(Consumer operations) { + private void doTransaction( + Consumer operations, WorkflowContextData context) { PersistenceInstanceTransaction transaction = store.begin(); try { operations.accept(transaction); - transaction.commit(); + transaction.commit(context.definition()); } catch (Exception ex) { - transaction.rollback(); + transaction.rollback(context.definition()); throw ex; } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 3ee1ef21f..73e78879b 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -17,15 +17,16 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; -import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; public interface PersistenceInstanceReader { - Stream scanAll(WorkflowDefinition definition); + default Stream scanAll(WorkflowDefinition definition) { + return scanAll(definition, definition.application().id()); + } - Stream scan(WorkflowDefinition definition, Collection instanceIds); + Stream scanAll(WorkflowDefinition definition, String applicationId); Optional find(WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java index 016a628c8..7606f99f2 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -18,15 +18,16 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionData; import io.serverlessworkflow.impl.WorkflowStatus; import java.util.Optional; import java.util.stream.Stream; public interface PersistenceInstanceTransaction { - void commit(); + void commit(WorkflowDefinitionData definition); - void rollback(); + void rollback(WorkflowDefinitionData definition); void writeInstanceData(WorkflowContextData workflowContext); @@ -40,7 +41,7 @@ public interface PersistenceInstanceTransaction { void clearStatus(WorkflowContextData workflowContext); - Stream scanAll(WorkflowDefinition definition); + Stream scanAll(String applicationId, WorkflowDefinition definition); Optional readWorkflowInfo( WorkflowDefinition definition, String instanceId); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 773f606cd..a245c113d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -31,13 +31,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance { public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { super(definition, info.id(), info.input()); this.info = info; + this.startedAt = info.startedAt(); } @Override public CompletableFuture start() { return startExecution( () -> { - startedAt = info.startedAt(); if (info.status() == WorkflowStatus.SUSPENDED) { internalSuspend(); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 0e26a7dc3..949f5fe71 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -32,12 +32,16 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public abstract class BigMapInstanceTransaction implements PersistenceInstanceTransaction { +public abstract class BigMapInstanceTransaction + implements PersistenceInstanceTransaction { @Override public void writeInstanceData(WorkflowContextData workflowContext) { + String key = key(workflowContext); instanceData(workflowContext.definition()) - .put(key(workflowContext), marshallInstance(workflowContext.instanceData())); + .put(key, marshallInstance(workflowContext.instanceData())); + applicationData() + .put(key, marshallApplicationId(workflowContext.definition().application().id())); } @Override @@ -53,28 +57,36 @@ public void writeCompletedTask(WorkflowContextData workflowContext, TaskContextD tasks(key(workflowContext)) .put( taskContext.position().jsonPointer(), - marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + marshallTaskCompleted(workflowContext, (TaskContext) taskContext)); } @Override - public Stream scanAll(WorkflowDefinition definition) { + public Stream scanAll( + String applicationId, WorkflowDefinition definition) { Map instances = instanceData(definition); + Map applicationData = applicationData(); Map status = status(definition); return instances.entrySet().stream() + .filter(e -> testAppl(applicationData, e.getKey(), applicationId)) .map( e -> readPersistenceInfo( e.getKey(), e.getValue(), tasks(e.getKey()), status.get(e.getKey()))); } + private boolean testAppl(Map applicationData, String key, String applicationId) { + A item = applicationData.get(key); + return item == null || unmarshallApplicationId(item).equals(applicationId); + } + @Override public Optional readWorkflowInfo( WorkflowDefinition definition, String key) { Map instances = instanceData(definition); return instances.containsKey(key) - ? Optional.empty() - : Optional.of( - readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key))); + ? Optional.of( + readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key))) + : Optional.empty(); } @Override @@ -117,6 +129,8 @@ private String key(WorkflowContextData workflowContext) { return workflowContext.instanceData().id(); } + protected abstract Map applicationData(); + protected abstract Map instanceData(WorkflowDefinitionData definition); protected abstract Map status(WorkflowDefinitionData workflowContext); @@ -131,6 +145,8 @@ protected abstract T marshallTaskCompleted( protected abstract T marshallTaskRetried( WorkflowContextData workflowContext, TaskContext taskContext); + protected abstract A marshallApplicationId(String id); + protected abstract S marshallStatus(WorkflowStatus status); protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); @@ -139,5 +155,7 @@ protected abstract T marshallTaskRetried( protected abstract WorkflowStatus unmarshallStatus(S statusData); + protected abstract String unmarshallApplicationId(A a); + protected abstract void removeTasks(String key); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 464bfa5cb..2df8086cf 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.marshaller.MarshallingUtils; import io.serverlessworkflow.impl.marshaller.TaskStatus; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; @@ -34,7 +35,7 @@ import java.io.ByteArrayOutputStream; public abstract class BytesMapInstanceTransaction - extends BigMapInstanceTransaction { + extends BigMapInstanceTransaction { private static final byte VERSION_0 = 0; private static final byte VERSION_1 = 1; @@ -92,6 +93,14 @@ protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { writer.writeObject(model); } + protected byte[] marshallApplicationId(String id) { + return MarshallingUtils.writeString(factory, id); + } + + protected String unmarshallApplicationId(byte[] value) { + return MarshallingUtils.readString(factory, value); + } + @Override protected byte[] marshallTaskRetried( WorkflowContextData workflowContext, TaskContext taskContext) { diff --git a/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java index 9d62db1f0..ba467b766 100644 --- a/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java +++ b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java @@ -38,7 +38,8 @@ public void write(WorkflowOutputBuffer buffer, JacksonModel object) { @Override public JacksonModel read(WorkflowInputBuffer buffer) { try { - return JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class); + JacksonModel model = JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class); + return model == null ? JacksonModel.NULL : model; } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/impl/persistence/mvstore/pom.xml b/impl/persistence/mvstore/pom.xml index e8e6dedc8..ba8ce88c9 100644 --- a/impl/persistence/mvstore/pom.xml +++ b/impl/persistence/mvstore/pom.xml @@ -16,5 +16,9 @@ io.serverlessworkflow serverlessworkflow-persistence-big-map + + io.serverlessworkflow + serverlessworkflow-persistence-tests + \ No newline at end of file diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index acfb9949a..6add6a994 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -41,7 +41,7 @@ public void close() { } @Override - public BigMapInstanceTransaction begin() { + public BigMapInstanceTransaction begin() { return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java index d52655923..c54c80b40 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -74,12 +74,17 @@ private String mapTaskName(String instanceId) { } @Override - public void commit() { + public void commit(WorkflowDefinitionData definition) { transaction.commit(); } @Override - public void rollback() { + public void rollback(WorkflowDefinitionData definition) { transaction.rollback(); } + + @Override + protected Map applicationData() { + return transaction.openMap("APPLICATION"); + } } diff --git a/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java new file mode 100644 index 000000000..1412d1767 --- /dev/null +++ b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.mvstore; + +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.persistence.test.AbstractPersistenceTest; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.AfterEach; + +class MVStorePersistenceStoreTest extends AbstractPersistenceTest { + + private static final String DB_NAME = "dbtest.db"; + + @Override + protected PersistenceInstanceStore persistenceStore() { + return new MVStorePersistenceStore(DB_NAME); + } + + @AfterEach + void destroy() throws IOException { + Files.delete(Path.of(DB_NAME)); + } +} diff --git a/impl/persistence/pom.xml b/impl/persistence/pom.xml index 8e2d60ab5..e705fab75 100644 --- a/impl/persistence/pom.xml +++ b/impl/persistence/pom.xml @@ -13,5 +13,6 @@ mvstore bigmap api + tests diff --git a/impl/persistence/tests/pom.xml b/impl/persistence/tests/pom.xml new file mode 100644 index 000000000..f067c3794 --- /dev/null +++ b/impl/persistence/tests/pom.xml @@ -0,0 +1,58 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence-tests + Serverless Workflow :: Impl :: Persistence:: Tests + + + org.junit.jupiter + junit-jupiter-engine + compile + + + org.mockito + mockito-core + compile + + + org.junit.jupiter + junit-jupiter-api + compile + + + org.junit.jupiter + junit-jupiter-params + compile + + + org.assertj + assertj-core + compile + + + ch.qos.logback + logback-classic + compile + + + io.serverlessworkflow + serverlessworkflow-persistence-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-persistence-jackson-marshaller + + + \ No newline at end of file diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java new file mode 100644 index 000000000..0051dead4 --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public abstract class AbstractPersistenceTest { + + protected abstract PersistenceInstanceStore persistenceStore(); + + private PersistenceInstanceHandlers handlers; + private static WorkflowApplication app; + private static WorkflowDefinition definition; + protected WorkflowModel context; + protected WorkflowInstanceData workflowInstance; + protected WorkflowContextData workflowContext; + + @BeforeAll() + static void init() throws IOException { + app = WorkflowApplication.builder().build(); + definition = app.workflowDefinition(readWorkflowFromClasspath("simple-expression.yaml")); + } + + @BeforeEach + void setup() { + handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore()); + context = app.modelFactory().fromNull(); + workflowContext = mock(WorkflowContext.class); + workflowInstance = mock(WorkflowInstance.class); + when(workflowContext.context()).thenReturn(context); + when(workflowContext.definition()).thenReturn(definition); + when(workflowContext.instanceData()).thenReturn(workflowInstance); + when(workflowInstance.startedAt()).thenReturn(Instant.now()); + when(workflowInstance.context()).thenReturn(context); + when(workflowInstance.id()).thenReturn(app.idFactory().get()); + when(workflowInstance.input()).thenReturn(app.modelFactory().from(Map.of("name", "Javierito"))); + } + + protected TaskContextData completedTaskContext( + WorkflowPosition position, Map model) { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.position()).thenReturn(position); + when(taskContext.completedAt()).thenReturn(Instant.now()); + when(taskContext.output()).thenReturn(app.modelFactory().from(model)); + when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); + return taskContext; + } + + protected TaskContextData retriedTaskContext(WorkflowPosition position, short retryAttempt) { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.position()).thenReturn(position); + when(taskContext.retryAttempt()).thenReturn(retryAttempt); + return taskContext; + } + + @AfterEach + void close() { + handlers.close(); + } + + @AfterAll + static void cleanup() { + if (app != null) { + app.close(); + } + } + + @Test + void testWorkflowInstance() throws InterruptedException { + final WorkflowMutablePosition position = + app.positionFactory().get().addProperty("do").addIndex(0).addProperty("useExpression"); + final short numRetries = 1; + + final Map completedMap = Map.of("name", "fulanito"); + + handlers.writer().started(workflowContext); + handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)); + Optional optional = handlers.reader().find(definition, workflowInstance.id()); + assertThat(optional).isPresent(); + WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); + assertThat(instance.input().asMap().orElseThrow()).isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.startedAt()).isNotNull().isBefore(Instant.now()); + + // task retry + WorkflowContext updateWContext = mock(WorkflowContext.class); + TaskContext updateTContext = mock(TaskContext.class); + when(updateTContext.position()).thenReturn(position); + instance.restoreContext(updateWContext, updateTContext); + ArgumentCaptor retryAttempt = ArgumentCaptor.forClass(Short.class); + verify(updateTContext).retryAttempt(retryAttempt.capture()); + assertThat(retryAttempt.getValue()).isEqualTo(numRetries); + + // task completed + handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap)); + instance = + (WorkflowPersistenceInstance) + handlers.reader().find(definition, workflowInstance.id()).orElseThrow(); + updateWContext = mock(WorkflowContext.class); + updateTContext = mock(TaskContext.class); + when(updateTContext.position()).thenReturn(position); + instance.restoreContext(updateWContext, updateTContext); + ArgumentCaptor context = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateWContext).context(context.capture()); + assertThat(context.getValue()).isEqualTo(app.modelFactory().fromNull()); + ArgumentCaptor model = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateTContext).output(model.capture()); + assertThat(model.getValue().asMap().orElseThrow()).isEqualTo(completedMap); + ArgumentCaptor instant = ArgumentCaptor.forClass(Instant.class); + verify(updateTContext).completedAt(instant.capture()); + assertThat(instant.getValue()).isNotNull().isAfterOrEqualTo(instance.startedAt()); + ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); + verify(updateTContext).transition(transition.capture()); + assertThat(transition.getValue().isEndNode()).isTrue(); + + // workflow completed + handlers.writer().completed(workflowContext); + assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty(); + } +} diff --git a/impl/persistence/tests/src/main/resources/simple-expression.yaml b/impl/persistence/tests/src/main/resources/simple-expression.yaml new file mode 100644 index 000000000..4e240d6bc --- /dev/null +++ b/impl/persistence/tests/src/main/resources/simple-expression.yaml @@ -0,0 +1,11 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: simple-expression + version: '0.1.0' +do: + - useExpression: + set: + startedAt: ${$task.startedAt.epoch.milliseconds} + id : ${$workflow.id} + version: ${$runtime.version} \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index 7773adb79..48751afaf 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -75,6 +75,12 @@ serverlessworkflow-persistence-big-map ${project.version} + + io.serverlessworkflow + serverlessworkflow-persistence-tests + ${project.version} + test + io.serverlessworkflow serverlessworkflow-persistence-jackson-marshaller