From 40885db86bd556e49289af4dec50393fb093c5ec Mon Sep 17 00:00:00 2001 From: Toshiya Kobayashi Date: Thu, 31 Aug 2023 21:27:42 +0900 Subject: [PATCH] [DROOLS-7480] Persist info to identify which activation is fired or not (#5466) - Introduce PersistedSessionOption.ActivationStrategy - Code Smells --- .../reliability/core/BaseStoredEvent.java | 3 +- .../reliability/core/BaseStoredObject.java | 6 +- .../reliability/core/ReliableKieSession.java | 5 ++ .../core/ReliableSessionInitializer.java | 48 +++++++++++-- .../ReliableStatefulKnowledgeSessionImpl.java | 18 ++++- .../core/SimpleReliableObjectStore.java | 4 +- ...impleSerializationReliableObjectStore.java | 72 ++++++++++++++----- .../drools/reliability/core/StoredObject.java | 2 +- .../core/util/ReliabilityUtils.java | 68 ++++++++++++++++++ .../reliability/test/ReliabilityCepTest.java | 30 ++++++++ .../reliability/test/ReliabilityTest.java | 24 +++++++ .../test/ReliabilityTestBasics.java | 32 ++++++++- .../runtime/conf/PersistedSessionOption.java | 19 +++++ 13 files changed, 302 insertions(+), 29 deletions(-) create mode 100644 drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/util/ReliabilityUtils.java diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredEvent.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredEvent.java index 10e46cbcc35..40c9354abb4 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredEvent.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredEvent.java @@ -41,9 +41,10 @@ public long getDuration() { } @Override - public void repropagate(InternalWorkingMemoryEntryPoint ep) { + public long repropagate(InternalWorkingMemoryEntryPoint ep) { FactHandleFactory fhFactory = ep.getHandleFactory(); DefaultEventHandle eFh = fhFactory.createEventFactHandle(fhFactory.getNextId(), getObject(), fhFactory.getNextRecency(), ep, timestamp, duration); ep.insert(eFh); + return eFh.getId(); } } \ No newline at end of file diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredObject.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredObject.java index fd9600277d2..1a1a4a14631 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredObject.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/BaseStoredObject.java @@ -18,6 +18,7 @@ import java.io.Serializable; import org.drools.core.common.InternalWorkingMemoryEntryPoint; +import org.kie.api.runtime.rule.FactHandle; public abstract class BaseStoredObject implements StoredObject, Serializable { @@ -34,7 +35,8 @@ public boolean isPropagated() { } @Override - public void repropagate(InternalWorkingMemoryEntryPoint ep) { - ep.insert(getObject()); + public long repropagate(InternalWorkingMemoryEntryPoint ep) { + FactHandle factHandle = ep.insert(getObject()); + return factHandle.getId(); } } \ No newline at end of file diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableKieSession.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableKieSession.java index 35b1c4506a7..a4eb617f4e5 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableKieSession.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableKieSession.java @@ -15,8 +15,13 @@ package org.drools.reliability.core; +import org.drools.core.common.Storage; import org.kie.api.runtime.KieSession; public interface ReliableKieSession extends KieSession { void safepoint(); + + Storage getActivationsStorage(); + + void setActivationsStorage(Storage activationsStorage); } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java index c16eef57c7b..1abbf5d46a3 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java @@ -15,6 +15,10 @@ package org.drools.reliability.core; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.drools.core.SessionConfiguration; import org.drools.core.WorkingMemoryEntryPoint; import org.drools.core.common.InternalFactHandle; @@ -22,6 +26,11 @@ import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.Storage; import org.drools.core.phreak.PropagationEntry; +import org.drools.reliability.core.util.ReliabilityUtils; +import org.kie.api.event.rule.AfterMatchFiredEvent; +import org.kie.api.event.rule.DefaultAgendaEventListener; +import org.kie.api.event.rule.MatchCancelledEvent; +import org.kie.api.event.rule.MatchCreatedEvent; import org.kie.api.event.rule.ObjectDeletedEvent; import org.kie.api.event.rule.ObjectInsertedEvent; import org.kie.api.event.rule.ObjectUpdatedEvent; @@ -29,10 +38,6 @@ import org.kie.api.runtime.conf.PersistedSessionOption; import org.kie.api.runtime.rule.EntryPoint; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static org.drools.reliability.core.ReliablePropagationList.PROPAGATION_LIST; public class ReliableSessionInitializer { @@ -54,13 +59,24 @@ static class StoresOnlySessionInitializer implements SessionInitializer { @Override public InternalWorkingMemory init(InternalWorkingMemory session, PersistedSessionOption persistedSessionOption) { + + if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + Storage activationsStorage = StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(session, "activations"); + ((ReliableKieSession)session).setActivationsStorage(activationsStorage); + } + if (!persistedSessionOption.isNewSession()) { // re-propagate objects from the storage to the new session populateSessionFromStorage(session); } + // These listeners should be added after populateSessionFromStorage() session.setWorkingMemoryActionListener(entry -> onWorkingMemoryAction(session, entry)); session.getRuleRuntimeEventSupport().addEventListener(new SimpleStoreRuntimeEventListener(session)); + if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + ((ReliableKieSession)session).getActivationsStorage().clear(); + session.getAgendaEventSupport().addEventListener(new SimpleStoreAgendaEventListener((ReliableKieSession)session)); + } return session; } @@ -108,6 +124,30 @@ public void objectUpdated(ObjectUpdatedEvent ev) { store.putIntoPersistedStorage(fh, false); } } + + static class SimpleStoreAgendaEventListener extends DefaultAgendaEventListener { + + private final ReliableKieSession session; + + public SimpleStoreAgendaEventListener(ReliableKieSession session) { + this.session = session; + } + + @Override + public void matchCreated(MatchCreatedEvent event) { + session.getActivationsStorage().put(ReliabilityUtils.getActivationKey(event.getMatch()), true); + } + + @Override + public void matchCancelled(MatchCancelledEvent event) { + session.getActivationsStorage().remove(ReliabilityUtils.getActivationKey(event.getMatch())); + } + + @Override + public void afterMatchFired(AfterMatchFiredEvent event) { + session.getActivationsStorage().remove(ReliabilityUtils.getActivationKey(event.getMatch())); + } + } } static class FullReliableSessionInitializer implements SessionInitializer { diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java index 134e16a1960..d0b3589cab0 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java @@ -16,7 +16,7 @@ package org.drools.reliability.core; import org.drools.core.SessionConfiguration; -import org.drools.core.common.InternalAgenda; +import org.drools.core.common.Storage; import org.drools.core.rule.accessor.FactHandleFactory; import org.drools.kiesession.rulebase.InternalKnowledgeBase; import org.drools.kiesession.session.StatefulKnowledgeSessionImpl; @@ -27,6 +27,8 @@ public class ReliableStatefulKnowledgeSessionImpl extends StatefulKnowledgeSessionImpl implements ReliableKieSession { + private transient Storage activationsStorage; + public ReliableStatefulKnowledgeSessionImpl() { } @@ -73,8 +75,22 @@ public void endOperation(InternalOperationType operationType) { } } + @Override + public Storage getActivationsStorage() { + return activationsStorage; + } + + @Override + public void setActivationsStorage(Storage activationsStorage) { + this.activationsStorage = activationsStorage; + } + @Override public void safepoint() { getEntryPoints().stream().map(ReliableNamedEntryPoint.class::cast).forEach(ReliableNamedEntryPoint::safepoint); + if (getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY + && activationsStorage.requiresFlush()) { + activationsStorage.flush(); + } } } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleReliableObjectStore.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleReliableObjectStore.java index 879e306675e..82ae1b43d3d 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleReliableObjectStore.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleReliableObjectStore.java @@ -15,13 +15,13 @@ package org.drools.reliability.core; +import java.util.List; + import org.drools.core.common.InternalFactHandle; import org.drools.core.common.InternalWorkingMemory; import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.ObjectStore; -import java.util.List; - public interface SimpleReliableObjectStore extends ObjectStore { List reInit(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep); diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java index 71618647ee9..f9db0621deb 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java @@ -15,6 +15,13 @@ package org.drools.reliability.core; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.drools.core.ClockType; import org.drools.core.common.DefaultEventHandle; import org.drools.core.common.IdentityObjectStore; @@ -22,10 +29,8 @@ import org.drools.core.common.InternalWorkingMemory; import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.Storage; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import org.drools.reliability.core.util.ReliabilityUtils; +import org.kie.api.runtime.conf.PersistedSessionOption; public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore { @@ -57,23 +62,23 @@ public void removeHandle(InternalFactHandle handle) { @Override public List reInit(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep) { reInitPropagated = true; - List propagated = new ArrayList<>(); + Map propagated = new HashMap<>(); List notPropagated = new ArrayList<>(); - for (StoredObject entry : storage.values()) { - if (entry.isPropagated()) { - propagated.add(entry); + for (Long factHandleId : storage.keySet()) { + StoredObject storedObject = storage.get(factHandleId); + if (storedObject.isPropagated()) { + propagated.put(factHandleId, storedObject); } else { - notPropagated.add(entry); + notPropagated.add(storedObject); } } + storage.clear(); if (session.getSessionConfiguration().getClockType() == ClockType.PSEUDO_CLOCK) { repropagateWithPseudoClock(session, ep, propagated); } else { - // fact handles with a match have been already propagated in the original session, so they shouldn't fire - propagated.forEach(obj -> obj.repropagate(ep)); - session.fireAllRules(match -> false); + repropagate(session, ep, propagated); } reInitPropagated = false; @@ -82,9 +87,41 @@ public List reInit(InternalWorkingMemory session, InternalWorkingM return notPropagated; } - private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, List propagated) { + private void repropagate(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map propagated) { + Map factHandleIdMap = new HashMap<>(); + propagated.forEach((oldFactHandleId, storedObject) -> { + long newFactHandleId = storedObject.repropagate(ep); + factHandleIdMap.put(newFactHandleId, oldFactHandleId); + }); + + fireOnlyWhenActivationRemaining(session, factHandleIdMap); + } + + private void fireOnlyWhenActivationRemaining(InternalWorkingMemory session, Map factHandleIdMap) { + if (session.getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + // fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage + Storage activationsStorage = ((ReliableKieSession)session).getActivationsStorage(); + Set activationKeySet = activationsStorage.keySet(); + session.fireAllRules(match -> { + String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap); + if (activationKeySet.contains(activationKey)) { + // If there is a remaining activation, it can fire + activationsStorage.remove(activationKey); + return true; + } else { + return false; + } + }); + } else { + session.fireAllRules(match -> false); + } + } + + private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map propagated) { ReliablePseudoClockScheduler clock = (ReliablePseudoClockScheduler) session.getSessionClock(); - for (StoredObject storedObject : propagated) { + Map factHandleIdMap = new HashMap<>(); + for (Map.Entry entry : propagated.entrySet()) { + StoredObject storedObject = entry.getValue(); if (storedObject.isEvent()) { StoredEvent storedEvent = (StoredEvent) storedObject; long currentTime = clock.getCurrentTime(); @@ -93,10 +130,11 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW clock.advanceTime(timestamp - currentTime, TimeUnit.MILLISECONDS); } } - storedObject.repropagate(ep); // This may schedule an expiration + long newFactHandleId = storedObject.repropagate(ep); // This may schedule an expiration + factHandleIdMap.put(newFactHandleId, entry.getKey()); } - // fact handles with a match have been already propagated in the original session, so they shouldn't fire - session.fireAllRules(match -> false); + + fireOnlyWhenActivationRemaining(session, factHandleIdMap); // Finally, meet with the persistedTime long currentTime = clock.getCurrentTime(); diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/StoredObject.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/StoredObject.java index 1445f38a4f0..1728348e0ed 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/StoredObject.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/StoredObject.java @@ -27,5 +27,5 @@ default boolean isEvent() { Object getObject(); - void repropagate(InternalWorkingMemoryEntryPoint ep); + long repropagate(InternalWorkingMemoryEntryPoint ep); } \ No newline at end of file diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/util/ReliabilityUtils.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/util/ReliabilityUtils.java new file mode 100644 index 00000000000..6fc695765c4 --- /dev/null +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/util/ReliabilityUtils.java @@ -0,0 +1,68 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.drools.reliability.core.util; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.drools.core.reteoo.RuleTerminalNodeLeftTuple; +import org.drools.reliability.core.ReliabilityRuntimeException; +import org.kie.api.runtime.rule.FactHandle; +import org.kie.api.runtime.rule.Match; + +public class ReliabilityUtils { + + private ReliabilityUtils() { + // no constructor + } + + /** + * Returns a String representation of the activation. + */ + public static String getActivationKey(Match match) { + return getActivationKey(match, null); + } + + /** + * Returns a String representation of the activation, replacing the new fact handle id with the old fact handle id. + * Used to find an activation key in the persisted storage. + */ + public static String getActivationKeyReplacingNewIdWithOldId(Match match, Map factHandleIdMap) { + return getActivationKey(match, factHandleIdMap); + } + + private static String getActivationKey(Match match, Map factHandleIdMap) { + if (!(match instanceof RuleTerminalNodeLeftTuple)) { + throw new ReliabilityRuntimeException("getActivationKey doesn't support " + match.getClass()); + } + RuleTerminalNodeLeftTuple ruleTerminalNodeLeftTuple = (RuleTerminalNodeLeftTuple) match; + String packageName = ruleTerminalNodeLeftTuple.getRule().getPackageName(); + String ruleName = ruleTerminalNodeLeftTuple.getRule().getName(); + List factHandles = ruleTerminalNodeLeftTuple.getFactHandles(); + List factHandleIdList = factHandles.stream() + .map(FactHandle::getId) + .map(handleId -> { + if (factHandleIdMap != null) { + return factHandleIdMap.get(handleId); // replace new id with old id + } else { + return handleId; // don't replace + } + }) + .collect(Collectors.toList()); + return "ActivationKey [packageName=" + packageName + ", ruleName=" + ruleName + ", factHandleIdList=" + factHandleIdList + "]"; + } +} diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java index 6aa37f4a97b..ae59ac79c74 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java @@ -291,4 +291,34 @@ void multipleKieSessions_insertAdvanceFireFailoverExpire_shouldExpireAfterFailov assertThat(getFactHandles(session2)).as("DROO (2) should have expired because @Expires = 60s") .hasSize(2); } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey") + void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, + PersistedSessionOption.ActivationStrategy activationStrategy) { + + createSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + SessionPseudoClock clock = getSessionClock(); + + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); + clock.advanceTime(1, TimeUnit.SECONDS); + insert(new StockTick("ACME")); + clock.advanceTime(500, TimeUnit.MILLISECONDS); + insert(new StockTick("ACME")); + + fireAllRules(1); + assertThat(getResults()).as("Firing is limited to 1") + .hasSize(1); + assertThat(getResults()).containsExactly("fired"); + + failover(); + restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + fireAllRules(); + assertThat(getResults()).as("All remaining activations should fire") + .containsExactlyInAnyOrder("fired", "fired", "fired"); + } } \ No newline at end of file diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java index 7116d846687..ceaf2f16a51 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java @@ -493,4 +493,28 @@ void multipleKieSessions_deleteBeforeFailover_shouldRecoverFromFailover(Persiste assertThat(fireAllRules(session1)).isEqualTo(1); assertThat(getResults(session1)).containsExactlyInAnyOrder("Toshiya"); } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey") + void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, + PersistedSessionOption.ActivationStrategy activationStrategy) { + createSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy); + + insert("M"); + insertMatchingPerson("Matching Person One"); + insertMatchingPerson("Matching Person Two"); + insertMatchingPerson("Matching Person Three"); + + fireAllRules(1); + assertThat(getResults()).as("Firing is limited to 1") + .hasSize(1); + + failover(); + + restoreSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy); + + fireAllRules(); + assertThat(getResults()).as("All remaining activations should fire") + .containsExactlyInAnyOrder("Matching Person One", "Matching Person Two", "Matching Person Three"); + } } diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java index dc42fb7ad50..fcdc5f885da 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java @@ -112,6 +112,14 @@ static Stream strategyProviderStoresOnlyWithAllSafepoints() { ); } + static Stream strategyProviderStoresOnlyWithAllSafepointsWithActivationKey() { + return Stream.of( + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.ALWAYS, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY), + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.EXPLICIT, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY), + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.AFTER_FIRE, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) + ); + } + static boolean isRemoteInfinispan() { return "INFINISPAN".equalsIgnoreCase(getConfig(DROOLS_RELIABILITY_MODULE_TEST)) && "REMOTE".equalsIgnoreCase(getConfig(INFINISPAN_STORAGE_MODE)); @@ -199,6 +207,10 @@ protected void delete(KieSession session, FactHandle fh) { session.delete(fh); } + protected FactHandle insertMatchingPerson(String name) { + return insertMatchingPerson(sessions.get(0), name, 20); // for rules don't care about age + } + protected FactHandle insertMatchingPerson(String name, Integer age) { return insertMatchingPerson(sessions.get(0), name, age); } @@ -262,6 +274,10 @@ protected KieSession createSession(String drl, PersistedSessionOption.Persistenc .withSafepointStrategy(safepointStrategy).withPersistenceObjectsStrategy(persistenceObjectsStrategy) : null, options); } + protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy) : null, options); + } + protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) { return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy) : null, useKieBaseCache, options); } @@ -281,10 +297,15 @@ protected KieSession restoreSession(String drl, PersistedSessionOption.Persisten protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.PersistenceObjectsStrategy persistenceObjectsStrategy, Option... options) { - Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0]; + Long sessionIdToRestoreFrom = (Long) this.persistedSessionIds.values().toArray()[0]; return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, persistenceObjectsStrategy, options); } + protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0]; + return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, activationStrategy, options); + } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) { Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); @@ -297,6 +318,11 @@ protected KieSession restoreSession(Long sessionId, String drl, PersistedSession .withSafepointStrategy(safepointStrategy).withPersistenceObjectsStrategy(persistenceObjectsStrategy), options); } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); + return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy), options); + } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) { Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), useKieBaseCache, options); @@ -306,6 +332,10 @@ protected int fireAllRules() { return fireAllRules(sessions.get(0)); } + protected int fireAllRules(int max) { + return sessions.get(0).fireAllRules(max); + } + protected int fireAllRules(KieSession session) { return session.fireAllRules(); } diff --git a/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java b/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java index e0bef84f904..329a5381d52 100644 --- a/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java +++ b/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java @@ -40,6 +40,14 @@ public boolean useSafepoints() { } } + /** + * NONE : On restoring a session, filter firing based on StoredObject.isPropagated + * ACTIVATION_KEY : On restoring a session, filter firing based on StoredObject.isPropagated and persisted ActivationKey + */ + public enum ActivationStrategy { + NONE, ACTIVATION_KEY + } + /** * The property name for the clock type configuration */ @@ -53,6 +61,8 @@ public boolean useSafepoints() { private SafepointStrategy safepointStrategy = SafepointStrategy.ALWAYS; + private ActivationStrategy activationStrategy = ActivationStrategy.NONE; + private PersistedSessionOption() { this(-1L); } @@ -105,6 +115,15 @@ public PersistedSessionOption withSafepointStrategy(SafepointStrategy safepointS return this; } + public ActivationStrategy getActivationStrategy() { + return activationStrategy; + } + + public PersistedSessionOption withActivationStrategy(ActivationStrategy activationStrategy) { + this.activationStrategy = activationStrategy; + return this; + } + public boolean isNewSession() { return sessionId < 0; }