diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 550cddb2f..5d7593af7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -58,6 +58,7 @@ public class WorkflowApplication implements AutoCloseable { + private final String id; private final TaskExecutorFactory taskFactory; private final ExpressionFactory exprFactory; private final ResourceLoaderFactory resourceLoaderFactory; @@ -108,6 +109,7 @@ private WorkflowApplication(Builder builder) { this.templateResolver = builder.templateResolver; this.functionReader = builder.functionReader; this.defaultCatalogURI = builder.defaultCatalogURI; + this.id = builder.id; } public TaskExecutorFactory taskFactory() { @@ -165,6 +167,7 @@ public SchemaValidator getValidator(SchemaInline inline) { }; } + private String id; private TaskExecutorFactory taskFactory; private Collection exprFactories = new HashSet<>(); private Collection listeners = @@ -198,6 +201,11 @@ private Builder() { .forEach(a -> additionalObjects.put(a.name(), a)); } + public Builder withId(String id) { + this.id = id; + return this; + } + public Builder withListener(WorkflowExecutionListener listener) { listeners.add(listener); return this; @@ -304,13 +312,14 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) { } public WorkflowApplication build() { + if (modelFactory == null) { modelFactory = loadFirst(WorkflowModelFactory.class) .orElseThrow( () -> new IllegalStateException( - "WorkflowModelFactory instance has to be set in WorkflowApplication or present in the classpath")); + "WorkflowModelFactory instance has to be set in WorkflowApplication or pr^eesent in the classpath")); } if (contextFactory == null) { contextFactory = modelFactory; @@ -360,6 +369,9 @@ public WorkflowApplication build() { if (defaultCatalogURI == null) { defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog"); } + if (id == null) { + id = idFactory.get(); + } return new WorkflowApplication(this); } } @@ -453,6 +465,10 @@ public URI defaultCatalogURI() { return defaultCatalogURI; } + public String id() { + return id; + } + public Optional additionalObject( String name, WorkflowContext workflowContext, TaskContext taskContext) { return Optional.ofNullable(additionalObjects.get(name)) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 6fe8856ad..503cc3829 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -165,6 +165,11 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta executors.put(position.jsonPointer(), taskExecutor); } + @Override + public WorkflowDefinitionId id() { + return definitionId; + } + @Override public void close() { safeClose(resourceLoader); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java index 8a0d5c0af..9466c497f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java @@ -22,4 +22,6 @@ public interface WorkflowDefinitionData { Workflow workflow(); WorkflowApplication application(); + + WorkflowDefinitionId id(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java index d69872cfa..b0f37ef3a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java @@ -32,4 +32,8 @@ public static WorkflowDefinitionId of(Workflow workflow) { public static WorkflowDefinitionId fromName(String name) { return new WorkflowDefinitionId(DEFAULT_NAMESPACE, name, DEFAULT_VERSION); } + + public String toString(String separator) { + return namespace + separator + name + separator + version; + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java new file mode 100644 index 000000000..e82333951 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.time.Instant; +import java.util.function.BiConsumer; +import java.util.function.Function; + +public class MarshallingUtils { + + private MarshallingUtils() {} + + public static byte[] writeInstant(WorkflowBufferFactory factory, Instant instant) { + return writeValue(factory, instant, (b, v) -> b.writeInstant(v)); + } + + public static > byte[] writeEnum( + WorkflowBufferFactory factory, T enumInstance) { + return writeValue(factory, enumInstance, (b, v) -> b.writeEnum(v)); + } + + public static byte[] writeModel(WorkflowBufferFactory factory, WorkflowModel model) { + return writeValue(factory, model, (b, v) -> b.writeObject(v)); + } + + public static byte[] writeShort(WorkflowBufferFactory factory, short value) { + return writeValue(factory, value, (b, v) -> b.writeShort(v)); + } + + public static byte[] writeBoolean(WorkflowBufferFactory factory, boolean value) { + return writeValue(factory, value, (b, v) -> b.writeBoolean(v)); + } + + public static byte[] writeString(WorkflowBufferFactory factory, String value) { + return writeValue(factory, value, (b, v) -> b.writeString(v)); + } + + public static String readString(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readString); + } + + public static boolean readBoolean(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readBoolean); + } + + public static short readShort(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readShort); + } + + public static WorkflowModel readModel(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, b -> (WorkflowModel) b.readObject()); + } + + public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) { + return readValue(factory, value, WorkflowInputBuffer::readInstant); + } + + public static > T readEnum( + WorkflowBufferFactory factory, byte[] value, Class enumClass) { + return readValue(factory, value, b -> b.readEnum(enumClass)); + } + + private static byte[] writeValue( + WorkflowBufferFactory factory, T value, BiConsumer valueConsumer) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer buffer = factory.output(bytesOut)) { + valueConsumer.accept(buffer, value); + } + return bytesOut.toByteArray(); + } + + private static T readValue( + WorkflowBufferFactory factory, byte[] value, Function valueConsumer) { + if (value == null) { + return null; + } + ByteArrayInputStream bytesIn = new ByteArrayInputStream(value); + try (WorkflowInputBuffer buffer = factory.input(bytesIn)) { + return valueConsumer.apply(buffer); + } + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java similarity index 90% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java index 5db1a57cd..74bdbfc3d 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.marshaller; -enum TaskStatus { +public enum TaskStatus { COMPLETED, RETRIED } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java new file mode 100644 index 000000000..8231a0078 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { + + private final PersistenceInstanceStore store; + + public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new DefaultPersistenceInstanceHandlers( + new DefaultPersistenceInstanceWriter(store), + new DefaultPersistenceInstanceReader(store), + store); + } + + private DefaultPersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + PersistenceInstanceStore store) { + super(writer, reader); + this.store = store; + } + + @Override + public void close() { + safeClose(store); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java new file mode 100644 index 000000000..330dcb358 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import java.util.Optional; +import java.util.stream.Stream; + +public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { + + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public Optional find(WorkflowDefinition definition, String instanceId) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + return read(transaction, definition, instanceId); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } + + private Optional read( + PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { + return t.readWorkflowInfo(definition, instanceId) + .map(i -> new WorkflowPersistenceInstance(definition, i)); + } + + @Override + public Stream scanAll(WorkflowDefinition definition, String applicationId) { + PersistenceInstanceTransaction transaction = store.begin(); + return transaction + .scanAll(applicationId, definition) + .onClose(() -> transaction.commit()) + .map(v -> new WorkflowPersistenceInstance(definition, v)); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java new file mode 100644 index 000000000..4258db856 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.function.Consumer; + +public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { + + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public void started(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeInstanceData(workflowContext)); + } + + @Override + public void completed(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + @Override + public void failed(WorkflowContextData workflowContext, Throwable ex) { + removeProcessInstance(workflowContext); + } + + @Override + public void aborted(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction(t -> t.removeProcessInstance(workflowContext)); + } + + @Override + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { + // not recording + } + + @Override + public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeRetryTask(workflowContext, taskContext)); + } + + @Override + public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext)); + } + + @Override + public void suspended(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED)); + } + + @Override + public void resumed(WorkflowContextData workflowContext) { + doTransaction(t -> t.clearStatus(workflowContext)); + } + + private void doTransaction(Consumer operations) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 4d470af1f..84dd96c48 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -15,14 +15,12 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; +public class PersistenceInstanceHandlers implements AutoCloseable { -public abstract class PersistenceInstanceHandlers implements AutoCloseable { + private final PersistenceInstanceWriter writer; + private final PersistenceInstanceReader reader; - protected final PersistenceInstanceWriter writer; - protected final PersistenceInstanceReader reader; - - protected PersistenceInstanceHandlers( + public PersistenceInstanceHandlers( PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { this.writer = writer; this.reader = reader; @@ -37,8 +35,5 @@ public PersistenceInstanceReader reader() { } @Override - public void close() { - safeClose(writer); - safeClose(reader); - } + public void close() {} } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java similarity index 93% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java index b45829868..a4a3a8350 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowModel; import java.time.Instant; diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 5678e8941..73e78879b 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -17,17 +17,16 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; -import java.util.Collection; -import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; -public interface PersistenceInstanceReader extends AutoCloseable { - Map readAll(WorkflowDefinition definition); +public interface PersistenceInstanceReader { - Map read(WorkflowDefinition definition, Collection instanceIds); + default Stream scanAll(WorkflowDefinition definition) { + return scanAll(definition, definition.application().id()); + } - Optional read(WorkflowDefinition definition, String instanceId); + Stream scanAll(WorkflowDefinition definition, String applicationId); - @Override - default void close() {} + Optional find(WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java similarity index 76% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java index aa1d998e0..99c5096e0 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java @@ -13,8 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; -public interface BigMapInstanceStore extends AutoCloseable { - BigMapInstanceTransaction begin(); +public interface PersistenceInstanceStore extends AutoCloseable { + PersistenceInstanceTransaction begin(); + + @Override + default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java new file mode 100644 index 000000000..2a6c6782e --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.Optional; +import java.util.stream.Stream; + +public interface PersistenceInstanceTransaction { + + void commit(); + + void rollback(); + + void writeInstanceData(WorkflowContextData workflowContext); + + void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended); + + void removeProcessInstance(WorkflowContextData workflowContext); + + void clearStatus(WorkflowContextData workflowContext); + + Stream scanAll(String applicationId, WorkflowDefinition definition); + + Optional readWorkflowInfo( + WorkflowDefinition definition, String instanceId); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index f6b07548f..55f79faff 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -18,7 +18,7 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; -public interface PersistenceInstanceWriter extends AutoCloseable { +public interface PersistenceInstanceWriter { void started(WorkflowContextData workflowContext); @@ -37,7 +37,4 @@ public interface PersistenceInstanceWriter extends AutoCloseable { void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); - - @Override - default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index 773f606cd..a245c113d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -31,13 +31,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance { public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { super(definition, info.id(), info.input()); this.info = info; + this.startedAt = info.startedAt(); } @Override public CompletableFuture start() { return startExecution( () -> { - startedAt = info.startedAt(); if (info.status() == WorkflowStatus.SUSPENDED) { internalSuspend(); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index 958036fc3..781b8c12a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -15,8 +15,6 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; @@ -80,8 +78,4 @@ public void onTaskCompleted(TaskCompletedEvent ev) { public void onTaskRetried(TaskRetriedEvent ev) { persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); } - - public void close() { - safeClose(persistenceWriter); - } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java deleted file mode 100644 index e12de9b89..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; -import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -public abstract class BigMapInstanceReader implements PersistenceInstanceReader { - - private final BigMapInstanceStore store; - - protected BigMapInstanceReader(BigMapInstanceStore store) { - this.store = store; - } - - private Result doTransaction( - Function, Result> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - Result result = operations.apply(transaction); - transaction.commit(); - return result; - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - @Override - public Map readAll(WorkflowDefinition definition) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instances.entrySet().stream() - .map( - e -> - restore( - definition, - e.getKey(), - e.getValue(), - t.tasks(e.getKey()), - status.get(e.getKey()))) - .collect(Collectors.toMap(WorkflowInstance::id, i -> i)); - }); - } - - @Override - public Map read( - WorkflowDefinition definition, Collection instanceIds) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instanceIds.stream() - .map(id -> read(instances, status, t.tasks(id), definition, id)) - .flatMap(Optional::stream) - .collect(Collectors.toMap(WorkflowInstance::id, id -> id)); - }); - } - - @Override - public Optional read(WorkflowDefinition definition, String instanceId) { - return doTransaction( - t -> - read( - t.instanceData(definition), - t.status(definition), - t.tasks(instanceId), - definition, - instanceId)); - } - - private Optional read( - Map instances, - Map status, - Map tasks, - WorkflowDefinition definition, - String instanceId) { - return instances.containsKey(instanceId) - ? Optional.empty() - : Optional.of( - restore( - definition, instanceId, instances.get(instanceId), tasks, status.get(instanceId))); - } - - public void close() {} - - protected WorkflowInstance restore( - WorkflowDefinition definition, - String instanceId, - V instanceData, - Map tasksData, - S status) { - return new WorkflowPersistenceInstance( - definition, readPersistenceInfo(instanceId, instanceData, tasksData, status)); - } - - protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); - - protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); - - protected abstract WorkflowStatus unmarshallStatus(S statusData); - - protected PersistenceWorkflowInfo readPersistenceInfo( - String instanceId, V instanceData, Map tasksData, S status) { - PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); - return new PersistenceWorkflowInfo( - instanceId, - instanceInfo.startedAt(), - instanceInfo.input(), - status == null ? null : unmarshallStatus(status), - tasksData.entrySet().stream() - .collect( - Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 72b89ed17..949f5fe71 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -15,20 +15,147 @@ */ package io.serverlessworkflow.impl.persistence.bigmap; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; -public interface BigMapInstanceTransaction { +public abstract class BigMapInstanceTransaction + implements PersistenceInstanceTransaction { - Map instanceData(WorkflowDefinitionData definition); + @Override + public void writeInstanceData(WorkflowContextData workflowContext) { + String key = key(workflowContext); + instanceData(workflowContext.definition()) + .put(key, marshallInstance(workflowContext.instanceData())); + applicationData() + .put(key, marshallApplicationId(workflowContext.definition().application().id())); + } - Map status(WorkflowDefinitionData workflowContext); + @Override + public void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + } - Map tasks(K instanceId); + @Override + public void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) + .put( + taskContext.position().jsonPointer(), + marshallTaskCompleted(workflowContext, (TaskContext) taskContext)); + } - void cleanupTasks(K instanceId); + @Override + public Stream scanAll( + String applicationId, WorkflowDefinition definition) { + Map instances = instanceData(definition); + Map applicationData = applicationData(); + Map status = status(definition); + return instances.entrySet().stream() + .filter(e -> testAppl(applicationData, e.getKey(), applicationId)) + .map( + e -> + readPersistenceInfo( + e.getKey(), e.getValue(), tasks(e.getKey()), status.get(e.getKey()))); + } - void commit(); + private boolean testAppl(Map applicationData, String key, String applicationId) { + A item = applicationData.get(key); + return item == null || unmarshallApplicationId(item).equals(applicationId); + } - void rollback(); + @Override + public Optional readWorkflowInfo( + WorkflowDefinition definition, String key) { + Map instances = instanceData(definition); + return instances.containsKey(key) + ? Optional.of( + readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key))) + : Optional.empty(); + } + + @Override + public void writeStatus(WorkflowContextData workflowContext, WorkflowStatus status) { + status(workflowContext.definition()).put(key(workflowContext), marshallStatus(status)); + } + + @Override + public void removeProcessInstance(WorkflowContextData workflowContext) { + String key = key(workflowContext); + WorkflowDefinitionData definition = workflowContext.definition(); + instanceData(definition).remove(key); + clearStatus(definition, key); + removeTasks(key); + } + + @Override + public void clearStatus(WorkflowContextData workflowContext) { + clearStatus(workflowContext.definition(), key(workflowContext)); + } + + private void clearStatus(WorkflowDefinitionData definition, String key) { + status(definition).remove(key); + } + + protected PersistenceWorkflowInfo readPersistenceInfo( + String instanceId, V instanceData, Map tasksData, S status) { + PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); + return new PersistenceWorkflowInfo( + instanceId, + instanceInfo.startedAt(), + instanceInfo.input(), + status == null ? null : unmarshallStatus(status), + tasksData.entrySet().stream() + .collect( + Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); + } + + private String key(WorkflowContextData workflowContext) { + return workflowContext.instanceData().id(); + } + + protected abstract Map applicationData(); + + protected abstract Map instanceData(WorkflowDefinitionData definition); + + protected abstract Map status(WorkflowDefinitionData workflowContext); + + protected abstract Map tasks(String instanceId); + + protected abstract V marshallInstance(WorkflowInstanceData instance); + + protected abstract T marshallTaskCompleted( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract T marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract A marshallApplicationId(String id); + + protected abstract S marshallStatus(WorkflowStatus status); + + protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); + + protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); + + protected abstract WorkflowStatus unmarshallStatus(S statusData); + + protected abstract String unmarshallApplicationId(A a); + + protected abstract void removeTasks(String key); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java deleted file mode 100644 index 8d305264d..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowInstanceData; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; -import java.util.function.Consumer; - -public abstract class BigMapInstanceWriter implements PersistenceInstanceWriter { - - private BigMapInstanceStore store; - - protected BigMapInstanceWriter(BigMapInstanceStore store) { - this.store = store; - } - - private void doTransaction(Consumer> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - operations.accept(transaction); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - @Override - public void started(WorkflowContextData workflowContext) { - doTransaction( - t -> - t.instanceData(workflowContext.definition()) - .put(key(workflowContext), marshallInstance(workflowContext.instanceData()))); - } - - @Override - public void completed(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - @Override - public void failed(WorkflowContextData workflowContext, Throwable ex) { - removeProcessInstance(workflowContext); - } - - @Override - public void aborted(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} - - @Override - public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskRetried(workflowContext, (TaskContext) taskContext))); - } - - @Override - public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskCompleted(workflowContext, (TaskContext) taskContext))); - } - - @Override - public void suspended(WorkflowContextData workflowContext) { - doTransaction( - t -> - t.status(workflowContext.definition()) - .put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED))); - } - - @Override - public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.status(workflowContext.definition()).remove(key(workflowContext))); - } - - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction( - t -> { - K key = key(workflowContext); - t.instanceData(workflowContext.definition()).remove(key); - t.status(workflowContext.definition()).remove(key); - t.cleanupTasks(key); - }); - } - - protected abstract K key(WorkflowContextData workflowContext); - - protected abstract V marshallInstance(WorkflowInstanceData instance); - - protected abstract T marshallTaskCompleted( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract T marshallTaskRetried( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract S marshallStatus(WorkflowStatus status); -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java deleted file mode 100644 index cd92e2df1..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; -import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; -import java.io.ByteArrayInputStream; - -public class BytesMapInstanceReader extends BigMapInstanceReader { - - private final WorkflowBufferFactory factory; - - public BytesMapInstanceReader( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); - this.factory = factory; - } - - @Override - protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { - byte version = buffer.readByte(); - switch (version) { - case MarshallingUtils.VERSION_0: - default: - return readVersion0(buffer); - case MarshallingUtils.VERSION_1: - return readVersion1(buffer); - } - } - } - - private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { - TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); - switch (taskStatus) { - case COMPLETED: - default: - return readVersion0(buffer); - case RETRIED: - return new RetriedTaskInfo(buffer.readShort()); - } - } - - private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { - return new CompletedTaskInfo( - buffer.readInstant(), - (WorkflowModel) buffer.readObject(), - (WorkflowModel) buffer.readObject(), - buffer.readBoolean(), - buffer.readBoolean() ? buffer.readString() : null); - } - - @Override - protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { - buffer.readByte(); // version byte not used at the moment - return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); - } - } - - @Override - protected WorkflowStatus unmarshallStatus(byte[] statusData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { - buffer.readByte(); // version byte not used at the moment - return buffer.readEnum(WorkflowStatus.class); - } - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java new file mode 100644 index 000000000..2df8086cf --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -0,0 +1,165 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.marshaller.MarshallingUtils; +import io.serverlessworkflow.impl.marshaller.TaskStatus; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; +import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; +import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public abstract class BytesMapInstanceTransaction + extends BigMapInstanceTransaction { + + private static final byte VERSION_0 = 0; + private static final byte VERSION_1 = 1; + + private final WorkflowBufferFactory factory; + + protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { + this.factory = factory; + } + + @Override + protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(VERSION_1); + writer.writeEnum(TaskStatus.COMPLETED); + writer.writeInstant(taskContext.completedAt()); + writeModel(writer, taskContext.output()); + writeModel(writer, contextData.context()); + TransitionInfo transition = taskContext.transition(); + writer.writeBoolean(transition.isEndNode()); + AbstractTaskExecutor next = (AbstractTaskExecutor) transition.next(); + if (next == null) { + writer.writeBoolean(false); + } else { + writer.writeBoolean(true); + writer.writeString(next.position().jsonPointer()); + } + } + return bytes.toByteArray(); + } + + @Override + protected byte[] marshallStatus(WorkflowStatus status) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(VERSION_0); + writer.writeEnum(status); + } + return bytes.toByteArray(); + } + + @Override + protected byte[] marshallInstance(WorkflowInstanceData instance) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(VERSION_0); + writer.writeInstant(instance.startedAt()); + writeModel(writer, instance.input()); + } + return bytes.toByteArray(); + } + + protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { + writer.writeObject(model); + } + + protected byte[] marshallApplicationId(String id) { + return MarshallingUtils.writeString(factory, id); + } + + protected String unmarshallApplicationId(byte[] value) { + return MarshallingUtils.readString(factory, value); + } + + @Override + protected byte[] marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(VERSION_1); + writer.writeEnum(TaskStatus.RETRIED); + writer.writeShort(taskContext.retryAttempt()); + } + return bytes.toByteArray(); + } + + @Override + protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { + byte version = buffer.readByte(); + switch (version) { + case VERSION_0: + default: + return readVersion0(buffer); + case VERSION_1: + return readVersion1(buffer); + } + } + } + + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return readVersion0(buffer); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + + private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { + return new CompletedTaskInfo( + buffer.readInstant(), + (WorkflowModel) buffer.readObject(), + (WorkflowModel) buffer.readObject(), + buffer.readBoolean(), + buffer.readBoolean() ? buffer.readString() : null); + } + + @Override + protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { + buffer.readByte(); // version byte not used at the moment + return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); + } + } + + @Override + protected WorkflowStatus unmarshallStatus(byte[] statusData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { + buffer.readByte(); // version byte not used at the moment + return buffer.readEnum(WorkflowStatus.class); + } + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java deleted file mode 100644 index 279bc0937..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowInstanceData; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; -import io.serverlessworkflow.impl.executors.TaskExecutor; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; -import java.io.ByteArrayOutputStream; - -public class BytesMapInstanceWriter extends BigMapIdInstanceWriter { - - private final WorkflowBufferFactory factory; - - public BytesMapInstanceWriter( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); - this.factory = factory; - } - - @Override - protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); - writer.writeEnum(TaskStatus.COMPLETED); - writer.writeInstant(taskContext.completedAt()); - writeModel(writer, taskContext.output()); - writeModel(writer, contextData.context()); - boolean isEndNode = taskContext.transition().isEndNode(); - writer.writeBoolean(isEndNode); - TaskExecutor next = taskContext.transition().next(); - if (next == null) { - writer.writeBoolean(false); - } else { - writer.writeBoolean(true); - writer.writeString(((AbstractTaskExecutor) next).position().jsonPointer()); - } - } - - return bytes.toByteArray(); - } - - @Override - protected byte[] marshallStatus(WorkflowStatus status) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); - writer.writeEnum(status); - } - return bytes.toByteArray(); - } - - @Override - protected byte[] marshallInstance(WorkflowInstanceData instance) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); - writer.writeInstant(instance.startedAt()); - writeModel(writer, instance.input()); - } - return bytes.toByteArray(); - } - - protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { - writer.writeObject(model); - } - - @Override - protected byte[] marshallTaskRetried( - WorkflowContextData workflowContext, TaskContext taskContext) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); - writer.writeEnum(TaskStatus.RETRIED); - writer.writeShort(taskContext.retryAttempt()); - } - return bytes.toByteArray(); - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java deleted file mode 100644 index e77678886..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - -import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; - -public class BytesMapPersistenceInstanceHandlers extends PersistenceInstanceHandlers - implements AutoCloseable { - - private final BigMapInstanceStore store; - - protected BytesMapPersistenceInstanceHandlers( - PersistenceInstanceWriter writer, - PersistenceInstanceReader reader, - BigMapInstanceStore store) { - super(writer, reader); - this.store = store; - } - - public static class Builder { - private final BigMapInstanceStore store; - private WorkflowBufferFactory factory; - - private Builder(BigMapInstanceStore store) { - this.store = store; - } - - public Builder withFactory(WorkflowBufferFactory factory) { - this.factory = factory; - return this; - } - - public PersistenceInstanceHandlers build() { - if (factory == null) { - factory = DefaultBufferFactory.factory(); - } - return new BytesMapPersistenceInstanceHandlers( - new BytesMapInstanceWriter(store, factory), - new BytesMapInstanceReader(store, factory), - store); - } - } - - public static Builder builder(BigMapInstanceStore store) { - return new Builder(store); - } - - @Override - public void close() { - super.close(); - safeClose(store); - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java deleted file mode 100644 index 6c38d9016..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -class MarshallingUtils { - - private MarshallingUtils() {} - - public static final byte VERSION_0 = 0; - public static final byte VERSION_1 = 1; -} diff --git a/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java index 9d62db1f0..ba467b766 100644 --- a/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java +++ b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java @@ -38,7 +38,8 @@ public void write(WorkflowOutputBuffer buffer, JacksonModel object) { @Override public JacksonModel read(WorkflowInputBuffer buffer) { try { - return JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class); + JacksonModel model = JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class); + return model == null ? JacksonModel.NULL : model; } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/impl/persistence/mvstore/pom.xml b/impl/persistence/mvstore/pom.xml index e8e6dedc8..ba8ce88c9 100644 --- a/impl/persistence/mvstore/pom.xml +++ b/impl/persistence/mvstore/pom.xml @@ -16,5 +16,9 @@ io.serverlessworkflow serverlessworkflow-persistence-big-map + + io.serverlessworkflow + serverlessworkflow-persistence-tests + \ No newline at end of file diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index 0f206f9bf..6add6a994 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -15,17 +15,24 @@ */ package io.serverlessworkflow.impl.persistence.mvstore; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceStore; +import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.TransactionStore; -public class MVStorePersistenceStore - implements BigMapInstanceStore { +public class MVStorePersistenceStore implements PersistenceInstanceStore { private final TransactionStore mvStore; + private WorkflowBufferFactory factory; public MVStorePersistenceStore(String dbName) { + this(dbName, DefaultBufferFactory.factory()); + } + + public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { this.mvStore = new TransactionStore(MVStore.open(dbName)); + this.factory = factory; } @Override @@ -34,7 +41,7 @@ public void close() { } @Override - public BigMapInstanceTransaction begin() { - return new MVStoreTransaction(mvStore.begin()); + public BigMapInstanceTransaction begin() { + return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java index 66b09499e..9266ee4f2 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -18,19 +18,20 @@ import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.bigmap.BytesMapInstanceTransaction; import java.util.Map; import org.h2.mvstore.tx.Transaction; import org.h2.mvstore.tx.TransactionMap; -public class MVStoreTransaction - implements BigMapInstanceTransaction { +public class MVStoreTransaction extends BytesMapInstanceTransaction { protected static final String ID_SEPARATOR = "-"; private final Transaction transaction; - public MVStoreTransaction(Transaction transaction) { + public MVStoreTransaction(Transaction transaction, WorkflowBufferFactory factory) { + super(factory); this.transaction = transaction; } @@ -55,7 +56,7 @@ public Map status(WorkflowDefinitionData workflowContext) { } @Override - public void cleanupTasks(String instanceId) { + public void removeTasks(String instanceId) { transaction.removeMap(taskMap(instanceId)); } @@ -81,4 +82,9 @@ public void commit() { public void rollback() { transaction.rollback(); } + + @Override + protected Map applicationData() { + return transaction.openMap("APPLICATION"); + } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java similarity index 50% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java rename to impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java index 25ca9e4f9..1412d1767 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java +++ b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStoreTest.java @@ -13,19 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence.mvstore; -import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.persistence.test.AbstractPersistenceTest; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.AfterEach; -public abstract class BigMapIdInstanceWriter - extends BigMapInstanceWriter { +class MVStorePersistenceStoreTest extends AbstractPersistenceTest { - protected BigMapIdInstanceWriter(BigMapInstanceStore store) { - super(store); - } + private static final String DB_NAME = "dbtest.db"; @Override - protected String key(WorkflowContextData workflowContext) { - return workflowContext.instanceData().id(); + protected PersistenceInstanceStore persistenceStore() { + return new MVStorePersistenceStore(DB_NAME); + } + + @AfterEach + void destroy() throws IOException { + Files.delete(Path.of(DB_NAME)); } } diff --git a/impl/persistence/pom.xml b/impl/persistence/pom.xml index 8e2d60ab5..e705fab75 100644 --- a/impl/persistence/pom.xml +++ b/impl/persistence/pom.xml @@ -13,5 +13,6 @@ mvstore bigmap api + tests diff --git a/impl/persistence/tests/pom.xml b/impl/persistence/tests/pom.xml new file mode 100644 index 000000000..f067c3794 --- /dev/null +++ b/impl/persistence/tests/pom.xml @@ -0,0 +1,58 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence-tests + Serverless Workflow :: Impl :: Persistence:: Tests + + + org.junit.jupiter + junit-jupiter-engine + compile + + + org.mockito + mockito-core + compile + + + org.junit.jupiter + junit-jupiter-api + compile + + + org.junit.jupiter + junit-jupiter-params + compile + + + org.assertj + assertj-core + compile + + + ch.qos.logback + logback-classic + compile + + + io.serverlessworkflow + serverlessworkflow-persistence-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-persistence-jackson-marshaller + + + \ No newline at end of file diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java new file mode 100644 index 000000000..26d208699 --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public abstract class AbstractPersistenceTest { + + protected abstract PersistenceInstanceStore persistenceStore(); + + private PersistenceInstanceHandlers handlers; + private static WorkflowApplication app; + private static WorkflowDefinition definition; + protected WorkflowModel context; + protected WorkflowInstanceData workflowInstance; + protected WorkflowContextData workflowContext; + + @BeforeAll() + static void init() throws IOException { + app = WorkflowApplication.builder().build(); + definition = app.workflowDefinition(readWorkflowFromClasspath("simple-expression.yaml")); + } + + @BeforeEach + void setup() { + handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore()); + context = app.modelFactory().fromNull(); + workflowContext = mock(WorkflowContext.class); + workflowInstance = mock(WorkflowInstance.class); + when(workflowContext.context()).thenReturn(context); + when(workflowContext.definition()).thenReturn(definition); + when(workflowContext.instanceData()).thenReturn(workflowInstance); + when(workflowInstance.startedAt()).thenReturn(Instant.now()); + when(workflowInstance.context()).thenReturn(context); + when(workflowInstance.status()).thenReturn(WorkflowStatus.RUNNING); + when(workflowInstance.id()).thenReturn(app.idFactory().get()); + when(workflowInstance.input()).thenReturn(app.modelFactory().from(Map.of("name", "Javierito"))); + } + + protected TaskContextData taskContext(WorkflowPosition position, Map model) { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.position()).thenReturn(position); + when(taskContext.completedAt()).thenReturn(Instant.now()); + when(taskContext.output()).thenReturn(app.modelFactory().from(model)); + when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); + return taskContext; + } + + @AfterEach + void close() { + handlers.close(); + } + + @AfterAll + static void cleanup() { + app.close(); + } + + @Test + void testWorkflowInstance() { + final WorkflowMutablePosition position = + app.positionFactory().get().addProperty("do").addIndex(0).addProperty("change"); + final Map completedMap = Map.of("name", "fulanito"); + handlers.writer().started(workflowContext); + handlers.writer().taskCompleted(workflowContext, taskContext(position, completedMap)); + Optional optional = handlers.reader().find(definition, workflowInstance.id()); + assertThat(optional).isPresent(); + WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); + assertThat(instance.input().asMap().orElseThrow()).isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.startedAt()).isNotNull().isBefore(Instant.now()); + WorkflowContext updateWContext = mock(WorkflowContext.class); + TaskContext updateTContext = mock(TaskContext.class); + when(updateTContext.position()).thenReturn(position); + instance.restoreContext(updateWContext, updateTContext); + ArgumentCaptor context = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateWContext).context(context.capture()); + assertThat(context.getValue()).isEqualTo(app.modelFactory().fromNull()); + ArgumentCaptor model = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateTContext).output(model.capture()); + assertThat(model.getValue().asMap().orElseThrow()).isEqualTo(completedMap); + ArgumentCaptor instant = ArgumentCaptor.forClass(Instant.class); + verify(updateTContext).completedAt(instant.capture()); + assertThat(instant.getValue()).isNotNull().isAfterOrEqualTo(instance.startedAt()); + ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); + verify(updateTContext).transition(transition.capture()); + assertThat(transition.getValue().isEndNode()).isTrue(); + } +} diff --git a/impl/persistence/tests/src/main/resources/simple-expression.yaml b/impl/persistence/tests/src/main/resources/simple-expression.yaml new file mode 100644 index 000000000..4e240d6bc --- /dev/null +++ b/impl/persistence/tests/src/main/resources/simple-expression.yaml @@ -0,0 +1,11 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: simple-expression + version: '0.1.0' +do: + - useExpression: + set: + startedAt: ${$task.startedAt.epoch.milliseconds} + id : ${$workflow.id} + version: ${$runtime.version} \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index 7773adb79..48751afaf 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -75,6 +75,12 @@ serverlessworkflow-persistence-big-map ${project.version} + + io.serverlessworkflow + serverlessworkflow-persistence-tests + ${project.version} + test + io.serverlessworkflow serverlessworkflow-persistence-jackson-marshaller diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index f1a6adba0..9369edc85 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,9 +20,9 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; @@ -44,8 +44,7 @@ private static void runInstance(String dbName, boolean suspend) throws IOExcepti LOG.info("---> Generating db samples at {}", dbName); Files.deleteIfExists(Path.of(dbName)); try (PersistenceInstanceHandlers factories = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder().withListener(new TraceExecutionListener()), diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index d814a25f5..c1011a3be 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,15 +22,16 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; public class MvStorePersistenceTest { @@ -39,22 +40,28 @@ public class MvStorePersistenceTest { void testSimpleRun() throws IOException { final String dbName = "db-samples/simple.db"; try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertNoInstance(handlers, definition); definition.instance(Map.of()).start().join(); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertNoInstance(handlers, definition); } finally { Files.delete(Path.of(dbName)); } } + private void assertNoInstance( + PersistenceInstanceHandlers handlers, WorkflowDefinition definition) { + try (Stream stream = handlers.reader().scanAll(definition)) { + assertThat(stream.count()).isEqualTo(0); + } + } + @Test void testWaitingInstance() throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); @@ -93,8 +100,7 @@ void testRestoreSuspendedInstanceV1() throws IOException { private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder() @@ -105,16 +111,19 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); - Collection instances = handlers.reader().readAll(definition).values(); - assertThat(instances).hasSize(1); - instances.forEach(WorkflowInstance::start); - assertThat(instances) - .singleElement() - .satisfies( - instance -> { - assertThat(instance.status()).isEqualTo(expectedStatus); - assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); - }); + + try (Stream stream = handlers.reader().scanAll(definition)) { + Collection instances = stream.toList(); + assertThat(instances).hasSize(1); + instances.forEach(WorkflowInstance::start); + assertThat(instances) + .singleElement() + .satisfies( + instance -> { + assertThat(instance.status()).isEqualTo(expectedStatus); + assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); + }); + } } } }