diff --git a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java index 984434190b486..9e8b67eb8fd0d 100644 --- a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java +++ b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java @@ -32,6 +32,7 @@ package org.opensearch.core.index.shard; +import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; @@ -55,6 +56,8 @@ public class ShardId implements Comparable, ToXContentFragment, Writeab private final int shardId; private final int hashCode; + private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class); + /** * Constructs a new shard id. * @param index the index name @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException { hashCode = computeHashCode(); } + public long getBaseRamBytesUsed() { + return BASE_RAM_BYTES_USED; + } + /** * Writes this shard id to a stream. * @param out the stream to write to diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..25224b946b784 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,8 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.Optional; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +53,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; + private final DelegatingCacheHelper delegatingCacheHelper; + private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { super(in, wrapper); this.wrapper = wrapper; this.shardId = shardId; + this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper()); } /** @@ -68,7 +73,61 @@ public ShardId shardId() { @Override public CacheHelper getReaderCacheHelper() { // safe to delegate since this reader does not alter the index - return in.getReaderCacheHelper(); + return this.delegatingCacheHelper; + } + + public DelegatingCacheHelper getDelegatingCacheHelper() { + return this.delegatingCacheHelper; + } + + /** + * Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey. + * @opensearch.internal + */ + public class DelegatingCacheHelper implements CacheHelper { + private final CacheHelper cacheHelper; + private final DelegatingCacheKey serializableCacheKey; + + DelegatingCacheHelper(CacheHelper cacheHelper) { + this.cacheHelper = cacheHelper; + this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null)); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ + public class DelegatingCacheKey { + private final CacheKey cacheKey; + private final String uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID().toString(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public String getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 4a19f8eb8714d..6d5c23274dbd6 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -51,7 +51,12 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; import java.io.Closeable; import java.io.IOException; @@ -60,8 +65,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -103,13 +110,16 @@ public final class IndicesRequestCache implements RemovalListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; private final Cache cache; + private final Function> cacheEntityLookup; - IndicesRequestCache(Settings settings) { + IndicesRequestCache(Settings settings, Function> cacheEntityFunction) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -121,6 +131,7 @@ public final class IndicesRequestCache implements RemovalListener notification) { - notification.getKey().entity.onRemoval(notification); + // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old + // shards as part of request cache. + cacheEntityLookup.apply(notification.getKey().shardId).ifPresent(entity -> entity.onRemoval(notification)); } BytesReference getOrCompute( - CacheEntity cacheEntity, + IndicesService.IndexShardCacheEntity cacheEntity, CheckedSupplier loader, DirectoryReader reader, BytesReference cacheKey ) throws Exception { assert reader.getReaderCacheHelper() != null; - final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + assert readerCacheKeyId != null; + final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); + cacheEntity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -159,7 +178,7 @@ BytesReference getOrCompute( } } } else { - key.entity.onHit(); + cacheEntity.onHit(); } return value; } @@ -170,9 +189,14 @@ BytesReference getOrCompute( * @param reader the reader to invalidate the cache entry for * @param cacheKey the cache key to invalidate */ - void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { + void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + String readerCacheKeyId = null; + if (reader instanceof OpenSearchDirectoryReader) { + IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); + } + cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)); } /** @@ -240,6 +264,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,22 +272,26 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - public static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); - - public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + static class Key implements Accountable, Writeable { + public final ShardId shardId; // use as identity equality + public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { - this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); + Key(ShardId shardId, BytesReference value, String readerCacheKeyId) { + this.shardId = shardId; this.value = value; + this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); + } + + Key(StreamInput in) throws IOException { + this.shardId = in.readOptionalWriteable(ShardId::new); + this.readerCacheKeyId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override public long ramBytesUsed() { - return BASE_RAM_BYTES_USED + entity.ramBytesUsed() + value.length(); + return BASE_RAM_BYTES_USED + shardId.getBaseRamBytesUsed() + value.length(); } @Override @@ -276,28 +305,35 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; - if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; + if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false; + if (!shardId.equals(key.shardId)) return false; if (!value.equals(key.value)) return false; return true; } @Override public int hashCode() { - int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + int result = shardId.hashCode(); + result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(shardId); + out.writeOptionalString(readerCacheKeyId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyId = readerCacheKeyId; } @Override @@ -315,7 +351,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (!Objects.equals(readerCacheKeyId, that.readerCacheKeyId)) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,7 +359,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyId); return result; } } @@ -339,9 +375,9 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { // null indicates full cleanup, as does a closed shard - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); + currentFullClean.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); } else { currentKeysToClean.add(cleanupKey); } @@ -349,10 +385,13 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + if (currentFullClean.contains(key.shardId)) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + // If the flow comes here, then we should have a open shard available on node. + if (currentKeysToClean.contains( + new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) + )) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 01bff4c417222..00f13948709cf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -396,7 +396,13 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = this.indices.get(shardId.getIndex().getUUID()); + if (indexService == null) { + return Optional.empty(); + } + return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); + })); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1729,7 +1735,6 @@ private BytesReference cacheShardLevelResult( BytesReference cacheKey, CheckedConsumer loader ) throws Exception { - IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard); CheckedSupplier supplier = () -> { /* BytesStreamOutput allows to pass the expected size but by default uses * BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie. @@ -1746,7 +1751,7 @@ private BytesReference cacheShardLevelResult( return out.bytes(); } }; - return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey); + return indicesRequestCache.getOrCompute(new IndexShardCacheEntity(shard), supplier, reader, cacheKey); } /** @@ -1754,11 +1759,12 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + public static class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..73728aec12e51 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,36 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexService; +import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Optional; +import java.util.UUID; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -76,13 +89,13 @@ public void testBasicOperationsCache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -90,10 +103,11 @@ public void testBasicOperationsCache() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -106,7 +120,7 @@ public void testBasicOperationsCache() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -122,9 +136,17 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -141,9 +163,10 @@ public void testCacheDifferentReaders() throws Exception { DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); @@ -155,9 +178,10 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // cache the second - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(entity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -167,9 +191,10 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length()); assertEquals(2, cache.numRegisteredCloseListeners()); - secondEntity = new TestEntity(requestCacheStats, indexShard); + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -177,10 +202,11 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(loader.loadedFromCache); assertEquals(2, cache.count()); - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -201,7 +227,7 @@ public void testCacheDifferentReaders() throws Exception { if (randomBoolean()) { secondReader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(secondEntity); } cache.cleanCache(); @@ -218,9 +244,11 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -228,26 +256,26 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - size = requestCacheStats.stats().getMemorySize(); + size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); } + IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build() + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) ); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -255,36 +283,44 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); - assertEquals(1, requestCacheStats.stats().getEvictions()); + assertEquals(1, indexShard.requestCache().stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + })); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -292,36 +328,39 @@ public void testClearAllEntityIdentity() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - AtomicBoolean differentIdentity = new AtomicBoolean(true); - TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); - final long hitCount = requestCacheStats.stats().getHitCount(); + RequestCacheStats requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + final long hitCount = requestCacheStats.getHitCount(); // clear all for the indexShard Idendity even though is't still open cache.clear(randomFrom(entity, secondEntity)); cache.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); - assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount()); + requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); @@ -365,8 +404,17 @@ public BytesReference get() { } public void testInvalidate() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -374,13 +422,13 @@ public void testInvalidate() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -388,10 +436,11 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -401,11 +450,12 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // load again after invalidate - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); cache.invalidate(entity, reader, termBytes); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -418,7 +468,7 @@ public void testInvalidate() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -433,22 +483,25 @@ public void testInvalidate() throws Exception { } public void testEqualsKey() throws IOException { - AtomicBoolean trueBoolean = new AtomicBoolean(true); - AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); - IndexReader reader1 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + ShardId shardId = new ShardId("foo", "bar", 1); + ShardId shardId1 = new ShardId("foo1", "bar1", 2); + IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); - IndexReader reader2 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.state()).thenReturn(IndexShardState.STARTED); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(shardId1, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(shardId, new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -459,6 +512,29 @@ public void testEqualsKey() throws IOException { assertNotEquals(key1, key5); } + public void testSerializationDeserializationOfCacheKey() throws Exception { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShard(0); + IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); + String readerCacheKeyId = UUID.randomUUID().toString(); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); + BytesReference bytesReference = null; + try (BytesStreamOutput out = new BytesStreamOutput()) { + key1.writeTo(out); + bytesReference = out.bytes(); + } + StreamInput in = bytesReference.streamInput(); + + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(in); + + assertEquals(readerCacheKeyId, key2.readerCacheKeyId); + assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); + assertEquals(termBytes, key2.value); + + } + private class TestBytesReference extends AbstractBytesReference { int dummyValue; @@ -509,34 +585,4 @@ public boolean isFragment() { return false; } } - - private class TestEntity extends AbstractIndexShardCacheEntity { - private final AtomicBoolean standInForIndexShard; - private final ShardRequestCache shardRequestCache; - - private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) { - this.standInForIndexShard = standInForIndexShard; - this.shardRequestCache = shardRequestCache; - } - - @Override - protected ShardRequestCache stats() { - return shardRequestCache; - } - - @Override - public boolean isOpen() { - return standInForIndexShard.get(); - } - - @Override - public Object getCacheIdentity() { - return standInForIndexShard; - } - - @Override - public long ramBytesUsed() { - return 42; - } - } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..b0ab0cc08a5ec 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -36,18 +36,15 @@ import org.apache.lucene.search.Query; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; -import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.node.MockNode; import org.opensearch.node.Node; @@ -314,15 +311,12 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(1, indicesService.indicesRefCount.refCount()); assertEquals(0L, cache.count()); - IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(shard) { @Override public long ramBytesUsed() { return 42; } - @Override - public void onCached(Key key, BytesReference value) {} - @Override public boolean isOpen() { return true; @@ -330,17 +324,8 @@ public boolean isOpen() { @Override public Object getCacheIdentity() { - return this; + return shard; } - - @Override - public void onHit() {} - - @Override - public void onMiss() {} - - @Override - public void onRemoval(RemovalNotification notification) {} }; cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo")); assertEquals(1L, cache.count());