Skip to content

Commit

Permalink
Adds trace logging to IndicesRequestCache (#34180)
Browse files Browse the repository at this point in the history
* Adds trace logging to IndicesRequestCache

This change adds trace level logging to `IndicesrrequestCache` witht eh
primary aim of helping to identify the cause of teh failures in
#32827. The cache will
log at trace level when a cache hit or miss occurs including the reader
version and the cache key. Note that this change adds a
`cacheKeyRenderer` whcih supplies a human readable String of the cache
key since the actual cache key itself is a `BytesReference` containing
the wire protocol serialised form of the request.

Logging is also added for the case where a search timeout occurs and fr
that reason the cache entry is invalidated.

* Adds comment to remaind us to remove cacheKeyRenderer
  • Loading branch information
colings86 authored and kcm committed Oct 30, 2018
1 parent 688073e commit ac0da70
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
Expand Down Expand Up @@ -75,6 +78,8 @@ public final class IndicesRequestCache extends AbstractComponent implements Remo
public static final Setting<TimeValue> INDICES_CACHE_QUERY_EXPIRE =
Setting.positiveTimeSetting("indices.requests.cache.expire", new TimeValue(0), Property.NodeScope);

private static final Logger LOGGER = LogManager.getLogger(IndicesRequestCache.class);

private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
private final ByteSizeValue size;
Expand Down Expand Up @@ -109,13 +114,19 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
notification.getKey().entity.onRemoval(notification);
}

// NORELEASE The cacheKeyRenderer has been added in order to debug
// https://github.com/elastic/elasticsearch/issues/32827, it should be
// removed when this issue is solved
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey) throws Exception {
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
final Key key = new Key(cacheEntity, reader.getVersion(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
if (logger.isTraceEnabled()) {
logger.trace("Cache miss for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
// see if its the first time we see this reader, and make sure to register a cleanup key
CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getVersion());
if (!registeredClosedListeners.containsKey(cleanupKey)) {
Expand All @@ -126,6 +137,9 @@ BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> lo
}
} else {
key.entity.onHit();
if (logger.isTraceEnabled()) {
logger.trace("Cache hit for reader version [{}] and request:\n {}", reader.getVersion(), cacheKeyRenderer.get());
}
}
return value;
}
Expand Down
14 changes: 10 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,9 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();

boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> {
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), () -> {
return "Shard: " + request.shardId() + "\nSource:\n" + request.source();
}, out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);
Expand All @@ -1217,6 +1219,10 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
// running a search that times out concurrently will likely timeout again if it's run while we have this `stale` result in the
// cache. One other option is to not cache requests with a timeout at all...
indicesRequestCache.invalidate(new IndexShardCacheEntity(context.indexShard()), directoryReader, request.cacheKey());
if (logger.isTraceEnabled()) {
logger.trace("Query timed out, invalidating cache entry for request on shard [{}]:\n {}", request.shardId(),
request.source());
}
}
}

Expand All @@ -1232,8 +1238,8 @@ public ByteSizeValue getTotalIndexingBufferBytes() {
* @param loader loads the data into the cache if needed
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey, Consumer<StreamOutput> loader)
throws Exception {
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
Supplier<BytesReference> supplier = () -> {
/* BytesStreamOutput allows to pass the expected size but by default uses
Expand All @@ -1251,7 +1257,7 @@ private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader r
return out.bytes();
}
};
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey);
return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey, cacheKeyRenderer);
}

static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.joda.time.DateTimeZone;

import java.time.ZoneOffset;
Expand All @@ -49,6 +50,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@TestLogging(value = "org.elasticsearch.indices.IndicesRequestCache:TRACE")
public class IndicesRequestCacheIT extends ESIntegTestCase {

// One of the primary purposes of the query cache is to cache aggs results
Expand Down Expand Up @@ -417,8 +419,8 @@ private static void assertCacheState(Client client, String index, long expectedH
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount()));
assertEquals(Arrays.asList(expectedHits, expectedMisses, 0L),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testBasicOperationsCache() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -79,7 +79,7 @@ public void testBasicOperationsCache() throws Exception {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testCacheDifferentReaders() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -140,7 +140,7 @@ public void testCacheDifferentReaders() throws Exception {
// cache the second
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(entity, loader, secondReader, termBytes);
value = cache.getOrCompute(entity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand All @@ -152,7 +152,7 @@ public void testCacheDifferentReaders() throws Exception {

secondEntity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(secondReader, 0);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand All @@ -162,7 +162,7 @@ public void testCacheDifferentReaders() throws Exception {

entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(2, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand Down Expand Up @@ -222,9 +222,9 @@ public void testEviction() throws Exception {
TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
Loader secondLoader = new Loader(secondReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
size = requestCacheStats.stats().getMemorySize();
IOUtils.close(reader, secondReader, writer, dir, cache);
Expand Down Expand Up @@ -257,12 +257,12 @@ public void testEviction() throws Exception {
TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
Loader thirdLoader = new Loader(thirdReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(2, cache.count());
assertEquals(1, requestCacheStats.stats().getEvictions());
Expand Down Expand Up @@ -298,12 +298,12 @@ public void testClearAllEntityIdentity() throws Exception {
TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
Loader thirdLoader = new Loader(thirdReader, 0);

BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value1.streamInput().readString());
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes, () -> termQuery.toString());
assertEquals("bar", value2.streamInput().readString());
logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals("baz", value3.streamInput().readString());
assertEquals(3, cache.count());
final long hitCount = requestCacheStats.stats().getHitCount();
Expand All @@ -312,7 +312,7 @@ public void testClearAllEntityIdentity() throws Exception {
cache.cleanCache();
assertEquals(1, cache.count());
// third has not been validated since it's a different identity
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes, () -> termQuery.toString());
assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
assertEquals("baz", value3.streamInput().readString());

Expand Down Expand Up @@ -371,7 +371,7 @@ public void testInvalidate() throws Exception {
// initial cache
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -382,7 +382,7 @@ public void testInvalidate() throws Exception {
// cache hit
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(1, requestCacheStats.stats().getMissCount());
Expand All @@ -396,7 +396,7 @@ public void testInvalidate() throws Exception {
entity = new TestEntity(requestCacheStats, indexShard);
loader = new Loader(reader, 0);
cache.invalidate(entity, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes);
value = cache.getOrCompute(entity, loader, reader, termBytes, () -> termQuery.toString());
assertEquals("foo", value.streamInput().readString());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(2, requestCacheStats.stats().getMissCount());
Expand Down

0 comments on commit ac0da70

Please sign in to comment.