Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Cache tier policies #12542

Merged
merged 24 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.cache.common.policy;

import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately
* the time it takes to get a result from the cache tier.
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
* The policy accepts values of type V and decodes them into CachePolicyInfoWrapper, which has the data needed
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
* to decide whether to admit the value.
* @param <V> The type of data consumed by test().
*/
public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
private final TimeValue threshold;

/**
* Function which extracts took time in nanoseconds from a serialized CachedQueryResult
*/
private final Function<V, Long> cachedResultParser; //

/**
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing took time
*/
public TookTimePolicy(TimeValue threshold, Function<V, Long> cachedResultParser) {
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
}

/**
* Check whether to admit data.
* @param data the input argument
* @return whether to admit the data
*/
public boolean test(V data) {
Long tookTimeNanos;
try {
tookTimeNanos = cachedResultParser.apply(data);
} catch (Exception e) {
// If we can't read a CachePolicyInfoWrapper from the BytesReference, reject the data
return false;
}

if (tookTimeNanos == null) {
// If the wrapper contains null took time, reject the data
// This can happen if no CachePolicyInfoWrapper was written to the BytesReference, as the wrapper's constructor
// reads an optional long, which will end up as null in this case. This is why we should reject it.
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
if (tookTime.compareTo(threshold) < 0) { // negative -> tookTime is shorter than threshold
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cache.common.tier;

import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
Expand All @@ -17,17 +18,20 @@
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -52,6 +56,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Expand All @@ -63,7 +68,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
if (evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
}
}
removalListener.onRemoval(notification);
}
Expand All @@ -79,6 +86,8 @@ public void onRemoval(RemovalNotification<K, V> notification) {
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

// Package private for testing
Expand Down Expand Up @@ -193,6 +202,15 @@ private Function<K, V> getValueFromTieredCache() {
};
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
if (!policy.test(value)) {
return false;
}
}
return true;
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -232,11 +250,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);
Function<V, Long> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
"Cached result parser fn can't be null"
);

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
.build();
}

Expand All @@ -258,6 +286,7 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();

/**
* Default constructor
Expand Down Expand Up @@ -324,6 +353,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
return this;
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* @param policy the policy
* @return builder
*/
public Builder<K, V> addPolicy(Predicate<V> policy) {
this.policies.add(policy);
return this;
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* @param policies the policies
* @return builder
*/
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
this.policies.addAll(policies);
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
package org.opensearch.cache.common.tier;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.NodeScope;

/**
Expand All @@ -36,6 +40,16 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOKTIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.took_time.threshold",
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
(key) -> Setting.timeSetting(key, new TimeValue(10, TimeUnit.MILLISECONDS), NodeScope, Dynamic)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.policy;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Random;
import java.util.function.Function;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, Long> transformationFunction = (data) -> {
try {
return CachedQueryResult.getTookTimeNanos(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
};

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
return new TookTimePolicy<>(threshold, transformationFunction);
}

public void testTookTimePolicy() throws Exception {
double threshMillis = 10;
long shortMillis = (long) (0.9 * threshMillis);
long longMillis = (long) (1.5 * threshMillis);
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis));
BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000);
BytesReference longTime = getValidPolicyInput(longMillis * 1000000);

boolean shortResult = tookTimePolicy.test(shortTime);
assertFalse(shortResult);
boolean longResult = tookTimePolicy.test(longTime);
assertTrue(longResult);

TookTimePolicy<BytesReference> disabledPolicy = getTookTimePolicy(TimeValue.ZERO);
shortResult = disabledPolicy.test(shortTime);
assertTrue(shortResult);
longResult = disabledPolicy.test(longTime);
assertTrue(longResult);
}

public void testMissingWrapper() throws Exception {
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(TimeValue.ZERO);
BytesStreamOutput out = new BytesStreamOutput();
getQSR().writeToNoId(out);
BytesReference missingWrapper = out.bytes();
boolean allowedMissingWrapper = tookTimePolicy.test(missingWrapper);
assertFalse(allowedMissingWrapper);
}

private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException {
// When it's used in the cache, the policy will receive BytesReferences which have a CachePolicyInfoWrapper
// at the beginning of them, followed by the actual QSR.
CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos);
BytesStreamOutput out = new BytesStreamOutput();
cachedQueryResult.writeToNoId(out);
return out.bytes();
}

private QuerySearchResult getQSR() {
// We can't mock the QSR with mockito because the class is final. Construct a real one
QuerySearchResult mockQSR = new QuerySearchResult();

// duplicated from DfsQueryPhaseTests.java
mockQSR.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }),
2.0F
),
new DocValueFormat[0]
);
return mockQSR;
}

private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException {
Random rand = Randomness.get();
byte[] bytes = new byte[numBytes];
rand.nextBytes(bytes);
out.writeBytes(bytes);
}
}
Loading
Loading