Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

public class WorkflowApplication implements AutoCloseable {

private final String id;
private final TaskExecutorFactory taskFactory;
private final ExpressionFactory exprFactory;
private final ResourceLoaderFactory resourceLoaderFactory;
Expand Down Expand Up @@ -108,6 +109,7 @@ private WorkflowApplication(Builder builder) {
this.templateResolver = builder.templateResolver;
this.functionReader = builder.functionReader;
this.defaultCatalogURI = builder.defaultCatalogURI;
this.id = builder.id;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -165,6 +167,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
};
}

private String id;
private TaskExecutorFactory taskFactory;
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
private Collection<WorkflowExecutionListener> listeners =
Expand Down Expand Up @@ -198,6 +201,11 @@ private Builder() {
.forEach(a -> additionalObjects.put(a.name(), a));
}

public Builder withId(String id) {
this.id = id;
return this;
}

public Builder withListener(WorkflowExecutionListener listener) {
listeners.add(listener);
return this;
Expand Down Expand Up @@ -304,13 +312,14 @@ public Builder withDefaultCatalogURI(URI defaultCatalogURI) {
}

public WorkflowApplication build() {

if (modelFactory == null) {
modelFactory =
loadFirst(WorkflowModelFactory.class)
.orElseThrow(
() ->
new IllegalStateException(
"WorkflowModelFactory instance has to be set in WorkflowApplication or present in the classpath"));
"WorkflowModelFactory instance has to be set in WorkflowApplication or pr^eesent in the classpath"));
}
if (contextFactory == null) {
contextFactory = modelFactory;
Expand Down Expand Up @@ -360,6 +369,9 @@ public WorkflowApplication build() {
if (defaultCatalogURI == null) {
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
}
if (id == null) {
id = idFactory.get();
}
return new WorkflowApplication(this);
}
}
Expand Down Expand Up @@ -453,6 +465,10 @@ public URI defaultCatalogURI() {
return defaultCatalogURI;
}

public String id() {
return id;
}

public <T> Optional<T> additionalObject(
String name, WorkflowContext workflowContext, TaskContext taskContext) {
return Optional.ofNullable(additionalObjects.get(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
executors.put(position.jsonPointer(), taskExecutor);
}

@Override
public WorkflowDefinitionId id() {
return definitionId;
}

@Override
public void close() {
safeClose(resourceLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface WorkflowDefinitionData {
Workflow workflow();

WorkflowApplication application();

WorkflowDefinitionId id();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public static WorkflowDefinitionId of(Workflow workflow) {
public static WorkflowDefinitionId fromName(String name) {
return new WorkflowDefinitionId(DEFAULT_NAMESPACE, name, DEFAULT_VERSION);
}

public String toString(String separator) {
return namespace + separator + name + separator + version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.marshaller;

import io.serverlessworkflow.impl.WorkflowModel;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.time.Instant;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class MarshallingUtils {

private MarshallingUtils() {}

public static byte[] writeInstant(WorkflowBufferFactory factory, Instant instant) {
return writeValue(factory, instant, (b, v) -> b.writeInstant(v));
}

public static <T extends Enum<T>> byte[] writeEnum(
WorkflowBufferFactory factory, T enumInstance) {
return writeValue(factory, enumInstance, (b, v) -> b.writeEnum(v));
}

public static byte[] writeModel(WorkflowBufferFactory factory, WorkflowModel model) {
return writeValue(factory, model, (b, v) -> b.writeObject(v));
}

public static byte[] writeShort(WorkflowBufferFactory factory, short value) {
return writeValue(factory, value, (b, v) -> b.writeShort(v));
}

public static byte[] writeBoolean(WorkflowBufferFactory factory, boolean value) {
return writeValue(factory, value, (b, v) -> b.writeBoolean(v));
}

public static byte[] writeString(WorkflowBufferFactory factory, String value) {
return writeValue(factory, value, (b, v) -> b.writeString(v));
}

public static String readString(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readString);
}

public static boolean readBoolean(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readBoolean);
}

public static short readShort(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readShort);
}

public static WorkflowModel readModel(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, b -> (WorkflowModel) b.readObject());
}

public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) {
return readValue(factory, value, WorkflowInputBuffer::readInstant);
}

public static <T extends Enum<T>> T readEnum(
WorkflowBufferFactory factory, byte[] value, Class<T> enumClass) {
return readValue(factory, value, b -> b.readEnum(enumClass));
}

private static <T> byte[] writeValue(
WorkflowBufferFactory factory, T value, BiConsumer<WorkflowOutputBuffer, T> valueConsumer) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (WorkflowOutputBuffer buffer = factory.output(bytesOut)) {
valueConsumer.accept(buffer, value);
}
return bytesOut.toByteArray();
}

private static <T> T readValue(
WorkflowBufferFactory factory, byte[] value, Function<WorkflowInputBuffer, T> valueConsumer) {
if (value == null) {
return null;
}
ByteArrayInputStream bytesIn = new ByteArrayInputStream(value);
try (WorkflowInputBuffer buffer = factory.input(bytesIn)) {
return valueConsumer.apply(buffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence.bigmap;
package io.serverlessworkflow.impl.marshaller;

enum TaskStatus {
public enum TaskStatus {
COMPLETED,
RETRIED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {

private final PersistenceInstanceStore store;

public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) {
return new DefaultPersistenceInstanceHandlers(
new DefaultPersistenceInstanceWriter(store),
new DefaultPersistenceInstanceReader(store),
store);
}

private DefaultPersistenceInstanceHandlers(
PersistenceInstanceWriter writer,
PersistenceInstanceReader reader,
PersistenceInstanceStore store) {
super(writer, reader);
this.store = store;
}

@Override
public void close() {
safeClose(store);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowInstance;
import java.util.Optional;
import java.util.stream.Stream;

public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader {

private final PersistenceInstanceStore store;

protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
this.store = store;
}

@Override
public Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId) {
PersistenceInstanceTransaction transaction = store.begin();
try {
return read(transaction, definition, instanceId);
} catch (Exception ex) {
transaction.rollback();
throw ex;
}
}

private Optional<WorkflowInstance> read(
PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) {
return t.readWorkflowInfo(definition, instanceId)
.map(i -> new WorkflowPersistenceInstance(definition, i));
}

@Override
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String applicationId) {
PersistenceInstanceTransaction transaction = store.begin();
return transaction
.scanAll(applicationId, definition)
.onClose(() -> transaction.commit())
.map(v -> new WorkflowPersistenceInstance(definition, v));
}
}
Loading