Skip to content

Commit 2e0f36f

Browse files
committed
After review modifications
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 11d18b1 commit 2e0f36f

File tree

19 files changed

+309
-226
lines changed

19 files changed

+309
-226
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858

5959
public class WorkflowApplication implements AutoCloseable {
6060

61+
private final String id;
6162
private final TaskExecutorFactory taskFactory;
6263
private final ExpressionFactory exprFactory;
6364
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -108,6 +109,7 @@ private WorkflowApplication(Builder builder) {
108109
this.templateResolver = builder.templateResolver;
109110
this.functionReader = builder.functionReader;
110111
this.defaultCatalogURI = builder.defaultCatalogURI;
112+
this.id = builder.id;
111113
}
112114

113115
public TaskExecutorFactory taskFactory() {
@@ -165,6 +167,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
165167
};
166168
}
167169

170+
private String id;
168171
private TaskExecutorFactory taskFactory;
169172
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
170173
private Collection<WorkflowExecutionListener> listeners =
@@ -198,6 +201,11 @@ private Builder() {
198201
.forEach(a -> additionalObjects.put(a.name(), a));
199202
}
200203

204+
public Builder withId(String id) {
205+
this.id = id;
206+
return this;
207+
}
208+
201209
public Builder withListener(WorkflowExecutionListener listener) {
202210
listeners.add(listener);
203211
return this;
@@ -304,13 +312,14 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) {
304312
}
305313

306314
public WorkflowApplication build() {
315+
307316
if (modelFactory == null) {
308317
modelFactory =
309318
loadFirst(WorkflowModelFactory.class)
310319
.orElseThrow(
311320
() ->
312321
new IllegalStateException(
313-
"WorkflowModelFactory instance has to be set in WorkflowApplication or present in the classpath"));
322+
"WorkflowModelFactory instance has to be set in WorkflowApplication or pr^eesent in the classpath"));
314323
}
315324
if (contextFactory == null) {
316325
contextFactory = modelFactory;
@@ -360,6 +369,9 @@ public WorkflowApplication build() {
360369
if (defaultCatalogURI == null) {
361370
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
362371
}
372+
if (id == null) {
373+
id = idFactory.get();
374+
}
363375
return new WorkflowApplication(this);
364376
}
365377
}
@@ -453,6 +465,10 @@ public URI defaultCatalogURI() {
453465
return defaultCatalogURI;
454466
}
455467

468+
public String id() {
469+
return id;
470+
}
471+
456472
public <T> Optional<T> additionalObject(
457473
String name, WorkflowContext workflowContext, TaskContext taskContext) {
458474
return Optional.ofNullable(additionalObjects.get(name))

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
165165
executors.put(position.jsonPointer(), taskExecutor);
166166
}
167167

