From 5a4484d373e0653bb4d60cb974a492b94b0569eb Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 11 Jun 2024 22:59:58 -0700 Subject: [PATCH] use concurrentmap Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 34 +++++++--- .../indices/IndicesRequestCacheTests.java | 66 +++++++++++++++++-- 2 files changed, 84 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 57f7e402536f2..61f16adcb878d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -75,7 +76,6 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -175,7 +175,8 @@ public final class IndicesRequestCache implements RemovalListener keysToClean; - private final ConcurrentMap> cleanupKeyToCountMap; + private final ConcurrentMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private volatile double stalenessThreshold; private final IndicesRequestCacheCleaner cacheCleaner; + private final boolean pluggableCacheEnabled; - IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { + IndicesRequestCacheCleanupManager( + ThreadPool threadpool, + TimeValue cleanInterval, + double stalenessThreshold, + boolean pluggableCacheEnabled + ) { this.stalenessThreshold = stalenessThreshold; this.keysToClean = ConcurrentCollections.newConcurrentSet(); this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); this.staleKeysCount = new AtomicInteger(0); this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); + this.pluggableCacheEnabled = pluggableCacheEnabled; threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); } @@ -555,7 +563,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * @param cleanupKey the CleanupKey to be updated in the map */ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { - if (cleanupKey.entity == null) { + if (pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -567,7 +575,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // If the key doesn't exist, it's added with a value of 1. // If the key exists, its value is incremented by 1. - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); + addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId); + } + + // pkg-private for testing + void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) + .merge(readerCacheKeyId, 1, Integer::sum); } /** @@ -586,7 +600,7 @@ private void updateStaleCountOnEntryRemoval( CleanupKey cleanupKey, RemovalNotification, BytesReference> notification ) { - if (notification.getRemovalReason() == RemovalReason.REPLACED) { + if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -646,7 +660,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (cleanupKey.entity == null) { + if (pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -788,7 +802,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (cleanThresholdPercent == 0.0) { + if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); @@ -825,7 +839,7 @@ public void close() { } // for testing - ConcurrentMap> getCleanupKeyToCountMap() { + ConcurrentMap> getCleanupKeyToCountMap() { return cleanupKeyToCountMap; } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index dcddd9f3d1318..723967e12304e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -95,7 +95,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -105,6 +105,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; @@ -488,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( indexShard.hashCode() ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -551,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -719,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); @@ -792,8 +795,55 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { - return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + // test adding to cleanupKeyToCountMap with multiple threads + public void testAddToCleanupKeyToCountMap() throws InterruptedException { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + cache = getIndicesRequestCache(settings); + + int numberOfThreads = 10; + int numberOfIterations = 1000; + Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.submit(() -> { + phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); + } + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + } + phaser.arriveAndAwaitAdvance(); // Start all threads + + // Main thread iterates over the map + executorService.submit(() -> { + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { + v.forEach((k1, v1) -> { + // Accessing the map to create contention + v.get(k1); + }); + }); + } + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + assertFalse(exceptionDetected.get()); } private IndicesRequestCache getIndicesRequestCache(Settings settings) { @@ -813,6 +863,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) { ); } + private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + } + private Loader getLoader(DirectoryReader reader) { return new Loader(reader, 0); }