diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java b/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java index 814288dbe588f..dc054f8b51d3e 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java @@ -42,6 +42,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; @@ -71,7 +72,7 @@ public class IndicesQueryCache implements QueryCache, Closeable { // This is a hack for the fact that the close listener for the // ShardCoreKeyMap will be called before onDocIdSetEviction // See onDocIdSetEviction for more info - private final Map stats2 = new IdentityHashMap<>(); + private final Map stats2 = Collections.synchronizedMap(new IdentityHashMap<>()); public IndicesQueryCache(Settings settings) { final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings); @@ -189,20 +190,35 @@ public void close() { assert shardKeyMap.size() == 0 : shardKeyMap.size(); assert shardStats.isEmpty() : shardStats.keySet(); assert stats2.isEmpty() : stats2; + + // This cache stores two things: filters, and doc id sets. At this time + // we only know that there are no more doc id sets, but we still track + // recently used queries, which we want to reclaim. cache.clear(); } private static class Stats implements Cloneable { + final ShardId shardId; volatile long ramBytesUsed; volatile long hitCount; volatile long missCount; volatile long cacheCount; volatile long cacheSize; + Stats(ShardId shardId) { + this.shardId = shardId; + } + QueryCacheStats toQueryCacheStats() { return new QueryCacheStats(ramBytesUsed, hitCount, missCount, cacheCount, cacheSize); } + + @Override + public String toString() { + return "{shardId=" + shardId + ", ramBytedUsed=" + ramBytesUsed + ", hitCount=" + hitCount + ", missCount=" + missCount + + ", cacheCount=" + cacheCount + ", cacheSize=" + cacheSize + "}"; + } } private static class StatsAndCount { @@ -213,6 +229,11 @@ private static class StatsAndCount { this.stats = stats; this.count = 0; } + + @Override + public String toString() { + return "{stats=" + stats + " ,count=" + count + "}"; + } } private boolean empty(Stats stats) { @@ -249,7 +270,7 @@ private Stats getOrCreateStats(Object coreKey) { final ShardId shardId = shardKeyMap.getShardId(coreKey); Stats stats = shardStats.get(shardId); if (stats == null) { - stats = new Stats(); + stats = new Stats(shardId); shardStats.put(shardId, stats); } return stats; @@ -265,6 +286,7 @@ protected void onClear() { stats.cacheSize = 0; stats.ramBytesUsed = 0; } + stats2.clear(); sharedRamBytesUsed = 0; } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 019f36d6c22f9..4a12bdae6b9ea 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -192,7 +192,7 @@ public class IndicesService extends AbstractLifecycleComponent private final NamedWriteableRegistry namedWriteableRegistry; private final IndexingMemoryController indexingMemoryController; private final TimeValue cleanInterval; - private final IndicesRequestCache indicesRequestCache; + final IndicesRequestCache indicesRequestCache; // pkg-private for testing private final IndicesQueryCache indicesQueryCache; private final MetaStateService metaStateService; private final Collection>> engineFactoryProviders; @@ -481,9 +481,9 @@ public void onStoreCreated(ShardId shardId) { @Override public void onStoreClosed(ShardId shardId) { try { - indicesRefCount.decRef(); - } finally { indicesQueryCache.onClose(shardId); + } finally { + indicesRefCount.decRef(); } } }; diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java index 9990d7b082e5c..15b45330530d3 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java @@ -19,26 +19,37 @@ package org.elasticsearch.indices; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.search.Query; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.cache.RemovalNotification; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesRequestCache.Key; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockHttpTransport; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.transport.nio.MockNioTransportPlugin; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -71,9 +82,11 @@ private Node startNode() throws NodeValidationException { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeName) + .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true) .build(); - Node node = new MockNode(settings, Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class), true); + Node node = new MockNode(settings, + Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class, InternalSettingsPlugin.class), true); node.start(); return node; } @@ -100,7 +113,7 @@ public void testCloseNonEmptyIndicesService() throws Exception { assertEquals(0, indicesService.indicesRefCount.refCount()); } - public void testCloseWhileOngoingRequest() throws Exception { + public void testCloseWithIncedRefStore() throws Exception { Node node = startNode(); IndicesService indicesService = node.injector().getInstance(IndicesService.class); assertEquals(1, indicesService.indicesRefCount.refCount()); @@ -121,4 +134,157 @@ public void testCloseWhileOngoingRequest() throws Exception { assertEquals(0, indicesService.indicesRefCount.refCount()); } + public void testCloseWhileOngoingRequest() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); + node.client().prepareIndex("test", "_doc", "1").setSource(Collections.emptyMap()).get(); + ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get()); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + Searcher searcher = shard.acquireSearcher("test"); + assertEquals(1, searcher.reader().maxDoc()); + + node.close(); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + searcher.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + } + + public void testCloseAfterRequestHasUsedQueryCache() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true))); + node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get(); + ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get()); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + IndicesQueryCache cache = indicesService.getIndicesQueryCache(); + + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + Searcher searcher = shard.acquireSearcher("test"); + assertEquals(1, searcher.reader().maxDoc()); + + Query query = LongPoint.newRangeQuery("foo", 0, 5); + assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize()); + searcher.searcher().count(query); + assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize()); + + searcher.close(); + assertEquals(2, indicesService.indicesRefCount.refCount()); + assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize()); + + node.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize()); + } + + public void testCloseWhileOngoingRequestUsesQueryCache() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true))); + node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get(); + ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get()); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + IndicesQueryCache cache = indicesService.getIndicesQueryCache(); + + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + Searcher searcher = shard.acquireSearcher("test"); + assertEquals(1, searcher.reader().maxDoc()); + + node.close(); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + Query query = LongPoint.newRangeQuery("foo", 0, 5); + assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize()); + searcher.searcher().count(query); + assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize()); + + searcher.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize()); + } + + public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { + Node node = startNode(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true))); + node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get(); + ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get()); + + assertEquals(2, indicesService.indicesRefCount.refCount()); + + IndicesRequestCache cache = indicesService.indicesRequestCache; + + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + Searcher searcher = shard.acquireSearcher("test"); + assertEquals(1, searcher.reader().maxDoc()); + + node.close(); + assertEquals(1, indicesService.indicesRefCount.refCount()); + + assertEquals(0L, cache.count()); + IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + @Override + public long ramBytesUsed() { + return 42; + } + + @Override + public void onCached(Key key, BytesReference value) {} + + @Override + public boolean isOpen() { + return true; + } + + @Override + public Object getCacheIdentity() { + return this; + } + + @Override + public void onHit() {} + + @Override + public void onMiss() {} + + @Override + public void onRemoval(RemovalNotification notification) {} + }; + cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo"), () -> "foo"); + assertEquals(1L, cache.count()); + + searcher.close(); + assertEquals(0, indicesService.indicesRefCount.refCount()); + assertEquals(0L, cache.count()); + } }