168+
@Override
169+
public WorkflowDefinitionId id() {
170+
return definitionId;
171+
}
172+
168173
@Override
169174
public void close() {
170175
safeClose(resourceLoader);

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ public interface WorkflowDefinitionData {
2222
Workflow workflow();
2323

2424
WorkflowApplication application();
25+
26+
WorkflowDefinitionId id();
2527
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,8 @@ public static WorkflowDefinitionId of(Workflow workflow) {
3232
public static WorkflowDefinitionId fromName(String name) {
3333
return new WorkflowDefinitionId(DEFAULT_NAMESPACE, name, DEFAULT_VERSION);
3434
}
35+
36+
public String toString(String separator) {
37+
return namespace + separator + name + separator + version;
38+
}
3539
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.marshaller;
17+
18+
import io.serverlessworkflow.impl.WorkflowModel;
19+
import java.io.ByteArrayOutputStream;
20+
import java.time.Instant;
21+
import java.util.function.BiConsumer;
22+
23+
public class MarshallingUtils {
24+
25+
private MarshallingUtils() {}
26+
27+
public static byte[] writeInstant(WorkflowBufferFactory factory, Instant instant) {
28+
return writeValue(factory, instant, (b, v) -> b.writeInstant(v));
29+
}
30+
31+
public static byte[] writeEnum(WorkflowBufferFactory factory, Enum enumInstance) {
32+
return writeValue(factory, enumInstance, (b, v) -> b.writeEnum(v));
33+
}
34+
35+
public static byte[] writeModel(WorkflowBufferFactory factory, WorkflowModel model) {
36+
return writeValue(factory, model, (b, v) -> b.writeObject(v));
37+
}
38+
39+
public static byte[] writeShort(WorkflowBufferFactory factory, short value) {
40+
return writeValue(factory, value, (b, v) -> b.writeShort(value));
41+
}
42+
43+
public static byte[] writeBoolean(WorkflowBufferFactory factory, boolean value) {
44+
return writeValue(factory, value, (b, v) -> b.writeBoolean(value));
45+
}
46+
47+
public static byte[] writeString(WorkflowBufferFactory factory, String value) {
48+
return writeValue(factory, value, (b, v) -> b.writeString(value));
49+
}
50+
51+
private static <T> byte[] writeValue(
52+
WorkflowBufferFactory factory, T value, BiConsumer<WorkflowOutputBuffer, T> valueConsumer) {
53+
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
54+
try (WorkflowOutputBuffer buffer = factory.output(bytesOut)) {
55+
valueConsumer.accept(buffer, value);
56+
}
57+
return bytesOut.toByteArray();
58+
}
59+
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java renamed to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

18-
enum TaskStatus {
18+
public enum TaskStatus {
1919
COMPLETED,
2020
RETRIED
2121
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java

Lines changed: 0 additions & 99 deletions
This file was deleted.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
20+
public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {
21+
22+
private final PersistenceInstanceStore store;
23+
24+
public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) {
25+
return new DefaultPersistenceInstanceHandlers(
26+
new DefaultPersistenceInstanceWriter(store),
27+
new DefaultPersistenceInstanceReader(store),
28+
store);
29+
}
30+
31+
private DefaultPersistenceInstanceHandlers(
32+
PersistenceInstanceWriter writer,
33+
PersistenceInstanceReader reader,
34+
PersistenceInstanceStore store) {
35+
super(writer, reader);
36+
this.store = store;
37+
}
38+
39+
@Override
40+
public void close() {
41+
safeClose(store);
42+
}
43+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323

2424
public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader {
2525

26-
private final PersistenceInstanceStore<String> store;
26+
private final PersistenceInstanceStore store;
2727

28-
protected DefaultPersistenceInstanceReader(PersistenceInstanceStore<String> store) {
28+
protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
2929
this.store = store;
3030
}
3131

3232
@Override
3333
public Stream<WorkflowInstance> scan(
3434
WorkflowDefinition definition, Collection<String> instanceIds) {
35-
PersistenceInstanceTransaction<String> transaction = store.begin();
35+
PersistenceInstanceTransaction transaction = store.begin();
3636
return instanceIds.stream()
3737
.map(id -> read(transaction, definition, id))
3838
.flatMap(Optional::stream)
@@ -41,7 +41,7 @@ public Stream<WorkflowInstance> scan(
4141

4242
@Override
4343
public Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId) {
44-
PersistenceInstanceTransaction<String> transaction = store.begin();
44+
PersistenceInstanceTransaction transaction = store.begin();
4545
try {
4646
return read(transaction, definition, instanceId);
4747
} catch (Exception ex) {
@@ -51,14 +51,14 @@ public Optional<WorkflowInstance> find(WorkflowDefinition definition, String ins
5151
}
5252

5353
private Optional<WorkflowInstance> read(
54-
PersistenceInstanceTransaction<String> t, WorkflowDefinition definition, String instanceId) {
54+
PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) {
5555
return t.readWorkflowInfo(definition, instanceId)
5656
.map(i -> new WorkflowPersistenceInstance(definition, i));
5757
}
5858

5959
@Override
6060
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
61-
PersistenceInstanceTransaction<String> transaction = store.begin();
61+
PersistenceInstanceTransaction transaction = store.begin();
6262
return transaction
6363
.scanAll(definition)
6464
.onClose(() -> transaction.commit())

0 commit comments

Comments
 (0)