From 1cef22e0592ea1fd7e345a5cf5e252d77d97ef5e Mon Sep 17 00:00:00 2001 From: Toshiya Kobayashi Date: Fri, 25 Aug 2023 13:54:51 +0900 Subject: [PATCH] Introduce PersistedSessionOption.ActivationStrategy --- .../core/ReliableSessionInitializer.java | 13 +++++--- .../ReliableStatefulKnowledgeSessionImpl.java | 6 ++-- ...impleSerializationReliableObjectStore.java | 32 ++++++++++++------- .../reliability/test/ReliabilityCepTest.java | 30 +++++++++++++++++ .../reliability/test/ReliabilityTest.java | 11 ++++--- .../test/ReliabilityTestBasics.java | 22 +++++++++++++ .../runtime/conf/PersistedSessionOption.java | 19 +++++++++++ 7 files changed, 109 insertions(+), 24 deletions(-) diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java index afe8253f0f32..1abbf5d46a31 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableSessionInitializer.java @@ -60,20 +60,23 @@ static class StoresOnlySessionInitializer implements SessionInitializer { @Override public InternalWorkingMemory init(InternalWorkingMemory session, PersistedSessionOption persistedSessionOption) { - Storage activationsStorage = StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(session, "activations"); - ((ReliableKieSession)session).setActivationsStorage(activationsStorage); + if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + Storage activationsStorage = StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(session, "activations"); + ((ReliableKieSession)session).setActivationsStorage(activationsStorage); + } if (!persistedSessionOption.isNewSession()) { // re-propagate objects from the storage to the new session populateSessionFromStorage(session); } - activationsStorage.clear(); - // These listeners should be added after populateSessionFromStorage() session.setWorkingMemoryActionListener(entry -> onWorkingMemoryAction(session, entry)); session.getRuleRuntimeEventSupport().addEventListener(new SimpleStoreRuntimeEventListener(session)); - session.getAgendaEventSupport().addEventListener(new SimpleStoreAgendaEventListener((ReliableKieSession)session)); + if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + ((ReliableKieSession)session).getActivationsStorage().clear(); + session.getAgendaEventSupport().addEventListener(new SimpleStoreAgendaEventListener((ReliableKieSession)session)); + } return session; } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java index 1ed8aedecf2b..fc6f90358e11 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/ReliableStatefulKnowledgeSessionImpl.java @@ -88,8 +88,10 @@ public void setActivationsStorage(Storage activationsStorage) { @Override public void safepoint() { getEntryPoints().stream().map(ReliableNamedEntryPoint.class::cast).forEach(ReliableNamedEntryPoint::safepoint); - if (activationsStorage.requiresFlush()) { - activationsStorage.flush(); + if (getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + if (activationsStorage.requiresFlush()) { + activationsStorage.flush(); + } } } } diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java index 45cc44a4015c..39bec02ca0d7 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.drools.core.ClockType; @@ -29,6 +30,8 @@ import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.Storage; import org.drools.reliability.core.util.ReliabilityUtils; +import org.kie.api.runtime.conf.PersistedSessionOption; +import org.kie.api.runtime.conf.PersistedSessionOption.PersistenceStrategy; public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore { @@ -96,18 +99,23 @@ private void repropagate(InternalWorkingMemory session, InternalWorkingMemoryEnt } private void fireOnlyWhenActivationRemaining(InternalWorkingMemory session, Map factHandleIdMap) { - // fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage - Storage activationsStorage = ((ReliableKieSession)session).getActivationsStorage(); - session.fireAllRules(match -> { - String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap); - if (activationsStorage.containsKey(activationKey)) { - // If there is a remaining activation, it can fire - activationsStorage.remove(activationKey); - return true; - } else { - return false; - } - }); + if (session.getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { + // fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage + Storage activationsStorage = ((ReliableKieSession)session).getActivationsStorage(); + Set activationKeySet = activationsStorage.keySet(); + session.fireAllRules(match -> { + String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap); + if (activationKeySet.contains(activationKey)) { + // If there is a remaining activation, it can fire + activationsStorage.remove(activationKey); + return true; + } else { + return false; + } + }); + } else { + session.fireAllRules(match -> false); + } } private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map propagated) { diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java index 6aa37f4a97bb..ae59ac79c74d 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityCepTest.java @@ -291,4 +291,34 @@ void multipleKieSessions_insertAdvanceFireFailoverExpire_shouldExpireAfterFailov assertThat(getFactHandles(session2)).as("DROO (2) should have expired because @Expires = 60s") .hasSize(2); } + + @ParameterizedTest + @MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey") + void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, + PersistedSessionOption.ActivationStrategy activationStrategy) { + + createSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + SessionPseudoClock clock = getSessionClock(); + + insert(new StockTick("DROO")); + clock.advanceTime(6, TimeUnit.SECONDS); + insert(new StockTick("ACME")); + clock.advanceTime(1, TimeUnit.SECONDS); + insert(new StockTick("ACME")); + clock.advanceTime(500, TimeUnit.MILLISECONDS); + insert(new StockTick("ACME")); + + fireAllRules(1); + assertThat(getResults()).as("Firing is limited to 1") + .hasSize(1); + assertThat(getResults()).containsExactly("fired"); + + failover(); + restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO); + + fireAllRules(); + assertThat(getResults()).as("All remaining activations should fire") + .containsExactlyInAnyOrder("fired", "fired", "fired"); + } } \ No newline at end of file diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java index 55e3e631f2d0..ceaf2f16a513 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTest.java @@ -495,9 +495,10 @@ void multipleKieSessions_deleteBeforeFailover_shouldRecoverFromFailover(Persiste } @ParameterizedTest - @MethodSource("strategyProviderStoresOnlyWithAllSafepoints") - void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) { - createSession(BASIC_RULE, persistenceStrategy, safepointStrategy); + @MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey") + void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, + PersistedSessionOption.ActivationStrategy activationStrategy) { + createSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy); insert("M"); insertMatchingPerson("Matching Person One"); @@ -510,9 +511,9 @@ void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSession failover(); - restoreSession(BASIC_RULE, persistenceStrategy, safepointStrategy); + restoreSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy); - fireAllRules(10); + fireAllRules(); assertThat(getResults()).as("All remaining activations should fire") .containsExactlyInAnyOrder("Matching Person One", "Matching Person Two", "Matching Person Three"); } diff --git a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java index 496099540c83..afdbebd629af 100644 --- a/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java +++ b/drools-reliability/drools-reliability-tests/src/test/java/org/drools/reliability/test/ReliabilityTestBasics.java @@ -112,6 +112,14 @@ static Stream strategyProviderStoresOnlyWithAllSafepoints() { ); } + static Stream strategyProviderStoresOnlyWithAllSafepointsWithActivationKey() { + return Stream.of( + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.ALWAYS, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY), + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.EXPLICIT, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY), + arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.AFTER_FIRE, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) + ); + } + static boolean isRemoteInfinispan() { return "INFINISPAN".equalsIgnoreCase(getConfig(DROOLS_RELIABILITY_MODULE_TEST)) && "REMOTE".equalsIgnoreCase(getConfig(INFINISPAN_STORAGE_MODE)); @@ -260,6 +268,10 @@ protected KieSession createSession(String drl, PersistedSessionOption.Persistenc return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy) : null, options); } + protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy) : null, options); + } + protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) { return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy) : null, useKieBaseCache, options); } @@ -277,11 +289,21 @@ protected KieSession restoreSession(String drl, PersistedSessionOption.Persisten return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, options); } + protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0]; + return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, activationStrategy, options); + } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) { Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options); } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) { + Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); + return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy), options); + } + protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) { Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId); return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), useKieBaseCache, options); diff --git a/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java b/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java index aa6fa2082eea..c768e4942bca 100644 --- a/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java +++ b/kie-api/src/main/java/org/kie/api/runtime/conf/PersistedSessionOption.java @@ -36,6 +36,14 @@ public boolean useSafepoints() { } } + /** + * NONE : On restoring a session, filter firing based on StoredObject.isPropagated + * ACTIVATION_KEY : On restoring a session, filter firing based on StoredObject.isPropagated and persisted ActivationKey + */ + public enum ActivationStrategy { + NONE, ACTIVATION_KEY + } + /** * The property name for the clock type configuration */ @@ -47,6 +55,8 @@ public boolean useSafepoints() { private SafepointStrategy safepointStrategy = SafepointStrategy.ALWAYS; + private ActivationStrategy activationStrategy = ActivationStrategy.NONE; + private PersistedSessionOption() { this(-1L); } @@ -92,6 +102,15 @@ public PersistedSessionOption withSafepointStrategy(SafepointStrategy safepointS return this; } + public ActivationStrategy getActivationStrategy() { + return activationStrategy; + } + + public PersistedSessionOption withActivationStrategy(ActivationStrategy activationStrategy) { + this.activationStrategy = activationStrategy; + return this; + } + public boolean isNewSession() { return sessionId < 0; }