Fix BytesRef ownership issue in string terms aggregations.
The byte[] array that was used to store the term was owned by the BytesRefHash
which is used to compute counts. However, the BytesRefHash is released at some
point and its content may be recycled.

MockPageCacheRecycler has been improved to expose this issue (putting random
content into the arrays upon release).

The number of documents and terms has been increased in RandomTests to make sure
page recycling occurs.

Close #5021
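
To make the failure mode concrete, here is a minimal sketch with illustrative names (bucketOrds, bucket, ord are placeholders, not the actual aggregator code):

BytesRef term = bucketOrds.get(ord, new BytesRef()); // a view into pages owned by the hash
bucket.termBytes = term;                             // broken: the bucket aliases the hash's storage
bucketOrds.release();                                // the pages go back to the page recycler...
// ...and may be handed out again and overwritten, silently corrupting bucket.termBytes.
bucket.termBytes = BytesRef.deepCopyOf(term);        // the fix: take an owned copy before release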
jpountz committed Feb 7, 2014
1 parent 1e265b3 commit b39d034
Showing 5 changed files with 17 additions and 4 deletions.
@@ -60,6 +60,7 @@ private static int rehash(int hash) {

/**
* Return the key at <code>0 &lte; index &lte; capacity()</code>. The result is undefined if the slot is unused.
+ * <p color="red">Beware that the content of the {@link BytesRef} may become invalid as soon as {@link #release()} is called</p>
*/
public BytesRef get(long id, BytesRef dest) {
final long startOffset = startOffsets.get(id);
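
In practice this means a caller that needs the key beyond the lifetime of the hash must copy it. A small usage sketch (assumed caller code, not part of this commit):

BytesRef spare = new BytesRef();
hash.get(id, spare);                          // fills spare with a view into the hash's pages
BytesRef owned = BytesRef.deepCopyOf(spare);  // required if the bytes must outlive the hash
hash.release();                               // after this, spare's content is undefined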
@@ -57,7 +57,7 @@ public static void registerStreams() {

public static class Bucket extends InternalTerms.Bucket {

- final BytesRef termBytes;
+ BytesRef termBytes;

public Bucket(BytesRef term, long docCount, InternalAggregations aggregations) {
super(docCount, aggregations);
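
(The final modifier is dropped so that termBytes can later be reassigned to an owned copy when the bucket list is built; see the next hunk.)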
@@ -230,6 +230,8 @@ public boolean apply(BytesRef input) {
final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final StringTerms.Bucket bucket = (StringTerms.Bucket) ordered.pop();
+ // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
+ bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
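
Note that the copy is taken only here, when the final bucket list is materialized, so the overhead is one small allocation per returned bucket rather than one per term tracked by the hash.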
@@ -27,6 +27,7 @@
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.threadpool.ThreadPool;

+ import java.lang.reflect.Array;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;

@@ -51,7 +52,7 @@ public MockPageCacheRecycler(Settings settings, ThreadPool threadPool) {
random = new Random(seed);
}

- private static <T> V<T> wrap(final V<T> v) {
+ private <T> V<T> wrap(final V<T> v) {
ACQUIRED_PAGES.put(v, new Throwable());
final Thread t = Thread.currentThread();
return new V<T>() {
@@ -67,6 +68,14 @@ public boolean release() throws ElasticsearchException {
if (t == null) {
throw new IllegalStateException("Releasing a page that has not been acquired");
}
+ final T ref = v();
+ for (int i = 0; i < Array.getLength(ref); ++i) {
+     if (ref instanceof Object[]) {
+         Array.set(ref, i, null);
+     } else {
+         Array.set(ref, i, (byte) random.nextInt(256));
+     }
+ }
return v.release();
}
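
With this change, primitive pages are filled with random bytes and object pages are nulled out as soon as they are released, so code that keeps reading a page after release() now sees garbage instead of stale-but-plausible data. A hedged sketch of the effect (not actual test code; bytePage() is an assumption about the recycler API):

V<byte[]> page = recycler.bytePage(false);
byte[] bytes = page.v();
bytes[0] = 42;
page.release();          // the mock recycler scrambles the array here
// bytes[0] is now most likely no longer 42, so a use-after-release shows up
// as visibly wrong data in tests instead of passing silently.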

@@ -148,8 +148,9 @@ public void testRandomRanges() throws Exception {

// test long/double/string terms aggs with high number of buckets that require array growth
public void testDuelTerms() throws Exception {
- final int numDocs = atLeast(1000);
- final int maxNumTerms = randomIntBetween(10, 10000);
+ // These high numbers of docs and terms are important to trigger page recycling
+ final int numDocs = atLeast(10000);
+ final int maxNumTerms = randomIntBetween(10, 100000);

final IntOpenHashSet valuesSet = new IntOpenHashSet();
wipeIndices("idx");