From 11d18b13e431f429b41d240e1a316a3ca034610a Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 23 Jan 2026 21:04:06 +0100 Subject: [PATCH 1/2] Refactoring persistence layer Signed-off-by: fjtirado --- .../AbstractPersistenceInstanceWriter.java} | 88 ++++------- .../DefaultPersistenceInstanceReader.java | 67 +++++++++ .../DefaultPersistenceInstanceWriter.java} | 7 +- .../PersistenceInstanceHandlers.java | 24 ++- .../persistence}/PersistenceInstanceInfo.java | 2 +- .../PersistenceInstanceReader.java | 12 +- .../PersistenceInstanceStore.java} | 6 +- .../PersistenceInstanceTransaction.java | 48 ++++++ .../PersistenceInstanceWriter.java | 5 +- .../WorkflowPersistenceListener.java | 6 - .../bigmap/BigMapInstanceReader.java | 141 ------------------ .../bigmap/BigMapInstanceTransaction.java | 109 +++++++++++++- .../bigmap/BytesMapInstanceReader.java | 86 ----------- ....java => BytesMapInstanceTransaction.java} | 63 +++++++- .../BytesMapPersistenceInstanceHandlers.java | 72 --------- .../mvstore/MVStorePersistenceStore.java | 17 ++- .../mvstore/MVStoreTransaction.java | 11 +- .../impl/test/DBGenerator.java | 6 +- .../impl/test/MvStorePersistenceTest.java | 17 +-- 19 files changed, 364 insertions(+), 423 deletions(-) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java => api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java} (51%) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java => api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java} (77%) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap => api/src/main/java/io/serverlessworkflow/impl/persistence}/PersistenceInstanceInfo.java (93%) rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java => api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java} (78%) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java rename impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/{BytesMapInstanceWriter.java => BytesMapInstanceTransaction.java} (60%) delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java similarity index 51% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java index 8d305264d..0af2c32f1 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -13,41 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowInstanceData; import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; import java.util.function.Consumer; -public abstract class BigMapInstanceWriter implements PersistenceInstanceWriter { +public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { - private BigMapInstanceStore store; + private final PersistenceInstanceStore store; - protected BigMapInstanceWriter(BigMapInstanceStore store) { + protected AbstractPersistenceInstanceWriter(PersistenceInstanceStore store) { this.store = store; } - private void doTransaction(Consumer> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - operations.accept(transaction); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - @Override public void started(WorkflowContextData workflowContext) { - doTransaction( - t -> - t.instanceData(workflowContext.definition()) - .put(key(workflowContext), marshallInstance(workflowContext.instanceData()))); + doTransaction(t -> t.writeInstanceData(key(workflowContext), workflowContext)); } @Override @@ -65,61 +48,52 @@ public void aborted(WorkflowContextData workflowContext) { removeProcessInstance(workflowContext); } + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction( + t -> { + K key = key(workflowContext); + t.removeInstanceData(key, workflowContext); + t.removeStatus(key, workflowContext); + t.removeTasks(key); + }); + } + @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { + // not recording + } @Override public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskRetried(workflowContext, (TaskContext) taskContext))); + doTransaction(t -> t.writeRetryTask(key(workflowContext), workflowContext, taskContext)); } @Override public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction( - t -> - t.tasks(key(workflowContext)) - .put( - taskContext.position().jsonPointer(), - marshallTaskCompleted(workflowContext, (TaskContext) taskContext))); + doTransaction(t -> t.writeCompletedTask(key(workflowContext), workflowContext, taskContext)); } @Override public void suspended(WorkflowContextData workflowContext) { doTransaction( - t -> - t.status(workflowContext.definition()) - .put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED))); + t -> t.writeStatus(key(workflowContext), WorkflowStatus.SUSPENDED, workflowContext)); } @Override public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.status(workflowContext.definition()).remove(key(workflowContext))); + doTransaction(t -> t.removeStatus(key(workflowContext), workflowContext)); } - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction( - t -> { - K key = key(workflowContext); - t.instanceData(workflowContext.definition()).remove(key); - t.status(workflowContext.definition()).remove(key); - t.cleanupTasks(key); - }); + private void doTransaction(Consumer> operations) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } } protected abstract K key(WorkflowContextData workflowContext); - - protected abstract V marshallInstance(WorkflowInstanceData instance); - - protected abstract T marshallTaskCompleted( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract T marshallTaskRetried( - WorkflowContextData workflowContext, TaskContext taskContext); - - protected abstract S marshallStatus(WorkflowStatus status); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java new file mode 100644 index 000000000..b21092f4e --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; + +public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { + + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public Stream scan( + WorkflowDefinition definition, Collection instanceIds) { + PersistenceInstanceTransaction transaction = store.begin(); + return instanceIds.stream() + .map(id -> read(transaction, definition, id)) + .flatMap(Optional::stream) + .onClose(() -> transaction.commit()); + } + + @Override + public Optional find(WorkflowDefinition definition, String instanceId) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + return read(transaction, definition, instanceId); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } + + private Optional read( + PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { + return t.readWorkflowInfo(definition, instanceId) + .map(i -> new WorkflowPersistenceInstance(definition, i)); + } + + @Override + public Stream scanAll(WorkflowDefinition definition) { + PersistenceInstanceTransaction transaction = store.begin(); + return transaction + .scanAll(definition) + .onClose(() -> transaction.commit()) + .map(v -> new WorkflowPersistenceInstance(definition, v)); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java similarity index 77% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 25ca9e4f9..07d4c3497 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -13,14 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowContextData; -public abstract class BigMapIdInstanceWriter - extends BigMapInstanceWriter { +public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { - protected BigMapIdInstanceWriter(BigMapInstanceStore store) { + public DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { super(store); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 4d470af1f..529630f3d 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -17,15 +17,26 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; -public abstract class PersistenceInstanceHandlers implements AutoCloseable { +public class PersistenceInstanceHandlers implements AutoCloseable { - protected final PersistenceInstanceWriter writer; - protected final PersistenceInstanceReader reader; + public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new PersistenceInstanceHandlers<>( + new DefaultPersistenceInstanceWriter(store), + new DefaultPersistenceInstanceReader(store), + store); + } + + private final PersistenceInstanceWriter writer; + private final PersistenceInstanceReader reader; + private final PersistenceInstanceStore store; - protected PersistenceInstanceHandlers( - PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { + public PersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + PersistenceInstanceStore store) { this.writer = writer; this.reader = reader; + this.store = store; } public PersistenceInstanceWriter writer() { @@ -38,7 +49,6 @@ public PersistenceInstanceReader reader() { @Override public void close() { - safeClose(writer); - safeClose(reader); + safeClose(store); } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java similarity index 93% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java index b45829868..a4a3a8350 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceInfo.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; import io.serverlessworkflow.impl.WorkflowModel; import java.time.Instant; diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 5678e8941..3ee1ef21f 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -18,16 +18,14 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import java.util.Collection; -import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; -public interface PersistenceInstanceReader extends AutoCloseable { - Map readAll(WorkflowDefinition definition); +public interface PersistenceInstanceReader { - Map read(WorkflowDefinition definition, Collection instanceIds); + Stream scanAll(WorkflowDefinition definition); - Optional read(WorkflowDefinition definition, String instanceId); + Stream scan(WorkflowDefinition definition, Collection instanceIds); - @Override - default void close() {} + Optional find(WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java similarity index 78% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java index aa1d998e0..4a1638419 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.persistence; -public interface BigMapInstanceStore extends AutoCloseable { - BigMapInstanceTransaction begin(); +public interface PersistenceInstanceStore extends AutoCloseable { + PersistenceInstanceTransaction begin(); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java new file mode 100644 index 000000000..87d242af9 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.Optional; +import java.util.stream.Stream; + +public interface PersistenceInstanceTransaction { + + void commit(); + + void rollback(); + + void writeInstanceData(K key, WorkflowContextData workflowContext); + + void writeRetryTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeCompletedTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeStatus(K key, WorkflowStatus suspended, WorkflowContextData workflowContext); + + void removeInstanceData(K key, WorkflowContextData workflowContext); + + void removeStatus(K key, WorkflowContextData workflowContext); + + void removeTasks(K instanceId); + + Stream scanAll(WorkflowDefinition definition); + + Optional readWorkflowInfo(WorkflowDefinition definition, K key); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index f6b07548f..55f79faff 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -18,7 +18,7 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; -public interface PersistenceInstanceWriter extends AutoCloseable { +public interface PersistenceInstanceWriter { void started(WorkflowContextData workflowContext); @@ -37,7 +37,4 @@ public interface PersistenceInstanceWriter extends AutoCloseable { void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); - - @Override - default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java index 958036fc3..781b8c12a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -15,8 +15,6 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; @@ -80,8 +78,4 @@ public void onTaskCompleted(TaskCompletedEvent ev) { public void onTaskRetried(TaskRetriedEvent ev) { persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext()); } - - public void close() { - safeClose(persistenceWriter); - } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java deleted file mode 100644 index e12de9b89..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; -import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -public abstract class BigMapInstanceReader implements PersistenceInstanceReader { - - private final BigMapInstanceStore store; - - protected BigMapInstanceReader(BigMapInstanceStore store) { - this.store = store; - } - - private Result doTransaction( - Function, Result> operations) { - BigMapInstanceTransaction transaction = store.begin(); - try { - Result result = operations.apply(transaction); - transaction.commit(); - return result; - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - @Override - public Map readAll(WorkflowDefinition definition) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instances.entrySet().stream() - .map( - e -> - restore( - definition, - e.getKey(), - e.getValue(), - t.tasks(e.getKey()), - status.get(e.getKey()))) - .collect(Collectors.toMap(WorkflowInstance::id, i -> i)); - }); - } - - @Override - public Map read( - WorkflowDefinition definition, Collection instanceIds) { - return doTransaction( - t -> { - Map instances = t.instanceData(definition); - Map status = t.status(definition); - return instanceIds.stream() - .map(id -> read(instances, status, t.tasks(id), definition, id)) - .flatMap(Optional::stream) - .collect(Collectors.toMap(WorkflowInstance::id, id -> id)); - }); - } - - @Override - public Optional read(WorkflowDefinition definition, String instanceId) { - return doTransaction( - t -> - read( - t.instanceData(definition), - t.status(definition), - t.tasks(instanceId), - definition, - instanceId)); - } - - private Optional read( - Map instances, - Map status, - Map tasks, - WorkflowDefinition definition, - String instanceId) { - return instances.containsKey(instanceId) - ? Optional.empty() - : Optional.of( - restore( - definition, instanceId, instances.get(instanceId), tasks, status.get(instanceId))); - } - - public void close() {} - - protected WorkflowInstance restore( - WorkflowDefinition definition, - String instanceId, - V instanceData, - Map tasksData, - S status) { - return new WorkflowPersistenceInstance( - definition, readPersistenceInfo(instanceId, instanceData, tasksData, status)); - } - - protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); - - protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); - - protected abstract WorkflowStatus unmarshallStatus(S statusData); - - protected PersistenceWorkflowInfo readPersistenceInfo( - String instanceId, V instanceData, Map tasksData, S status) { - PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); - return new PersistenceWorkflowInfo( - instanceId, - instanceInfo.startedAt(), - instanceInfo.input(), - status == null ? null : unmarshallStatus(status), - tasksData.entrySet().stream() - .collect( - Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 72b89ed17..350a3f29b 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -15,20 +15,115 @@ */ package io.serverlessworkflow.impl.persistence.bigmap; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; -public interface BigMapInstanceTransaction { +public abstract class BigMapInstanceTransaction + implements PersistenceInstanceTransaction { - Map instanceData(WorkflowDefinitionData definition); + @Override + public void writeInstanceData(K key, WorkflowContextData workflowContext) { + instanceData(workflowContext.definition()) + .put(key, marshallInstance(workflowContext.instanceData())); + } - Map status(WorkflowDefinitionData workflowContext); + @Override + public void writeRetryTask( + K key, WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + } - Map tasks(K instanceId); + @Override + public void writeCompletedTask( + K key, WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key) + .put( + taskContext.position().jsonPointer(), + marshallTaskRetried(workflowContext, (TaskContext) taskContext)); + } - void cleanupTasks(K instanceId); + @Override + public Stream scanAll(WorkflowDefinition definition) { + Map instances = instanceData(definition); + Map status = status(definition); + return instances.entrySet().stream() + .map( + e -> + readPersistenceInfo( + e.getKey(), e.getValue(), tasks(e.getKey()), status.get(e.getKey()))); + } - void commit(); + @Override + public Optional readWorkflowInfo(WorkflowDefinition definition, K key) { + Map instances = instanceData(definition); + return instances.containsKey(key) + ? Optional.empty() + : Optional.of( + readPersistenceInfo(key, instances.get(key), tasks(key), status(definition).get(key))); + } - void rollback(); + @Override + public void writeStatus(K key, WorkflowStatus status, WorkflowContextData workflowContext) { + status(workflowContext.definition()).put(key, marshallStatus(status)); + } + + public void removeInstanceData(K key, WorkflowContextData workflowContext) { + instanceData(workflowContext.definition()).remove(key); + } + + public void removeStatus(K key, WorkflowContextData workflowContext) { + status(workflowContext.definition()).remove(key); + } + + protected PersistenceWorkflowInfo readPersistenceInfo( + K instanceId, V instanceData, Map tasksData, S status) { + PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); + return new PersistenceWorkflowInfo( + instanceId.toString(), + instanceInfo.startedAt(), + instanceInfo.input(), + status == null ? null : unmarshallStatus(status), + tasksData.entrySet().stream() + .collect( + Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); + } + + protected abstract Map instanceData(WorkflowDefinitionData definition); + + protected abstract Map status(WorkflowDefinitionData workflowContext); + + protected abstract Map tasks(K instanceId); + + protected abstract V marshallInstance(WorkflowInstanceData instance); + + protected abstract T marshallTaskCompleted( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract T marshallTaskRetried( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract S marshallStatus(WorkflowStatus status); + + protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); + + protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); + + protected abstract WorkflowStatus unmarshallStatus(S statusData); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java deleted file mode 100644 index cd92e2df1..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowStatus; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; -import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; -import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; -import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; -import java.io.ByteArrayInputStream; - -public class BytesMapInstanceReader extends BigMapInstanceReader { - - private final WorkflowBufferFactory factory; - - public BytesMapInstanceReader( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); - this.factory = factory; - } - - @Override - protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { - byte version = buffer.readByte(); - switch (version) { - case MarshallingUtils.VERSION_0: - default: - return readVersion0(buffer); - case MarshallingUtils.VERSION_1: - return readVersion1(buffer); - } - } - } - - private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { - TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); - switch (taskStatus) { - case COMPLETED: - default: - return readVersion0(buffer); - case RETRIED: - return new RetriedTaskInfo(buffer.readShort()); - } - } - - private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { - return new CompletedTaskInfo( - buffer.readInstant(), - (WorkflowModel) buffer.readObject(), - (WorkflowModel) buffer.readObject(), - buffer.readBoolean(), - buffer.readBoolean() ? buffer.readString() : null); - } - - @Override - protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { - buffer.readByte(); // version byte not used at the moment - return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); - } - } - - @Override - protected WorkflowStatus unmarshallStatus(byte[] statusData) { - try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { - buffer.readByte(); // version byte not used at the moment - return buffer.readEnum(WorkflowStatus.class); - } - } -} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java similarity index 60% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java rename to impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 279bc0937..f54b2064c 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -23,16 +23,21 @@ import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; +import io.serverlessworkflow.impl.persistence.CompletedTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceInfo; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -public class BytesMapInstanceWriter extends BigMapIdInstanceWriter { +public abstract class BytesMapInstanceTransaction + extends BigMapInstanceTransaction { private final WorkflowBufferFactory factory; - public BytesMapInstanceWriter( - BigMapInstanceStore store, WorkflowBufferFactory factory) { - super(store); + protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { this.factory = factory; } @@ -95,4 +100,54 @@ protected byte[] marshallTaskRetried( } return bytes.toByteArray(); } + + @Override + protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { + byte version = buffer.readByte(); + switch (version) { + case MarshallingUtils.VERSION_0: + default: + return readVersion0(buffer); + case MarshallingUtils.VERSION_1: + return readVersion1(buffer); + } + } + } + + private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) { + TaskStatus taskStatus = buffer.readEnum(TaskStatus.class); + switch (taskStatus) { + case COMPLETED: + default: + return readVersion0(buffer); + case RETRIED: + return new RetriedTaskInfo(buffer.readShort()); + } + } + + private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) { + return new CompletedTaskInfo( + buffer.readInstant(), + (WorkflowModel) buffer.readObject(), + (WorkflowModel) buffer.readObject(), + buffer.readBoolean(), + buffer.readBoolean() ? buffer.readString() : null); + } + + @Override + protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { + buffer.readByte(); // version byte not used at the moment + return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); + } + } + + @Override + protected WorkflowStatus unmarshallStatus(byte[] statusData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { + buffer.readByte(); // version byte not used at the moment + return buffer.readEnum(WorkflowStatus.class); + } + } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java deleted file mode 100644 index e77678886..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - -import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; -import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; -import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; - -public class BytesMapPersistenceInstanceHandlers extends PersistenceInstanceHandlers - implements AutoCloseable { - - private final BigMapInstanceStore store; - - protected BytesMapPersistenceInstanceHandlers( - PersistenceInstanceWriter writer, - PersistenceInstanceReader reader, - BigMapInstanceStore store) { - super(writer, reader); - this.store = store; - } - - public static class Builder { - private final BigMapInstanceStore store; - private WorkflowBufferFactory factory; - - private Builder(BigMapInstanceStore store) { - this.store = store; - } - - public Builder withFactory(WorkflowBufferFactory factory) { - this.factory = factory; - return this; - } - - public PersistenceInstanceHandlers build() { - if (factory == null) { - factory = DefaultBufferFactory.factory(); - } - return new BytesMapPersistenceInstanceHandlers( - new BytesMapInstanceWriter(store, factory), - new BytesMapInstanceReader(store, factory), - store); - } - } - - public static Builder builder(BigMapInstanceStore store) { - return new Builder(store); - } - - @Override - public void close() { - super.close(); - safeClose(store); - } -} diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index 0f206f9bf..cfd1a475c 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -15,26 +15,33 @@ */ package io.serverlessworkflow.impl.persistence.mvstore; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceStore; +import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.TransactionStore; -public class MVStorePersistenceStore - implements BigMapInstanceStore { +public class MVStorePersistenceStore implements PersistenceInstanceStore { private final TransactionStore mvStore; + private WorkflowBufferFactory factory; public MVStorePersistenceStore(String dbName) { + this(dbName, DefaultBufferFactory.factory()); + } + + public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { this.mvStore = new TransactionStore(MVStore.open(dbName)); + this.factory = factory; } @Override - public void close() { + public void close() throws Exception { mvStore.close(); } @Override public BigMapInstanceTransaction begin() { - return new MVStoreTransaction(mvStore.begin()); + return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java index 66b09499e..d52655923 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -18,19 +18,20 @@ import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.bigmap.BytesMapInstanceTransaction; import java.util.Map; import org.h2.mvstore.tx.Transaction; import org.h2.mvstore.tx.TransactionMap; -public class MVStoreTransaction - implements BigMapInstanceTransaction { +public class MVStoreTransaction extends BytesMapInstanceTransaction { protected static final String ID_SEPARATOR = "-"; private final Transaction transaction; - public MVStoreTransaction(Transaction transaction) { + public MVStoreTransaction(Transaction transaction, WorkflowBufferFactory factory) { + super(factory); this.transaction = transaction; } @@ -55,7 +56,7 @@ public Map status(WorkflowDefinitionData workflowContext) { } @Override - public void cleanupTasks(String instanceId) { + public void removeTasks(String instanceId) { transaction.removeMap(taskMap(instanceId)); } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index f1a6adba0..7f33418b2 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -22,7 +22,6 @@ import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; @@ -43,9 +42,8 @@ public static void main(String[] args) throws IOException { private static void runInstance(String dbName, boolean suspend) throws IOException { LOG.info("---> Generating db samples at {}", dbName); Files.deleteIfExists(Path.of(dbName)); - try (PersistenceInstanceHandlers factories = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers factories = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder().withListener(new TraceExecutionListener()), diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index d814a25f5..ee57ee3c9 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -24,7 +24,6 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; -import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; import java.io.IOException; import java.nio.file.Files; @@ -38,18 +37,17 @@ public class MvStorePersistenceTest { @Test void testSimpleRun() throws IOException { final String dbName = "db-samples/simple.db"; - try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers handlers = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); definition.instance(Map.of()).start().join(); - assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); } finally { Files.delete(Path.of(dbName)); } @@ -92,9 +90,8 @@ void testRestoreSuspendedInstanceV1() throws IOException { private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); - try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) - .build(); + try (PersistenceInstanceHandlers handlers = + PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder() @@ -105,7 +102,7 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); - Collection instances = handlers.reader().readAll(definition).values(); + Collection instances = handlers.reader().scanAll(definition).toList(); assertThat(instances).hasSize(1); instances.forEach(WorkflowInstance::start); assertThat(instances) From fc480de308bce523a785b836362a66e9e851ebc7 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Mon, 26 Jan 2026 23:16:17 +0100 Subject: [PATCH 2/2] After review modifications Signed-off-by: fjtirado --- .../impl/WorkflowDefinition.java | 5 + .../impl/WorkflowDefinitionData.java | 2 + .../impl/WorkflowDefinitionId.java | 4 + .../impl/marshaller/MarshallingUtils.java | 59 +++++++++++ .../impl/marshaller}/TaskStatus.java | 4 +- .../AbstractPersistenceInstanceWriter.java | 99 ------------------- .../DefaultPersistenceInstanceHandlers.java | 43 ++++++++ .../DefaultPersistenceInstanceReader.java | 12 +-- .../DefaultPersistenceInstanceWriter.java | 70 ++++++++++++- .../PersistenceInstanceHandlers.java | 21 +--- .../persistence/PersistenceInstanceStore.java | 7 +- .../PersistenceInstanceTransaction.java | 19 ++-- .../bigmap/BigMapInstanceTransaction.java | 64 +++++++----- .../bigmap/BytesMapInstanceTransaction.java | 29 +++--- .../persistence/bigmap/MarshallingUtils.java | 24 ----- .../mvstore/MVStorePersistenceStore.java | 6 +- .../impl/test/DBGenerator.java | 5 +- .../impl/test/MvStorePersistenceTest.java | 44 ++++++--- 18 files changed, 292 insertions(+), 225 deletions(-) create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java rename impl/persistence/{bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap => api/src/main/java/io/serverlessworkflow/impl/marshaller}/TaskStatus.java (90%) delete mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java create mode 100644 impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java delete mode 100644 impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 6fe8856ad..503cc3829 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -165,6 +165,11 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta executors.put(position.jsonPointer(), taskExecutor); } + @Override + public WorkflowDefinitionId id() { + return definitionId; + } + @Override public void close() { safeClose(resourceLoader); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java index 8a0d5c0af..9466c497f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java @@ -22,4 +22,6 @@ public interface WorkflowDefinitionData { Workflow workflow(); WorkflowApplication application(); + + WorkflowDefinitionId id(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java index d69872cfa..b0f37ef3a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionId.java @@ -32,4 +32,8 @@ public static WorkflowDefinitionId of(Workflow workflow) { public static WorkflowDefinitionId fromName(String name) { return new WorkflowDefinitionId(DEFAULT_NAMESPACE, name, DEFAULT_VERSION); } + + public String toString(String separator) { + return namespace + separator + name + separator + version; + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java new file mode 100644 index 000000000..ff4c1db56 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.io.ByteArrayOutputStream; +import java.time.Instant; +import java.util.function.BiConsumer; + +public class MarshallingUtils { + + private MarshallingUtils() {} + + public static byte[] writeInstant(WorkflowBufferFactory factory, Instant instant) { + return writeValue(factory, instant, (b, v) -> b.writeInstant(v)); + } + + public static byte[] writeEnum(WorkflowBufferFactory factory, Enum enumInstance) { + return writeValue(factory, enumInstance, (b, v) -> b.writeEnum(v)); + } + + public static byte[] writeModel(WorkflowBufferFactory factory, WorkflowModel model) { + return writeValue(factory, model, (b, v) -> b.writeObject(v)); + } + + public static byte[] writeShort(WorkflowBufferFactory factory, short value) { + return writeValue(factory, value, (b, v) -> b.writeShort(value)); + } + + public static byte[] writeBoolean(WorkflowBufferFactory factory, boolean value) { + return writeValue(factory, value, (b, v) -> b.writeBoolean(value)); + } + + public static byte[] writeString(WorkflowBufferFactory factory, String value) { + return writeValue(factory, value, (b, v) -> b.writeString(value)); + } + + private static byte[] writeValue( + WorkflowBufferFactory factory, T value, BiConsumer valueConsumer) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer buffer = factory.output(bytesOut)) { + valueConsumer.accept(buffer, value); + } + return bytesOut.toByteArray(); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java similarity index 90% rename from impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java rename to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java index 5db1a57cd..74bdbfc3d 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/TaskStatus.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/TaskStatus.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.persistence.bigmap; +package io.serverlessworkflow.impl.marshaller; -enum TaskStatus { +public enum TaskStatus { COMPLETED, RETRIED } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java deleted file mode 100644 index 0af2c32f1..000000000 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence; - -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowStatus; -import java.util.function.Consumer; - -public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { - - private final PersistenceInstanceStore store; - - protected AbstractPersistenceInstanceWriter(PersistenceInstanceStore store) { - this.store = store; - } - - @Override - public void started(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeInstanceData(key(workflowContext), workflowContext)); - } - - @Override - public void completed(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - @Override - public void failed(WorkflowContextData workflowContext, Throwable ex) { - removeProcessInstance(workflowContext); - } - - @Override - public void aborted(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); - } - - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction( - t -> { - K key = key(workflowContext); - t.removeInstanceData(key, workflowContext); - t.removeStatus(key, workflowContext); - t.removeTasks(key); - }); - } - - @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { - // not recording - } - - @Override - public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeRetryTask(key(workflowContext), workflowContext, taskContext)); - } - - @Override - public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeCompletedTask(key(workflowContext), workflowContext, taskContext)); - } - - @Override - public void suspended(WorkflowContextData workflowContext) { - doTransaction( - t -> t.writeStatus(key(workflowContext), WorkflowStatus.SUSPENDED, workflowContext)); - } - - @Override - public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.removeStatus(key(workflowContext), workflowContext)); - } - - private void doTransaction(Consumer> operations) { - PersistenceInstanceTransaction transaction = store.begin(); - try { - operations.accept(transaction); - transaction.commit(); - } catch (Exception ex) { - transaction.rollback(); - throw ex; - } - } - - protected abstract K key(WorkflowContextData workflowContext); -} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java new file mode 100644 index 000000000..8231a0078 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { + + private final PersistenceInstanceStore store; + + public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new DefaultPersistenceInstanceHandlers( + new DefaultPersistenceInstanceWriter(store), + new DefaultPersistenceInstanceReader(store), + store); + } + + private DefaultPersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + PersistenceInstanceStore store) { + super(writer, reader); + this.store = store; + } + + @Override + public void close() { + safeClose(store); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index b21092f4e..25095ab11 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -23,16 +23,16 @@ public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { - private final PersistenceInstanceStore store; + private final PersistenceInstanceStore store; - protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { + protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { this.store = store; } @Override public Stream scan( WorkflowDefinition definition, Collection instanceIds) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); return instanceIds.stream() .map(id -> read(transaction, definition, id)) .flatMap(Optional::stream) @@ -41,7 +41,7 @@ public Stream scan( @Override public Optional find(WorkflowDefinition definition, String instanceId) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); try { return read(transaction, definition, instanceId); } catch (Exception ex) { @@ -51,14 +51,14 @@ public Optional find(WorkflowDefinition definition, String ins } private Optional read( - PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { + PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { return t.readWorkflowInfo(definition, instanceId) .map(i -> new WorkflowPersistenceInstance(definition, i)); } @Override public Stream scanAll(WorkflowDefinition definition) { - PersistenceInstanceTransaction transaction = store.begin(); + PersistenceInstanceTransaction transaction = store.begin(); return transaction .scanAll(definition) .onClose(() -> transaction.commit()) diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index 07d4c3497..4258db856 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -15,16 +15,76 @@ */ package io.serverlessworkflow.impl.persistence; +import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.function.Consumer; -public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { +public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { - public DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { - super(store); + private final PersistenceInstanceStore store; + + protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { + this.store = store; + } + + @Override + public void started(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeInstanceData(workflowContext)); + } + + @Override + public void completed(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + @Override + public void failed(WorkflowContextData workflowContext, Throwable ex) { + removeProcessInstance(workflowContext); + } + + @Override + public void aborted(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction(t -> t.removeProcessInstance(workflowContext)); } @Override - protected String key(WorkflowContextData workflowContext) { - return workflowContext.instanceData().id(); + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { + // not recording + } + + @Override + public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeRetryTask(workflowContext, taskContext)); + } + + @Override + public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext)); + } + + @Override + public void suspended(WorkflowContextData workflowContext) { + doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED)); + } + + @Override + public void resumed(WorkflowContextData workflowContext) { + doTransaction(t -> t.clearStatus(workflowContext)); + } + + private void doTransaction(Consumer operations) { + PersistenceInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 529630f3d..84dd96c48 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -15,28 +15,15 @@ */ package io.serverlessworkflow.impl.persistence; -import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; - -public class PersistenceInstanceHandlers implements AutoCloseable { - - public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { - return new PersistenceInstanceHandlers<>( - new DefaultPersistenceInstanceWriter(store), - new DefaultPersistenceInstanceReader(store), - store); - } +public class PersistenceInstanceHandlers implements AutoCloseable { private final PersistenceInstanceWriter writer; private final PersistenceInstanceReader reader; - private final PersistenceInstanceStore store; public PersistenceInstanceHandlers( - PersistenceInstanceWriter writer, - PersistenceInstanceReader reader, - PersistenceInstanceStore store) { + PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { this.writer = writer; this.reader = reader; - this.store = store; } public PersistenceInstanceWriter writer() { @@ -48,7 +35,5 @@ public PersistenceInstanceReader reader() { } @Override - public void close() { - safeClose(store); - } + public void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java index 4a1638419..99c5096e0 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceStore.java @@ -15,6 +15,9 @@ */ package io.serverlessworkflow.impl.persistence; -public interface PersistenceInstanceStore extends AutoCloseable { - PersistenceInstanceTransaction begin(); +public interface PersistenceInstanceStore extends AutoCloseable { + PersistenceInstanceTransaction begin(); + + @Override + default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java index 87d242af9..016a628c8 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -22,27 +22,26 @@ import java.util.Optional; import java.util.stream.Stream; -public interface PersistenceInstanceTransaction { +public interface PersistenceInstanceTransaction { void commit(); void rollback(); - void writeInstanceData(K key, WorkflowContextData workflowContext); + void writeInstanceData(WorkflowContextData workflowContext); - void writeRetryTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); - void writeCompletedTask(K key, WorkflowContextData workflowContext, TaskContextData taskContext); + void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext); - void writeStatus(K key, WorkflowStatus suspended, WorkflowContextData workflowContext); + void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended); - void removeInstanceData(K key, WorkflowContextData workflowContext); + void removeProcessInstance(WorkflowContextData workflowContext); - void removeStatus(K key, WorkflowContextData workflowContext); - - void removeTasks(K instanceId); + void clearStatus(WorkflowContextData workflowContext); Stream scanAll(WorkflowDefinition definition); - Optional readWorkflowInfo(WorkflowDefinition definition, K key); + Optional readWorkflowInfo( + WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 350a3f29b..0e26a7dc3 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -32,28 +32,25 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public abstract class BigMapInstanceTransaction - implements PersistenceInstanceTransaction { +public abstract class BigMapInstanceTransaction implements PersistenceInstanceTransaction { @Override - public void writeInstanceData(K key, WorkflowContextData workflowContext) { + public void writeInstanceData(WorkflowContextData workflowContext) { instanceData(workflowContext.definition()) - .put(key, marshallInstance(workflowContext.instanceData())); + .put(key(workflowContext), marshallInstance(workflowContext.instanceData())); } @Override - public void writeRetryTask( - K key, WorkflowContextData workflowContext, TaskContextData taskContext) { - tasks(key) + public void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) .put( taskContext.position().jsonPointer(), marshallTaskRetried(workflowContext, (TaskContext) taskContext)); } @Override - public void writeCompletedTask( - K key, WorkflowContextData workflowContext, TaskContextData taskContext) { - tasks(key) + public void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext) { + tasks(key(workflowContext)) .put( taskContext.position().jsonPointer(), marshallTaskRetried(workflowContext, (TaskContext) taskContext)); @@ -61,8 +58,8 @@ public void writeCompletedTask( @Override public Stream scanAll(WorkflowDefinition definition) { - Map instances = instanceData(definition); - Map status = status(definition); + Map instances = instanceData(definition); + Map status = status(definition); return instances.entrySet().stream() .map( e -> @@ -71,8 +68,9 @@ public Stream scanAll(WorkflowDefinition definition) { } @Override - public Optional readWorkflowInfo(WorkflowDefinition definition, K key) { - Map instances = instanceData(definition); + public Optional readWorkflowInfo( + WorkflowDefinition definition, String key) { + Map instances = instanceData(definition); return instances.containsKey(key) ? Optional.empty() : Optional.of( @@ -80,23 +78,33 @@ public Optional readWorkflowInfo(WorkflowDefinition def } @Override - public void writeStatus(K key, WorkflowStatus status, WorkflowContextData workflowContext) { - status(workflowContext.definition()).put(key, marshallStatus(status)); + public void writeStatus(WorkflowContextData workflowContext, WorkflowStatus status) { + status(workflowContext.definition()).put(key(workflowContext), marshallStatus(status)); } - public void removeInstanceData(K key, WorkflowContextData workflowContext) { - instanceData(workflowContext.definition()).remove(key); + @Override + public void removeProcessInstance(WorkflowContextData workflowContext) { + String key = key(workflowContext); + WorkflowDefinitionData definition = workflowContext.definition(); + instanceData(definition).remove(key); + clearStatus(definition, key); + removeTasks(key); + } + + @Override + public void clearStatus(WorkflowContextData workflowContext) { + clearStatus(workflowContext.definition(), key(workflowContext)); } - public void removeStatus(K key, WorkflowContextData workflowContext) { - status(workflowContext.definition()).remove(key); + private void clearStatus(WorkflowDefinitionData definition, String key) { + status(definition).remove(key); } protected PersistenceWorkflowInfo readPersistenceInfo( - K instanceId, V instanceData, Map tasksData, S status) { + String instanceId, V instanceData, Map tasksData, S status) { PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); return new PersistenceWorkflowInfo( - instanceId.toString(), + instanceId, instanceInfo.startedAt(), instanceInfo.input(), status == null ? null : unmarshallStatus(status), @@ -105,11 +113,15 @@ protected PersistenceWorkflowInfo readPersistenceInfo( Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); } - protected abstract Map instanceData(WorkflowDefinitionData definition); + private String key(WorkflowContextData workflowContext) { + return workflowContext.instanceData().id(); + } + + protected abstract Map instanceData(WorkflowDefinitionData definition); - protected abstract Map status(WorkflowDefinitionData workflowContext); + protected abstract Map status(WorkflowDefinitionData workflowContext); - protected abstract Map tasks(K instanceId); + protected abstract Map tasks(String instanceId); protected abstract V marshallInstance(WorkflowInstanceData instance); @@ -126,4 +138,6 @@ protected abstract T marshallTaskRetried( protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); protected abstract WorkflowStatus unmarshallStatus(S statusData); + + protected abstract void removeTasks(String key); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index f54b2064c..464bfa5cb 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -21,7 +21,8 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; -import io.serverlessworkflow.impl.executors.TaskExecutor; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.marshaller.TaskStatus; import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; @@ -33,7 +34,10 @@ import java.io.ByteArrayOutputStream; public abstract class BytesMapInstanceTransaction - extends BigMapInstanceTransaction { + extends BigMapInstanceTransaction { + + private static final byte VERSION_0 = 0; + private static final byte VERSION_1 = 1; private final WorkflowBufferFactory factory; @@ -45,22 +49,21 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeByte(VERSION_1); writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); writeModel(writer, taskContext.output()); writeModel(writer, contextData.context()); - boolean isEndNode = taskContext.transition().isEndNode(); - writer.writeBoolean(isEndNode); - TaskExecutor next = taskContext.transition().next(); + TransitionInfo transition = taskContext.transition(); + writer.writeBoolean(transition.isEndNode()); + AbstractTaskExecutor next = (AbstractTaskExecutor) transition.next(); if (next == null) { writer.writeBoolean(false); } else { writer.writeBoolean(true); - writer.writeString(((AbstractTaskExecutor) next).position().jsonPointer()); + writer.writeString(next.position().jsonPointer()); } } - return bytes.toByteArray(); } @@ -68,7 +71,7 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont protected byte[] marshallStatus(WorkflowStatus status) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(VERSION_0); writer.writeEnum(status); } return bytes.toByteArray(); @@ -78,7 +81,7 @@ protected byte[] marshallStatus(WorkflowStatus status) { protected byte[] marshallInstance(WorkflowInstanceData instance) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeByte(VERSION_0); writer.writeInstant(instance.startedAt()); writeModel(writer, instance.input()); } @@ -94,7 +97,7 @@ protected byte[] marshallTaskRetried( WorkflowContextData workflowContext, TaskContext taskContext) { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (WorkflowOutputBuffer writer = factory.output(bytes)) { - writer.writeByte(MarshallingUtils.VERSION_1); + writer.writeByte(VERSION_1); writer.writeEnum(TaskStatus.RETRIED); writer.writeShort(taskContext.retryAttempt()); } @@ -106,10 +109,10 @@ protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { byte version = buffer.readByte(); switch (version) { - case MarshallingUtils.VERSION_0: + case VERSION_0: default: return readVersion0(buffer); - case MarshallingUtils.VERSION_1: + case VERSION_1: return readVersion1(buffer); } } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java deleted file mode 100644 index 6c38d9016..000000000 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.persistence.bigmap; - -class MarshallingUtils { - - private MarshallingUtils() {} - - public static final byte VERSION_0 = 0; - public static final byte VERSION_1 = 1; -} diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index cfd1a475c..acfb9949a 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -22,7 +22,7 @@ import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.TransactionStore; -public class MVStorePersistenceStore implements PersistenceInstanceStore { +public class MVStorePersistenceStore implements PersistenceInstanceStore { private final TransactionStore mvStore; private WorkflowBufferFactory factory; @@ -36,12 +36,12 @@ public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { } @Override - public void close() throws Exception { + public void close() { mvStore.close(); } @Override - public BigMapInstanceTransaction begin() { + public BigMapInstanceTransaction begin() { return new MVStoreTransaction(mvStore.begin(), factory); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 7f33418b2..9369edc85 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; @@ -42,8 +43,8 @@ public static void main(String[] args) throws IOException { private static void runInstance(String dbName, boolean suspend) throws IOException { LOG.info("---> Generating db samples at {}", dbName); Files.deleteIfExists(Path.of(dbName)); - try (PersistenceInstanceHandlers factories = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers factories = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder().withListener(new TraceExecutionListener()), diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index ee57ee3c9..c1011a3be 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; @@ -30,6 +31,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; public class MvStorePersistenceTest { @@ -37,22 +39,29 @@ public class MvStorePersistenceTest { @Test void testSimpleRun() throws IOException { final String dbName = "db-samples/simple.db"; - try (PersistenceInstanceHandlers handlers = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers handlers = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); - assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); + assertNoInstance(handlers, definition); definition.instance(Map.of()).start().join(); - assertThat(handlers.reader().scanAll(definition).count()).isEqualTo(0); + assertNoInstance(handlers, definition); } finally { Files.delete(Path.of(dbName)); } } + private void assertNoInstance( + PersistenceInstanceHandlers handlers, WorkflowDefinition definition) { + try (Stream stream = handlers.reader().scanAll(definition)) { + assertThat(stream.count()).isEqualTo(0); + } + } + @Test void testWaitingInstance() throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); @@ -90,8 +99,8 @@ void testRestoreSuspendedInstanceV1() throws IOException { private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); - try (PersistenceInstanceHandlers handlers = - PersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + try (PersistenceInstanceHandlers handlers = + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); WorkflowApplication application = PersistenceApplicationBuilder.builder( WorkflowApplication.builder() @@ -102,16 +111,19 @@ private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOExcept WorkflowDefinition definition = application.workflowDefinition( readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); - Collection instances = handlers.reader().scanAll(definition).toList(); - assertThat(instances).hasSize(1); - instances.forEach(WorkflowInstance::start); - assertThat(instances) - .singleElement() - .satisfies( - instance -> { - assertThat(instance.status()).isEqualTo(expectedStatus); - assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); - }); + + try (Stream stream = handlers.reader().scanAll(definition)) { + Collection instances = stream.toList(); + assertThat(instances).hasSize(1); + instances.forEach(WorkflowInstance::start); + assertThat(instances) + .singleElement() + .satisfies( + instance -> { + assertThat(instance.status()).isEqualTo(expectedStatus); + assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); + }); + } } } }