Skip to content

Commit

Permalink
Introduce PersistedSessionOption.ActivationStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
tkobayas committed Aug 25, 2023
1 parent 70944a9 commit 1cef22e
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,23 @@ static class StoresOnlySessionInitializer implements SessionInitializer {
@Override
public InternalWorkingMemory init(InternalWorkingMemory session, PersistedSessionOption persistedSessionOption) {

Storage<String, Object> activationsStorage = StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(session, "activations");
((ReliableKieSession)session).setActivationsStorage(activationsStorage);
if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) {
Storage<String, Object> activationsStorage = StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(session, "activations");
((ReliableKieSession)session).setActivationsStorage(activationsStorage);
}

if (!persistedSessionOption.isNewSession()) {
// re-propagate objects from the storage to the new session
populateSessionFromStorage(session);
}

activationsStorage.clear();

// These listeners should be added after populateSessionFromStorage()
session.setWorkingMemoryActionListener(entry -> onWorkingMemoryAction(session, entry));
session.getRuleRuntimeEventSupport().addEventListener(new SimpleStoreRuntimeEventListener(session));
session.getAgendaEventSupport().addEventListener(new SimpleStoreAgendaEventListener((ReliableKieSession)session));
if (persistedSessionOption.getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) {
((ReliableKieSession)session).getActivationsStorage().clear();
session.getAgendaEventSupport().addEventListener(new SimpleStoreAgendaEventListener((ReliableKieSession)session));
}

return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ public void setActivationsStorage(Storage<String, Object> activationsStorage) {
@Override
public void safepoint() {
getEntryPoints().stream().map(ReliableNamedEntryPoint.class::cast).forEach(ReliableNamedEntryPoint::safepoint);
if (activationsStorage.requiresFlush()) {
activationsStorage.flush();
if (getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) {
if (activationsStorage.requiresFlush()) {
activationsStorage.flush();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
Expand All @@ -29,6 +30,8 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;
import org.drools.reliability.core.util.ReliabilityUtils;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.kie.api.runtime.conf.PersistedSessionOption.PersistenceStrategy;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

Expand Down Expand Up @@ -96,18 +99,23 @@ private void repropagate(InternalWorkingMemory session, InternalWorkingMemoryEnt
}

private void fireOnlyWhenActivationRemaining(InternalWorkingMemory session, Map<Long, Long> factHandleIdMap) {
// fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage
Storage<String, Object> activationsStorage = ((ReliableKieSession)session).getActivationsStorage();
session.fireAllRules(match -> {
String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap);
if (activationsStorage.containsKey(activationKey)) {
// If there is a remaining activation, it can fire
activationsStorage.remove(activationKey);
return true;
} else {
return false;
}
});
if (session.getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) {
// fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage
Storage<String, Object> activationsStorage = ((ReliableKieSession)session).getActivationsStorage();
Set<String> activationKeySet = activationsStorage.keySet();
session.fireAllRules(match -> {
String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap);
if (activationKeySet.contains(activationKey)) {
// If there is a remaining activation, it can fire
activationsStorage.remove(activationKey);
return true;
} else {
return false;
}
});
} else {
session.fireAllRules(match -> false);
}
}

private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map<Long, StoredObject> propagated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,34 @@ void multipleKieSessions_insertAdvanceFireFailoverExpire_shouldExpireAfterFailov
assertThat(getFactHandles(session2)).as("DROO (2) should have expired because @Expires = 60s")
.hasSize(2);
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey")
void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy,
PersistedSessionOption.ActivationStrategy activationStrategy) {

createSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);

SessionPseudoClock clock = getSessionClock();

insert(new StockTick("DROO"));
clock.advanceTime(6, TimeUnit.SECONDS);
insert(new StockTick("ACME"));
clock.advanceTime(1, TimeUnit.SECONDS);
insert(new StockTick("ACME"));
clock.advanceTime(500, TimeUnit.MILLISECONDS);
insert(new StockTick("ACME"));

fireAllRules(1);
assertThat(getResults()).as("Firing is limited to 1")
.hasSize(1);
assertThat(getResults()).containsExactly("fired");

failover();
restoreSession(CEP_RULE, persistenceStrategy, safepointStrategy, activationStrategy, EventProcessingOption.STREAM, ClockTypeOption.PSEUDO);

fireAllRules();
assertThat(getResults()).as("All remaining activations should fire")
.containsExactlyInAnyOrder("fired", "fired", "fired");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,10 @@ void multipleKieSessions_deleteBeforeFailover_shouldRecoverFromFailover(Persiste
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithAllSafepoints")
void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy) {
createSession(BASIC_RULE, persistenceStrategy, safepointStrategy);
@MethodSource("strategyProviderStoresOnlyWithAllSafepointsWithActivationKey")
void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy,
PersistedSessionOption.ActivationStrategy activationStrategy) {
createSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy);

insert("M");
insertMatchingPerson("Matching Person One");
Expand All @@ -510,9 +511,9 @@ void insertFireLimitFailoverFire_shouldFireRemainingActivations(PersistedSession

failover();

restoreSession(BASIC_RULE, persistenceStrategy, safepointStrategy);
restoreSession(BASIC_RULE, persistenceStrategy, safepointStrategy, activationStrategy);

fireAllRules(10);
fireAllRules();
assertThat(getResults()).as("All remaining activations should fire")
.containsExactlyInAnyOrder("Matching Person One", "Matching Person Two", "Matching Person Three");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ static Stream<Arguments> strategyProviderStoresOnlyWithAllSafepoints() {
);
}

static Stream<Arguments> strategyProviderStoresOnlyWithAllSafepointsWithActivationKey() {
return Stream.of(
arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.ALWAYS, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY),
arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.EXPLICIT, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY),
arguments(PersistedSessionOption.PersistenceStrategy.STORES_ONLY, PersistedSessionOption.SafepointStrategy.AFTER_FIRE, PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY)
);
}

static boolean isRemoteInfinispan() {
return "INFINISPAN".equalsIgnoreCase(getConfig(DROOLS_RELIABILITY_MODULE_TEST))
&& "REMOTE".equalsIgnoreCase(getConfig(INFINISPAN_STORAGE_MODE));
Expand Down Expand Up @@ -260,6 +268,10 @@ protected KieSession createSession(String drl, PersistedSessionOption.Persistenc
return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy) : null, options);
}

protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) {
return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy) : null, options);
}

protected KieSession createSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) {
return getKieSession(drl, persistenceStrategy != null ? PersistedSessionOption.newSession().withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy) : null, useKieBaseCache, options);
}
Expand All @@ -277,11 +289,21 @@ protected KieSession restoreSession(String drl, PersistedSessionOption.Persisten
return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, options);
}

protected KieSession restoreSession(String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) {
Long sessionIdToRestoreFrom = (Long)this.persistedSessionIds.values().toArray()[0];
return restoreSession(sessionIdToRestoreFrom, drl, persistenceStrategy, safepointStrategy, activationStrategy, options);
}

protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, Option... options) {
Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId);
return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), options);
}

protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, PersistedSessionOption.ActivationStrategy activationStrategy, Option... options) {
Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId);
return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy).withActivationStrategy(activationStrategy), options);
}

protected KieSession restoreSession(Long sessionId, String drl, PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy, boolean useKieBaseCache, Option... options) {
Long sessionIdToRestoreFrom = this.persistedSessionIds.get(sessionId);
return getKieSession(drl, PersistedSessionOption.fromSession(sessionIdToRestoreFrom).withPersistenceStrategy(persistenceStrategy).withSafepointStrategy(safepointStrategy), useKieBaseCache, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public boolean useSafepoints() {
}
}

/**
* NONE : On restoring a session, filter firing based on StoredObject.isPropagated
* ACTIVATION_KEY : On restoring a session, filter firing based on StoredObject.isPropagated and persisted ActivationKey
*/
public enum ActivationStrategy {
NONE, ACTIVATION_KEY
}

/**
* The property name for the clock type configuration
*/
Expand All @@ -47,6 +55,8 @@ public boolean useSafepoints() {

private SafepointStrategy safepointStrategy = SafepointStrategy.ALWAYS;

private ActivationStrategy activationStrategy = ActivationStrategy.NONE;

private PersistedSessionOption() {
this(-1L);
}
Expand Down Expand Up @@ -92,6 +102,15 @@ public PersistedSessionOption withSafepointStrategy(SafepointStrategy safepointS
return this;
}

public ActivationStrategy getActivationStrategy() {
return activationStrategy;
}

public PersistedSessionOption withActivationStrategy(ActivationStrategy activationStrategy) {
this.activationStrategy = activationStrategy;
return this;
}

public boolean isNewSession() {
return sessionId < 0;
}
Expand Down

0 comments on commit 1cef22e

Please sign in to comment.